diff options
Diffstat (limited to 'src/input/curl_input_plugin.c')
-rw-r--r-- | src/input/curl_input_plugin.c | 947 |
1 files changed, 703 insertions, 244 deletions
diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c index 2c4ac2ff8..d4c116136 100644 --- a/src/input/curl_input_plugin.c +++ b/src/input/curl_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,10 +19,12 @@ #include "config.h" #include "input/curl_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" #include "conf.h" #include "tag.h" #include "icy_metadata.h" +#include "io_thread.h" #include "glib_compat.h" #include <assert.h> @@ -73,17 +75,32 @@ struct input_curl { /** the curl handles */ CURL *easy; - CURLM *multi; + + GMutex *mutex; + GCond *cond; + + /** the GMainLoop source used to poll all CURL file + descriptors */ + GSource *source; + + /** the source id of #source */ + guint source_id; + + /** a linked list of all registered GPollFD objects */ + GSList *fds; /** list of buffers, where input_curl_writefunction() appends to, and input_curl_read() reads from them */ GQueue *buffers; - /** has something been added to the buffers list? */ - bool buffered; - - /** did libcurl tell us the we're at the end of the response body? */ - bool eof; +#if LIBCURL_VERSION_NUM >= 0x071200 + /** + * Is the connection currently paused? That happens when the + * buffer was getting too large. It will be unpaused when the + * buffer is below the threshold again. + */ + bool paused; +#endif /** error message provided by libcurl */ char error[CURL_ERROR_SIZE]; @@ -97,6 +114,8 @@ struct input_curl { /** the tag object ready to be requested via input_stream_tag() */ struct tag *tag; + + GError *postponed_error; }; /** libcurl should accept "ICY 200 OK" */ @@ -106,20 +125,564 @@ static struct curl_slist *http_200_aliases; static const char *proxy, *proxy_user, *proxy_password; static unsigned proxy_port; +static struct { + CURLM *multi; + + /** + * A linked list of all active HTTP requests. An active + * request is one that doesn't have the "eof" flag set. + */ + GSList *requests; + + /** + * The GMainLoop source used to poll all CURL file + * descriptors. + */ + GSource *source; + + /** + * The source id of #source. + */ + guint source_id; + + GSList *fds; + + /** + * When this is non-zero, then an update of #fds is scheduled. + */ + guint dirty_source_id; + +#if LIBCURL_VERSION_NUM >= 0x070f04 + /** + * Did CURL give us a timeout? If yes, then we need to call + * curl_multi_perform(), even if there was no event on any + * file descriptor. + */ + bool timeout; + + /** + * The absolute time stamp when the timeout expires. This is + * used in the GSource method check(). + */ + GTimeVal absolute_timeout; +#endif +} curl; + static inline GQuark curl_quark(void) { return g_quark_from_static_string("curl"); } +/** + * Find a request by its CURL "easy" handle. + * + * Runs in the I/O thread. No lock needed. + */ +static struct input_curl * +input_curl_find_request(CURL *easy) +{ + assert(io_thread_inside()); + + for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { + struct input_curl *c = i->data; + if (c->easy == easy) + return c; + } + + return NULL; +} + +#if LIBCURL_VERSION_NUM >= 0x071200 + +static gpointer +input_curl_resume(gpointer data) +{ + assert(io_thread_inside()); + + struct input_curl *c = data; + + if (c->paused) { + curl_easy_pause(c->easy, CURLPAUSE_CONT); + c->paused = false; + } + + return NULL; +} + +#endif + +/** + * Calculates the GLib event bit mask for one file descriptor, + * obtained from three #fd_set objects filled by curl_multi_fdset(). + */ +static gushort +input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) +{ + gushort events = 0; + + if (FD_ISSET(fd, rfds)) { + events |= G_IO_IN | G_IO_HUP | G_IO_ERR; + FD_CLR(fd, rfds); + } + + if (FD_ISSET(fd, wfds)) { + events |= G_IO_OUT | G_IO_ERR; + FD_CLR(fd, wfds); + } + + if (FD_ISSET(fd, efds)) { + events |= G_IO_HUP | G_IO_ERR; + FD_CLR(fd, efds); + } + + return events; +} + +/** + * Updates all registered GPollFD objects, unregisters old ones, + * registers new ones. + * + * Runs in the I/O thread. No lock needed. + */ +static void +curl_update_fds(void) +{ + assert(io_thread_inside()); + + fd_set rfds, wfds, efds; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int max_fd; + CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, + &efds, &max_fd); + if (mcode != CURLM_OK) { + g_warning("curl_multi_fdset() failed: %s\n", + curl_multi_strerror(mcode)); + return; + } + + GSList *fds = curl.fds; + curl.fds = NULL; + + while (fds != NULL) { + GPollFD *poll_fd = fds->data; + gushort events = input_curl_fd_events(poll_fd->fd, &rfds, + &wfds, &efds); + + assert(poll_fd->events != 0); + + fds = g_slist_remove(fds, poll_fd); + + if (events != poll_fd->events) + g_source_remove_poll(curl.source, poll_fd); + + if (events != 0) { + if (events != poll_fd->events) { + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + } + + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } else { + g_free(poll_fd); + } + } + + for (int fd = 0; fd <= max_fd; ++fd) { + gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); + if (events != 0) { + GPollFD *poll_fd = g_new(GPollFD, 1); + poll_fd->fd = fd; + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } + } +} + +/** + * Callback for curl_schedule_update() that runs in the I/O thread. + */ +static gboolean +input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) +{ + assert(io_thread_inside()); + assert(curl.dirty_source_id != 0 || curl.requests == NULL); + curl.dirty_source_id = 0; + + curl_update_fds(); + + return false; +} + +/** + * Schedule a refresh of curl.fds. Does nothing if that is already + * scheduled. + * + * No lock needed. + */ +static void +input_curl_schedule_update(void) +{ + if (curl.dirty_source_id != 0) + /* already scheduled */ + return; + + curl.dirty_source_id = + io_thread_idle_add(input_curl_dirty_callback, NULL); +} + +/** + * Runs in the I/O thread. No lock needed. + */ +static bool +input_curl_easy_add(struct input_curl *c, GError **error_r) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy != NULL); + assert(input_curl_find_request(c->easy) == NULL); + + curl.requests = g_slist_prepend(curl.requests, c); + + CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); + if (mcode != CURLM_OK) { + g_set_error(error_r, curl_quark(), mcode, + "curl_multi_add_handle() failed: %s", + curl_multi_strerror(mcode)); + return false; + } + + input_curl_schedule_update(); + + return true; +} + +struct easy_add_params { + struct input_curl *c; + GError **error_r; +}; + +static gpointer +input_curl_easy_add_callback(gpointer data) +{ + const struct easy_add_params *params = data; + + bool success = input_curl_easy_add(params->c, params->error_r); + return GUINT_TO_POINTER(success); +} + +/** + * Call input_curl_easy_add() in the I/O thread. May be called from + * any thread. Caller must not hold a mutex. + */ +static bool +input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) +{ + assert(c != NULL); + assert(c->easy != NULL); + + struct easy_add_params params = { + .c = c, + .error_r = error_r, + }; + + gpointer result = + io_thread_call(input_curl_easy_add_callback, ¶ms); + return GPOINTER_TO_UINT(result); +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * Runs in the I/O thread. + */ +static void +input_curl_easy_free(struct input_curl *c) +{ + assert(io_thread_inside()); + assert(c != NULL); + + if (c->easy == NULL) + return; + + curl.requests = g_slist_remove(curl.requests, c); + + curl_multi_remove_handle(curl.multi, c->easy); + curl_easy_cleanup(c->easy); + c->easy = NULL; + + curl_slist_free_all(c->request_headers); + c->request_headers = NULL; + + g_free(c->range); + c->range = NULL; +} + +static gpointer +input_curl_easy_free_callback(gpointer data) +{ + struct input_curl *c = data; + + input_curl_easy_free(c); + curl_update_fds(); + + return NULL; +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The mutex must not be locked. + */ +static void +input_curl_easy_free_indirect(struct input_curl *c) +{ + io_thread_call(input_curl_easy_free_callback, c); + assert(c->easy == NULL); +} + +/** + * Abort and free all HTTP requests. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_abort_all_requests(GError *error) +{ + assert(io_thread_inside()); + assert(error != NULL); + + while (curl.requests != NULL) { + struct input_curl *c = curl.requests->data; + assert(c->postponed_error == NULL); + + input_curl_easy_free(c); + + g_mutex_lock(c->mutex); + c->postponed_error = g_error_copy(error); + c->base.ready = true; + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); + } + + g_error_free(error); + +} + +/** + * A HTTP request is finished. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_request_done(struct input_curl *c, CURLcode result, long status) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy == NULL); + assert(c->postponed_error == NULL); + + g_mutex_lock(c->mutex); + + if (result != CURLE_OK) { + c->postponed_error = g_error_new(curl_quark(), result, + "curl failed: %s", + c->error); + } else if (status < 200 || status >= 300) { + c->postponed_error = g_error_new(curl_quark(), 0, + "got HTTP status %ld", + status); + } + + c->base.ready = true; + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); +} + +static void +input_curl_handle_done(CURL *easy_handle, CURLcode result) +{ + struct input_curl *c = input_curl_find_request(easy_handle); + assert(c != NULL); + + long status = 0; + curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); + + input_curl_easy_free(c); + input_curl_request_done(c, result, status); +} + +/** + * Check for finished HTTP responses. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_info_read(void) +{ + assert(io_thread_inside()); + + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read(curl.multi, + &msgs_in_queue)) != NULL) { + if (msg->msg == CURLMSG_DONE) + input_curl_handle_done(msg->easy_handle, msg->data.result); + } +} + +/** + * Give control to CURL. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static bool +input_curl_perform(void) +{ + assert(io_thread_inside()); + + CURLMcode mcode; + + do { + int running_handles; + mcode = curl_multi_perform(curl.multi, &running_handles); + } while (mcode == CURLM_CALL_MULTI_PERFORM); + + if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { + GError *error = g_error_new(curl_quark(), mcode, + "curl_multi_perform() failed: %s", + curl_multi_strerror(mcode)); + input_curl_abort_all_requests(error); + return false; + } + + return true; +} + +/* + * GSource methods + * + */ + +/** + * The GSource prepare() method implementation. + */ +static gboolean +input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) +{ + curl_update_fds(); + +#if LIBCURL_VERSION_NUM >= 0x070f04 + curl.timeout = false; + + long timeout2; + CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); + if (mcode == CURLM_OK) { + if (timeout2 >= 0) { + g_source_get_current_time(source, + &curl.absolute_timeout); + g_time_val_add(&curl.absolute_timeout, + timeout2 * 1000); + } + + if (timeout2 >= 0 && timeout2 < 10) + /* CURL 7.21.1 likes to report "timeout=0", + which means we're running in a busy loop. + Quite a bad idea to waste so much CPU. + Let's use a lower limit of 10ms. */ + timeout2 = 10; + + *timeout_r = timeout2; + + curl.timeout = timeout2 >= 0; + } else + g_warning("curl_multi_timeout() failed: %s\n", + curl_multi_strerror(mcode)); +#else + (void)timeout_r; +#endif + + return false; +} + +/** + * The GSource check() method implementation. + */ +static gboolean +input_curl_source_check(G_GNUC_UNUSED GSource *source) +{ +#if LIBCURL_VERSION_NUM >= 0x070f04 + if (curl.timeout) { + /* when a timeout has expired, we need to call + curl_multi_perform(), even if there was no file + descriptor event */ + + GTimeVal now; + g_source_get_current_time(source, &now); + if (now.tv_sec > curl.absolute_timeout.tv_sec || + (now.tv_sec == curl.absolute_timeout.tv_sec && + now.tv_usec >= curl.absolute_timeout.tv_usec)) + return true; + } +#endif + + for (GSList *i = curl.fds; i != NULL; i = i->next) { + GPollFD *poll_fd = i->data; + if (poll_fd->revents != 0) + return true; + } + + return false; +} + +/** + * The GSource dispatch() method implementation. The callback isn't + * used, because we're handling all events directly. + */ +static gboolean +input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, + G_GNUC_UNUSED GSourceFunc callback, + G_GNUC_UNUSED gpointer user_data) +{ + if (input_curl_perform()) + input_curl_info_read(); + + return true; +} + +/** + * The vtable for our GSource implementation. Unfortunately, we + * cannot declare it "const", because g_source_new() takes a non-const + * pointer, for whatever reason. + */ +static GSourceFuncs curl_source_funcs = { + .prepare = input_curl_source_prepare, + .check = input_curl_source_check, + .dispatch = input_curl_source_dispatch, +}; + +/* + * input_plugin methods + * + */ + static bool input_curl_init(const struct config_param *param, G_GNUC_UNUSED GError **error_r) { CURLcode code = curl_global_init(CURL_GLOBAL_ALL); if (code != CURLE_OK) { - g_warning("curl_global_init() failed: %s\n", - curl_easy_strerror(code)); + g_set_error(error_r, curl_quark(), code, + "curl_global_init() failed: %s\n", + curl_easy_strerror(code)); return false; } @@ -140,20 +703,58 @@ input_curl_init(const struct config_param *param, ""); } + curl.multi = curl_multi_init(); + if (curl.multi == NULL) { + g_set_error(error_r, curl_quark(), 0, + "curl_multi_init() failed"); + return false; + } + + curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); + curl.source_id = g_source_attach(curl.source, io_thread_context()); + return true; } +static gpointer +curl_destroy_sources(G_GNUC_UNUSED gpointer data) +{ + g_source_destroy(curl.source); + + if (curl.dirty_source_id != 0) { + GSource *source = + g_main_context_find_source_by_id(io_thread_context(), + curl.dirty_source_id); + assert(source != NULL); + curl.dirty_source_id = 0; + + g_source_destroy(source); + } + + return NULL; +} + static void input_curl_finish(void) { + assert(curl.requests == NULL); + + io_thread_call(curl_destroy_sources, NULL); + + curl_multi_cleanup(curl.multi); + curl_slist_free_all(http_200_aliases); curl_global_cleanup(); } +#if LIBCURL_VERSION_NUM >= 0x071200 + /** * Determine the total sizes of all buffers, including portions that * have already been consumed. + * + * The caller must lock the mutex. */ G_GNUC_PURE static size_t @@ -170,6 +771,8 @@ curl_total_buffer_size(const struct input_curl *c) return total; } +#endif + static void buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) { @@ -180,31 +783,15 @@ buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) g_free(data); } -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - */ static void -input_curl_easy_free(struct input_curl *c) +input_curl_flush_buffers(struct input_curl *c) { - if (c->easy != NULL) { - curl_multi_remove_handle(c->multi, c->easy); - curl_easy_cleanup(c->easy); - c->easy = NULL; - } - - curl_slist_free_all(c->request_headers); - c->request_headers = NULL; - - g_free(c->range); - c->range = NULL; - g_queue_foreach(c->buffers, buffer_free_callback, NULL); g_queue_clear(c->buffers); } /** - * Frees this stream (but not the input_stream struct itself). + * Frees this stream, including the input_stream struct. */ static void input_curl_free(struct input_curl *c) @@ -213,13 +800,14 @@ input_curl_free(struct input_curl *c) tag_free(c->tag); g_free(c->meta_name); - input_curl_easy_free(c); - - if (c->multi != NULL) - curl_multi_cleanup(c->multi); + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); g_queue_free(c->buffers); + g_mutex_free(c->mutex); + g_cond_free(c->cond); + g_free(c->url); input_stream_deinit(&c->base); g_free(c); @@ -236,119 +824,15 @@ input_curl_tag(struct input_stream *is) } static bool -input_curl_multi_info_read(struct input_curl *c, GError **error_r) -{ - CURLMsg *msg; - int msgs_in_queue; - - while ((msg = curl_multi_info_read(c->multi, - &msgs_in_queue)) != NULL) { - if (msg->msg == CURLMSG_DONE) { - c->eof = true; - c->base.ready = true; - - if (msg->data.result != CURLE_OK) { - g_set_error(error_r, curl_quark(), - msg->data.result, - "curl failed: %s", c->error); - return false; - } - } - } - - return true; -} - -/** - * Wait for the libcurl socket. - * - * @return -1 on error, 0 if no data is available yet, 1 if data is - * available - */ -static int -input_curl_select(struct input_curl *c, GError **error_r) +fill_buffer(struct input_curl *c, GError **error_r) { - fd_set rfds, wfds, efds; - int max_fd, ret; - CURLMcode mcode; - struct timeval timeout = { - .tv_sec = 1, - .tv_usec = 0, - }; - - assert(!c->eof); - - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); + while (c->easy != NULL && g_queue_is_empty(c->buffers)) + g_cond_wait(c->cond, c->mutex); - mcode = curl_multi_fdset(c->multi, &rfds, &wfds, &efds, &max_fd); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_fdset() failed: %s", - curl_multi_strerror(mcode)); - return -1; - } - -#if LIBCURL_VERSION_NUM >= 0x070f04 - long timeout2; - mcode = curl_multi_timeout(c->multi, &timeout2); - if (mcode != CURLM_OK) { - g_warning("curl_multi_timeout() failed: %s\n", - curl_multi_strerror(mcode)); - return -1; - } - - if (timeout2 >= 0) { - if (timeout2 > 10000) - timeout2 = 10000; - - timeout.tv_sec = timeout2 / 1000; - timeout.tv_usec = (timeout2 % 1000) * 1000; - } -#endif - - ret = select(max_fd + 1, &rfds, &wfds, &efds, &timeout); - if (ret < 0) - g_set_error(error_r, g_quark_from_static_string("errno"), - errno, - "select() failed: %s\n", g_strerror(errno)); - - return ret; -} - -static bool -fill_buffer(struct input_stream *is, GError **error_r) -{ - struct input_curl *c = (struct input_curl *)is; - CURLMcode mcode = CURLM_CALL_MULTI_PERFORM; - - while (!c->eof && g_queue_is_empty(c->buffers)) { - int running_handles; - bool bret; - - if (mcode != CURLM_CALL_MULTI_PERFORM) { - /* if we're still here, there is no input yet - - wait for input */ - int ret = input_curl_select(c, error_r); - if (ret <= 0) - /* no data yet or error */ - return false; - } - - mcode = curl_multi_perform(c->multi, &running_handles); - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - c->eof = true; - is->ready = true; - return false; - } - - bret = input_curl_multi_info_read(c, error_r); - if (!bret) - return false; + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; } return !g_queue_is_empty(c->buffers); @@ -453,12 +937,16 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, size_t nbytes = 0; char *dest = ptr; + g_mutex_lock(c->mutex); + do { /* fill the buffer */ - success = fill_buffer(is, error_r); - if (!success) + success = fill_buffer(c, error_r); + if (!success) { + g_mutex_unlock(c->mutex); return 0; + } /* send buffer contents */ @@ -476,6 +964,13 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, is->offset += (goffset)nbytes; +#if LIBCURL_VERSION_NUM >= 0x071200 + if (c->paused && curl_total_buffer_size(c) < CURL_MAX_BUFFERED) + io_thread_call(input_curl_resume, c); +#endif + + g_mutex_unlock(c->mutex); + return nbytes; } @@ -492,7 +987,11 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is) { struct input_curl *c = (struct input_curl *)is; - return c->eof && g_queue_is_empty(c->buffers); + g_mutex_lock(c->mutex); + bool eof = c->easy == NULL && g_queue_is_empty(c->buffers); + g_mutex_unlock(c->mutex); + + return eof; } static int @@ -500,41 +999,21 @@ input_curl_buffer(struct input_stream *is, GError **error_r) { struct input_curl *c = (struct input_curl *)is; - if (curl_total_buffer_size(c) >= CURL_MAX_BUFFERED) - return 0; - - CURLMcode mcode; - int running_handles; - bool ret; - - c->buffered = false; - - if (!is->ready && !c->eof) - /* not ready yet means the caller is waiting in a busy - loop; relax that by calling select() on the - socket */ - if (input_curl_select(c, error_r) < 0) - return -1; + g_mutex_lock(c->mutex); - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM && - g_queue_is_empty(c->buffers)); + int result; + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + result = -1; + } else if (g_queue_is_empty(c->buffers)) + result = 0; + else + result = 1; - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - c->eof = true; - is->ready = true; - return -1; - } - - ret = input_curl_multi_info_read(c, error_r); - if (!ret) - return -1; + g_mutex_unlock(c->mutex); - return c->buffered; + return result; } /** called by curl when new data is available */ @@ -632,15 +1111,28 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; + g_mutex_lock(c->mutex); + +#if LIBCURL_VERSION_NUM >= 0x071200 + if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { + c->paused = true; + g_mutex_unlock(c->mutex); + return CURL_WRITEFUNC_PAUSE; + } +#endif + buffer = g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); buffer->size = size; buffer->consumed = 0; memcpy(buffer->data, ptr, size); + g_queue_push_tail(c->buffers, buffer); - c->buffered = true; c->base.ready = true; + g_cond_broadcast(c->cond); + g_mutex_unlock(c->mutex); + return size; } @@ -648,9 +1140,6 @@ static bool input_curl_easy_init(struct input_curl *c, GError **error_r) { CURLcode code; - CURLMcode mcode; - - c->eof = false; c->easy = curl_easy_init(); if (c->easy == NULL) { @@ -659,14 +1148,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) return false; } - mcode = curl_multi_add_handle(c->multi, c->easy); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_add_handle() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - curl_easy_setopt(c->easy, CURLOPT_USERAGENT, "Music Player Daemon " VERSION); curl_easy_setopt(c->easy, CURLOPT_HEADERFUNCTION, @@ -677,6 +1158,7 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, c); curl_easy_setopt(c->easy, CURLOPT_HTTP200ALIASES, http_200_aliases); curl_easy_setopt(c->easy, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(c->easy, CURLOPT_NETRC, 1); curl_easy_setopt(c->easy, CURLOPT_MAXREDIRS, 5); curl_easy_setopt(c->easy, CURLOPT_FAILONERROR, true); curl_easy_setopt(c->easy, CURLOPT_ERRORBUFFER, c->error); @@ -713,38 +1195,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) return true; } -void -input_curl_reinit(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - - assert(c->base.plugin == &input_plugin_curl); - assert(c->easy != NULL); - - curl_easy_setopt(c->easy, CURLOPT_WRITEHEADER, is); - curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, is); -} - -static bool -input_curl_send_request(struct input_curl *c, GError **error_r) -{ - CURLMcode mcode; - int running_handles; - - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM); - - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - - return true; -} - static bool input_curl_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) @@ -788,6 +1238,8 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* check if we can fast-forward the buffer */ + g_mutex_lock(c->mutex); + while (offset > is->offset && !g_queue_is_empty(c->buffers)) { struct buffer *buffer; size_t length; @@ -805,19 +1257,21 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, is->offset += length; } + g_mutex_unlock(c->mutex); + if (offset == is->offset) return true; /* close the old connection and open a new one */ - input_curl_easy_free(c); + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); is->offset = offset; if (is->offset == is->size) { /* seek to EOF: simulate empty result; avoid triggering a "416 Requested Range Not Satisfiable" response */ - c->eof = true; return true; } @@ -832,18 +1286,32 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); } - ret = input_curl_send_request(c, error_r); - if (!ret) + c->base.ready = false; + + if (!input_curl_easy_add_indirect(c, error_r)) return false; - return input_curl_multi_info_read(c, error_r); + g_mutex_lock(c->mutex); + + while (!c->base.ready) + g_cond_wait(c->cond, c->mutex); + + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + g_mutex_unlock(c->mutex); + return false; + } + + g_mutex_unlock(c->mutex); + + return true; } static struct input_stream * input_curl_open(const char *url, GError **error_r) { struct input_curl *c; - bool ret; if (strncmp(url, "http://", 7) != 0) return NULL; @@ -851,34 +1319,25 @@ input_curl_open(const char *url, GError **error_r) c = g_new0(struct input_curl, 1); input_stream_init(&c->base, &input_plugin_curl, url); + c->mutex = g_mutex_new(); + c->cond = g_cond_new(); + c->url = g_strdup(url); c->buffers = g_queue_new(); - c->multi = curl_multi_init(); - if (c->multi == NULL) { - g_set_error(error_r, curl_quark(), 0, - "curl_multi_init() failed"); - input_curl_free(c); - return NULL; - } - icy_clear(&c->icy_metadata); c->tag = NULL; - ret = input_curl_easy_init(c, error_r); - if (!ret) { - input_curl_free(c); - return NULL; - } +#if LIBCURL_VERSION_NUM >= 0x071200 + c->paused = false; +#endif - ret = input_curl_send_request(c, error_r); - if (!ret) { + if (!input_curl_easy_init(c, error_r)) { input_curl_free(c); return NULL; } - ret = input_curl_multi_info_read(c, error_r); - if (!ret) { + if (!input_curl_easy_add_indirect(c, error_r)) { input_curl_free(c); return NULL; } |