diff options
author | Max Kellermann <max@duempel.org> | 2011-08-24 02:54:05 +0200 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2011-08-25 19:20:57 +0200 |
commit | 4f021cbced19dfc610d4f24168d29bd71963dd5e (patch) | |
tree | e08137159ba2a4c45b773f3e776cef2b56d1ef7e | |
parent | ba31d176c8a58a91ac720486c4af2e591d1b7e43 (diff) | |
download | mpd-4f021cbced19dfc610d4f24168d29bd71963dd5e.tar.gz mpd-4f021cbced19dfc610d4f24168d29bd71963dd5e.tar.xz mpd-4f021cbced19dfc610d4f24168d29bd71963dd5e.zip |
input/curl: use the I/O thread
Background buffering and better timeout handling. This patch sort of
obsoletes the input_plugin method buffer().
Diffstat (limited to '')
-rw-r--r-- | src/input/curl_input_plugin.c | 875 |
1 files changed, 653 insertions, 222 deletions
diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c index f344c3f11..54c6d2f18 100644 --- a/src/input/curl_input_plugin.c +++ b/src/input/curl_input_plugin.c @@ -23,6 +23,7 @@ #include "conf.h" #include "tag.h" #include "icy_metadata.h" +#include "io_thread.h" #include "glib_compat.h" #include <assert.h> @@ -73,14 +74,29 @@ struct input_curl { /** the curl handles */ CURL *easy; - CURLM *multi; + + /** the GMainLoop source used to poll all CURL file + descriptors */ + GSource *source; + + /** the source id of #source */ + guint source_id; + + /** a linked list of all registered GPollFD objects */ + GSList *fds; /** list of buffers, where input_curl_writefunction() appends to, and input_curl_read() reads from them */ GQueue *buffers; - /** has something been added to the buffers list? */ - bool buffered; +#if LIBCURL_VERSION_NUM >= 0x071200 + /** + * Is the connection currently paused? That happens when the + * buffer was getting too large. It will be unpaused when the + * buffer is below the threshold again. + */ + bool paused; +#endif /** error message provided by libcurl */ char error[CURL_ERROR_SIZE]; @@ -94,6 +110,8 @@ struct input_curl { /** the tag object ready to be requested via input_stream_tag() */ struct tag *tag; + + GError *postponed_error; }; /** libcurl should accept "ICY 200 OK" */ @@ -103,12 +121,525 @@ static struct curl_slist *http_200_aliases; static const char *proxy, *proxy_user, *proxy_password; static unsigned proxy_port; +static struct { + GStaticMutex mutex; + GCond *cond; + CURLM *multi; + + /** + * A linked list of all active HTTP requests. An active + * request is one that doesn't have the "eof" flag set. + */ + GSList *requests; + + /** + * The GMainLoop source used to poll all CURL file + * descriptors. + */ + GSource *source; + + /** + * The source id of #source. + */ + guint source_id; + + GSList *fds; + + /** + * When this is non-zero, then an update of #fds is scheduled. + */ + guint dirty_source_id; + +#if LIBCURL_VERSION_NUM >= 0x070f04 + /** + * Did CURL give us a timeout? If yes, then we need to call + * curl_multi_perform(), even if there was no event on any + * file descriptor. + */ + bool timeout; + + /** + * The absolute time stamp when the timeout expires. This is + * used in the GSource method check(). + */ + GTimeVal absolute_timeout; +#endif +} curl = { + .mutex = G_STATIC_MUTEX_INIT, +}; + static inline GQuark curl_quark(void) { return g_quark_from_static_string("curl"); } +/** + * Find a request by its CURL "easy" handle. + * + * The caller must lock the mutex. + */ +static struct input_curl * +input_curl_find_request(CURL *easy) +{ + for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { + struct input_curl *c = i->data; + if (c->easy == easy) + return c; + } + + return NULL; +} + +#if LIBCURL_VERSION_NUM >= 0x071200 + +static gpointer +input_curl_resume(gpointer data) +{ + struct input_curl *c = data; + + if (c->paused) { + curl_easy_pause(c->easy, CURLPAUSE_CONT); + c->paused = false; + } + + return NULL; +} + +#endif + +/** + * Calculates the GLib event bit mask for one file descriptor, + * obtained from three #fd_set objects filled by curl_multi_fdset(). + */ +static gushort +input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) +{ + gushort events = 0; + + if (FD_ISSET(fd, rfds)) { + events |= G_IO_IN | G_IO_HUP | G_IO_ERR; + FD_CLR(fd, rfds); + } + + if (FD_ISSET(fd, wfds)) { + events |= G_IO_OUT | G_IO_ERR; + FD_CLR(fd, wfds); + } + + if (FD_ISSET(fd, efds)) { + events |= G_IO_HUP | G_IO_ERR; + FD_CLR(fd, efds); + } + + return events; +} + +/** + * Updates all registered GPollFD objects, unregisters old ones, + * registers new ones. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static void +curl_update_fds(void) +{ + fd_set rfds, wfds, efds; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int max_fd; + CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, + &efds, &max_fd); + if (mcode != CURLM_OK) { + g_warning("curl_multi_fdset() failed: %s\n", + curl_multi_strerror(mcode)); + return; + } + + GSList *fds = curl.fds; + curl.fds = NULL; + + while (fds != NULL) { + GPollFD *poll_fd = fds->data; + gushort events = input_curl_fd_events(poll_fd->fd, &rfds, + &wfds, &efds); + + assert(poll_fd->events != 0); + + fds = g_slist_remove(fds, poll_fd); + + if (events != poll_fd->events) + g_source_remove_poll(curl.source, poll_fd); + + if (events != 0) { + if (events != poll_fd->events) { + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + } + + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } else { + g_free(poll_fd); + } + } + + for (int fd = 0; fd <= max_fd; ++fd) { + gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); + if (events != 0) { + GPollFD *poll_fd = g_new(GPollFD, 1); + poll_fd->fd = fd; + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } + } +} + +/** + * Callback for curl_schedule_update() that runs in the I/O thread. + */ +static gboolean +input_curl_dirty_callback(G_GNUC_UNUSED gpointer data) +{ + g_static_mutex_lock(&curl.mutex); + + assert(curl.dirty_source_id != 0 || curl.requests == NULL); + curl.dirty_source_id = 0; + + curl_update_fds(); + + g_static_mutex_unlock(&curl.mutex); + return false; +} + +/** + * Schedule a refresh of curl.fds. Does nothing if that is already + * scheduled. + * + * The caller must lock the mutex. + */ +static void +input_curl_schedule_update(void) +{ + if (curl.dirty_source_id != 0) + /* already scheduled */ + return; + + curl.dirty_source_id = + io_thread_idle_add(input_curl_dirty_callback, NULL); +} + +static bool +input_curl_easy_add(struct input_curl *c, GError **error_r) +{ + assert(c != NULL); + assert(c->easy != NULL); + assert(input_curl_find_request(c->easy) == NULL); + + curl.requests = g_slist_prepend(curl.requests, c); + + CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); + if (mcode != CURLM_OK) { + g_set_error(error_r, curl_quark(), mcode, + "curl_multi_add_handle() failed: %s", + curl_multi_strerror(mcode)); + return false; + } + + input_curl_schedule_update(); + + return true; +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The caller must lock the mutex. + */ +static void +input_curl_easy_free(struct input_curl *c) +{ + assert(c != NULL); + + if (c->easy == NULL) + return; + + curl.requests = g_slist_remove(curl.requests, c); + + curl_multi_remove_handle(curl.multi, c->easy); + curl_easy_cleanup(c->easy); + c->easy = NULL; + + curl_slist_free_all(c->request_headers); + c->request_headers = NULL; + + g_free(c->range); + c->range = NULL; + + c->base.ready = true; +} + +static gpointer +input_curl_easy_free_callback(gpointer data) +{ + struct input_curl *c = data; + + g_static_mutex_lock(&curl.mutex); + + input_curl_easy_free(c); + curl_update_fds(); + + g_static_mutex_unlock(&curl.mutex); + + return NULL; +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The mutex must not be locked. + */ +static void +input_curl_easy_free_indirect(struct input_curl *c) +{ + io_thread_call(input_curl_easy_free_callback, c); + assert(c->easy == NULL); +} + +/** + * Aborts and frees a running HTTP request. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static void +input_curl_request_abort(struct input_curl *c, GError *error) +{ + assert(c != NULL); + assert(c->postponed_error == NULL); + assert(error != NULL); + + input_curl_easy_free(c); + + c->postponed_error = error; + + g_cond_broadcast(curl.cond); +} + +/** + * Abort and free all HTTP requests. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static void +input_curl_abort_all_requests(GError *error) +{ + while (curl.requests != NULL) { + struct input_curl *is = curl.requests->data; + input_curl_request_abort(is, g_error_copy(error)); + } + + g_error_free(error); +} + +/** + * A HTTP request is finished. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static void +input_curl_request_done(struct input_curl *c, CURLcode result, long status) +{ + assert(c->easy == NULL); + assert(c->base.ready); + + if (result != CURLE_OK) { + GError *error = g_error_new(curl_quark(), result, + "curl failed: %s", + c->error); + input_curl_request_abort(c, error); + } else if (status < 200 || status >= 300) { + GError *error = g_error_new(curl_quark(), 0, + "got HTTP status %ld", + status); + input_curl_request_abort(c, error); + } else { + g_cond_broadcast(curl.cond); + } +} + +static void +input_curl_handle_done(CURL *easy_handle, CURLcode result) +{ + struct input_curl *c = input_curl_find_request(easy_handle); + assert(c != NULL); + + long status = 0; + curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); + + input_curl_easy_free(c); + input_curl_request_done(c, result, status); +} + +/** + * Check for finished HTTP responses. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static void +input_curl_info_read(void) +{ + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read(curl.multi, + &msgs_in_queue)) != NULL) { + if (msg->msg == CURLMSG_DONE) + input_curl_handle_done(msg->easy_handle, msg->data.result); + } +} + +/** + * Give control to CURL. + * + * The caller must lock the mutex. Runs in the I/O thread. + */ +static bool +input_curl_perform(void) +{ + CURLMcode mcode; + + do { + int running_handles; + mcode = curl_multi_perform(curl.multi, &running_handles); + } while (mcode == CURLM_CALL_MULTI_PERFORM); + + if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { + GError *error = g_error_new(curl_quark(), mcode, + "curl_multi_perform() failed: %s", + curl_multi_strerror(mcode)); + input_curl_abort_all_requests(error); + return false; + } + + return true; +} + +/* + * GSource methods + * + */ + +/** + * The GSource prepare() method implementation. + */ +static gboolean +input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) +{ + curl_update_fds(); + +#if LIBCURL_VERSION_NUM >= 0x070f04 + curl.timeout = false; + + long timeout2; + CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); + if (mcode == CURLM_OK) { + if (timeout2 >= 0) { + g_source_get_current_time(source, + &curl.absolute_timeout); + g_time_val_add(&curl.absolute_timeout, + timeout2 * 1000); + } + + if (timeout2 >= 0 && timeout2 < 10) + /* CURL 7.21.1 likes to report "timeout=0", + which means we're running in a busy loop. + Quite a bad idea to waste so much CPU. + Let's use a lower limit of 10ms. */ + timeout2 = 10; + + *timeout_r = timeout2; + + curl.timeout = timeout2 >= 0; + } else + g_warning("curl_multi_timeout() failed: %s\n", + curl_multi_strerror(mcode)); +#else + (void)timeout_r; +#endif + + return false; +} + +/** + * The GSource check() method implementation. + */ +static gboolean +input_curl_source_check(G_GNUC_UNUSED GSource *source) +{ +#if LIBCURL_VERSION_NUM >= 0x070f04 + if (curl.timeout) { + /* when a timeout has expired, we need to call + curl_multi_perform(), even if there was no file + descriptor event */ + + GTimeVal now; + g_source_get_current_time(source, &now); + if (now.tv_sec > curl.absolute_timeout.tv_sec || + (now.tv_sec == curl.absolute_timeout.tv_sec && + now.tv_usec >= curl.absolute_timeout.tv_usec)) + return true; + } +#endif + + for (GSList *i = curl.fds; i != NULL; i = i->next) { + GPollFD *poll_fd = i->data; + if (poll_fd->revents != 0) + return true; + } + + return false; +} + +/** + * The GSource dispatch() method implementation. The callback isn't + * used, because we're handling all events directly. + */ +static gboolean +input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, + G_GNUC_UNUSED GSourceFunc callback, + G_GNUC_UNUSED gpointer user_data) +{ + g_static_mutex_lock(&curl.mutex); + + if (input_curl_perform()) + input_curl_info_read(); + + g_static_mutex_unlock(&curl.mutex); + + return true; +} + +/** + * The vtable for our GSource implementation. Unfortunately, we + * cannot declare it "const", because g_source_new() takes a non-const + * pointer, for whatever reason. + */ +static GSourceFuncs curl_source_funcs = { + .prepare = input_curl_source_prepare, + .check = input_curl_source_check, + .dispatch = input_curl_source_dispatch, +}; + +/* + * input_plugin methods + * + */ + static bool input_curl_init(const struct config_param *param, G_GNUC_UNUSED GError **error_r) @@ -138,20 +669,61 @@ input_curl_init(const struct config_param *param, ""); } + curl.multi = curl_multi_init(); + if (curl.multi == NULL) { + g_set_error(error_r, curl_quark(), 0, + "curl_multi_init() failed"); + return false; + } + + curl.cond = g_cond_new(); + + curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); + curl.source_id = g_source_attach(curl.source, io_thread_context()); + return true; } +static gpointer +curl_destroy_sources(G_GNUC_UNUSED gpointer data) +{ + g_source_destroy(curl.source); + + if (curl.dirty_source_id != 0) { + GSource *source = + g_main_context_find_source_by_id(io_thread_context(), + curl.dirty_source_id); + assert(source != NULL); + curl.dirty_source_id = 0; + + g_source_destroy(source); + } + + return NULL; +} + static void input_curl_finish(void) { + assert(curl.requests == NULL); + + io_thread_call(curl_destroy_sources, NULL); + + curl_multi_cleanup(curl.multi); + g_cond_free(curl.cond); + curl_slist_free_all(http_200_aliases); curl_global_cleanup(); } +#if LIBCURL_VERSION_NUM >= 0x071200 + /** * Determine the total sizes of all buffers, including portions that * have already been consumed. + * + * The caller must lock the mutex. */ G_GNUC_PURE static size_t @@ -168,6 +740,8 @@ curl_total_buffer_size(const struct input_curl *c) return total; } +#endif + static void buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) { @@ -186,28 +760,6 @@ input_curl_flush_buffers(struct input_curl *c) } /** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - */ -static void -input_curl_easy_free(struct input_curl *c) -{ - if (c->easy != NULL) { - curl_multi_remove_handle(c->multi, c->easy); - curl_easy_cleanup(c->easy); - c->easy = NULL; - } - - curl_slist_free_all(c->request_headers); - c->request_headers = NULL; - - g_free(c->range); - c->range = NULL; - - c->base.ready = true; -} - -/** * Frees this stream (but not the input_stream struct itself). */ static void @@ -217,12 +769,9 @@ input_curl_free(struct input_curl *c) tag_free(c->tag); g_free(c->meta_name); - input_curl_easy_free(c); + input_curl_easy_free_indirect(c); input_curl_flush_buffers(c); - if (c->multi != NULL) - curl_multi_cleanup(c->multi); - g_queue_free(c->buffers); g_free(c->url); @@ -241,117 +790,15 @@ input_curl_tag(struct input_stream *is) } static bool -input_curl_multi_info_read(struct input_curl *c, GError **error_r) -{ - CURLMsg *msg; - int msgs_in_queue; - - while ((msg = curl_multi_info_read(c->multi, - &msgs_in_queue)) != NULL) { - if (msg->msg == CURLMSG_DONE) { - CURLcode result = msg->data.result; - - input_curl_easy_free(c); - - if (result != CURLE_OK) { - g_set_error(error_r, curl_quark(), result, - "curl failed: %s", c->error); - return false; - } - } - } - - return true; -} - -/** - * Wait for the libcurl socket. - * - * @return -1 on error, 0 if no data is available yet, 1 if data is - * available - */ -static int -input_curl_select(struct input_curl *c, GError **error_r) -{ - fd_set rfds, wfds, efds; - int max_fd, ret; - CURLMcode mcode; - struct timeval timeout = { - .tv_sec = 1, - .tv_usec = 0, - }; - - assert(c->easy != NULL); - - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - mcode = curl_multi_fdset(c->multi, &rfds, &wfds, &efds, &max_fd); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_fdset() failed: %s", - curl_multi_strerror(mcode)); - return -1; - } - -#if LIBCURL_VERSION_NUM >= 0x070f04 - long timeout2; - mcode = curl_multi_timeout(c->multi, &timeout2); - if (mcode != CURLM_OK) { - g_warning("curl_multi_timeout() failed: %s\n", - curl_multi_strerror(mcode)); - return -1; - } - - if (timeout2 >= 0) { - if (timeout2 > 10000) - timeout2 = 10000; - - timeout.tv_sec = timeout2 / 1000; - timeout.tv_usec = (timeout2 % 1000) * 1000; - } -#endif - - ret = select(max_fd + 1, &rfds, &wfds, &efds, &timeout); - if (ret < 0) - g_set_error(error_r, g_quark_from_static_string("errno"), - errno, - "select() failed: %s\n", g_strerror(errno)); - - return ret; -} - -static bool fill_buffer(struct input_curl *c, GError **error_r) { - CURLMcode mcode = CURLM_CALL_MULTI_PERFORM; - - while (c->easy != NULL && g_queue_is_empty(c->buffers)) { - int running_handles; - bool bret; - - if (mcode != CURLM_CALL_MULTI_PERFORM) { - /* if we're still here, there is no input yet - - wait for input */ - int ret = input_curl_select(c, error_r); - if (ret <= 0) - /* no data yet or error */ - return false; - } - - mcode = curl_multi_perform(c->multi, &running_handles); - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - input_curl_easy_free(c); - return false; - } + while (c->easy != NULL && g_queue_is_empty(c->buffers)) + g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); - bret = input_curl_multi_info_read(c, error_r); - if (!bret) - return false; + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; } return !g_queue_is_empty(c->buffers); @@ -456,12 +903,16 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, size_t nbytes = 0; char *dest = ptr; + g_static_mutex_lock(&curl.mutex); + do { /* fill the buffer */ success = fill_buffer(c, error_r); - if (!success) + if (!success) { + g_static_mutex_unlock(&curl.mutex); return 0; + } /* send buffer contents */ @@ -479,6 +930,13 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, is->offset += (goffset)nbytes; +#if LIBCURL_VERSION_NUM >= 0x071200 + if (c->paused && curl_total_buffer_size(c) < CURL_MAX_BUFFERED) + io_thread_call(input_curl_resume, c); +#endif + + g_static_mutex_unlock(&curl.mutex); + return nbytes; } @@ -495,7 +953,11 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is) { struct input_curl *c = (struct input_curl *)is; - return c->easy == NULL && g_queue_is_empty(c->buffers); + g_static_mutex_lock(&curl.mutex); + bool eof = c->easy == NULL && g_queue_is_empty(c->buffers); + g_static_mutex_unlock(&curl.mutex); + + return eof; } static int @@ -503,40 +965,21 @@ input_curl_buffer(struct input_stream *is, GError **error_r) { struct input_curl *c = (struct input_curl *)is; - if (curl_total_buffer_size(c) >= CURL_MAX_BUFFERED) - return 0; + g_static_mutex_lock(&curl.mutex); - CURLMcode mcode; - int running_handles; - bool ret; + int result; + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + result = -1; + } else if (g_queue_is_empty(c->buffers)) + result = 0; + else + result = 1; - c->buffered = false; - - if (!is->ready && c->easy != NULL) - /* not ready yet means the caller is waiting in a busy - loop; relax that by calling select() on the - socket */ - if (input_curl_select(c, error_r) < 0) - return -1; - - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM && - g_queue_is_empty(c->buffers)); + g_static_mutex_unlock(&curl.mutex); - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - input_curl_easy_free(c); - return -1; - } - - ret = input_curl_multi_info_read(c, error_r); - if (!ret) - return -1; - - return c->buffered; + return result; } /** called by curl when new data is available */ @@ -634,15 +1077,24 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; +#if LIBCURL_VERSION_NUM >= 0x071200 + if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { + c->paused = true; + return CURL_WRITEFUNC_PAUSE; + } +#endif + buffer = g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); buffer->size = size; buffer->consumed = 0; memcpy(buffer->data, ptr, size); + g_queue_push_tail(c->buffers, buffer); - c->buffered = true; c->base.ready = true; + g_cond_broadcast(curl.cond); + return size; } @@ -650,7 +1102,6 @@ static bool input_curl_easy_init(struct input_curl *c, GError **error_r) { CURLcode code; - CURLMcode mcode; c->easy = curl_easy_init(); if (c->easy == NULL) { @@ -659,14 +1110,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) return false; } - mcode = curl_multi_add_handle(c->multi, c->easy); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_add_handle() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - curl_easy_setopt(c->easy, CURLOPT_USERAGENT, "Music Player Daemon " VERSION); curl_easy_setopt(c->easy, CURLOPT_HEADERFUNCTION, @@ -712,26 +1155,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) } static bool -input_curl_send_request(struct input_curl *c, GError **error_r) -{ - CURLMcode mcode; - int running_handles; - - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM); - - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - - return true; -} - -static bool input_curl_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) { @@ -796,7 +1219,7 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* close the old connection and open a new one */ - input_curl_easy_free(c); + input_curl_easy_free_indirect(c); input_curl_flush_buffers(c); is->offset = offset; @@ -818,18 +1241,34 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); } - ret = input_curl_send_request(c, error_r); - if (!ret) + g_static_mutex_lock(&curl.mutex); + + c->base.ready = false; + + if (!input_curl_easy_add(c, error_r)) { + g_static_mutex_unlock(&curl.mutex); return false; + } + + while (!c->base.ready) + g_cond_wait(curl.cond, g_static_mutex_get_mutex(&curl.mutex)); - return input_curl_multi_info_read(c, error_r); + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + g_static_mutex_unlock(&curl.mutex); + return false; + } + + g_static_mutex_unlock(&curl.mutex); + + return true; } static struct input_stream * input_curl_open(const char *url, GError **error_r) { struct input_curl *c; - bool ret; if (strncmp(url, "http://", 7) != 0) return NULL; @@ -840,35 +1279,27 @@ input_curl_open(const char *url, GError **error_r) c->url = g_strdup(url); c->buffers = g_queue_new(); - c->multi = curl_multi_init(); - if (c->multi == NULL) { - g_set_error(error_r, curl_quark(), 0, - "curl_multi_init() failed"); - input_curl_free(c); - return NULL; - } - icy_clear(&c->icy_metadata); c->tag = NULL; - ret = input_curl_easy_init(c, error_r); - if (!ret) { - input_curl_free(c); - return NULL; - } +#if LIBCURL_VERSION_NUM >= 0x071200 + c->paused = false; +#endif - ret = input_curl_send_request(c, error_r); - if (!ret) { + if (!input_curl_easy_init(c, error_r)) { input_curl_free(c); return NULL; } - ret = input_curl_multi_info_read(c, error_r); - if (!ret) { + g_static_mutex_lock(&curl.mutex); + if (!input_curl_easy_add(c, error_r)) { + g_static_mutex_unlock(&curl.mutex); input_curl_free(c); return NULL; } + g_static_mutex_unlock(&curl.mutex); + return &c->base; } |