From 33ba2886220a03883417c45b680606c3866dcfe7 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 10 Jan 2013 10:14:29 +0100 Subject: input/{curl,soup}: convert to C++ --- src/input/CurlInputPlugin.cxx | 1311 +++++++++++++++++++++++++++++++++++++++++ src/input/CurlInputPlugin.hxx | 27 + src/input/SoupInputPlugin.cxx | 482 +++++++++++++++ src/input/SoupInputPlugin.hxx | 25 + src/input/curl_input_plugin.c | 1301 ---------------------------------------- src/input/curl_input_plugin.h | 27 - src/input/soup_input_plugin.c | 473 --------------- src/input/soup_input_plugin.h | 25 - 8 files changed, 1845 insertions(+), 1826 deletions(-) create mode 100644 src/input/CurlInputPlugin.cxx create mode 100644 src/input/CurlInputPlugin.hxx create mode 100644 src/input/SoupInputPlugin.cxx create mode 100644 src/input/SoupInputPlugin.hxx delete mode 100644 src/input/curl_input_plugin.c delete mode 100644 src/input/curl_input_plugin.h delete mode 100644 src/input/soup_input_plugin.c delete mode 100644 src/input/soup_input_plugin.h (limited to 'src/input') diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx new file mode 100644 index 000000000..e2b3f11db --- /dev/null +++ b/src/input/CurlInputPlugin.cxx @@ -0,0 +1,1311 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "CurlInputPlugin.hxx" +#include "input_plugin.h" +#include "conf.h" +#include "tag.h" + +extern "C" { +#include "input_internal.h" +#include "icy_metadata.h" +#include "io_thread.h" +} + +#include "glib_compat.h" + +#include + +#if defined(WIN32) + #include +#else + #include +#endif + +#include +#include + +#include +#include + +#undef G_LOG_DOMAIN +#define G_LOG_DOMAIN "input_curl" + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t CURL_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t CURL_RESUME_AT = 384 * 1024; + +/** + * Buffers created by input_curl_writefunction(). + */ +struct buffer { + /** size of the payload */ + size_t size; + + /** how much has been consumed yet? */ + size_t consumed; + + /** the payload */ + unsigned char data[sizeof(long)]; +}; + +struct input_curl { + struct input_stream base; + + /* some buffers which were passed to libcurl, which we have + too free */ + char *url, *range; + struct curl_slist *request_headers; + + /** the curl handles */ + CURL *easy; + + /** the GMainLoop source used to poll all CURL file + descriptors */ + GSource *source; + + /** the source id of #source */ + guint source_id; + + /** a linked list of all registered GPollFD objects */ + GSList *fds; + + /** list of buffers, where input_curl_writefunction() appends + to, and input_curl_read() reads from them */ + GQueue *buffers; + +#if LIBCURL_VERSION_NUM >= 0x071200 + /** + * Is the connection currently paused? That happens when the + * buffer was getting too large. It will be unpaused when the + * buffer is below the threshold again. + */ + bool paused; +#endif + + /** error message provided by libcurl */ + char error[CURL_ERROR_SIZE]; + + /** parser for icy-metadata */ + struct icy_metadata icy_metadata; + + /** the stream name from the icy-name response header */ + char *meta_name; + + /** the tag object ready to be requested via + input_stream_tag() */ + struct tag *tag; + + GError *postponed_error; +}; + +/** libcurl should accept "ICY 200 OK" */ +static struct curl_slist *http_200_aliases; + +/** HTTP proxy settings */ +static const char *proxy, *proxy_user, *proxy_password; +static unsigned proxy_port; + +static struct { + CURLM *multi; + + /** + * A linked list of all active HTTP requests. An active + * request is one that doesn't have the "eof" flag set. + */ + GSList *requests; + + /** + * The GMainLoop source used to poll all CURL file + * descriptors. + */ + GSource *source; + + /** + * The source id of #source. + */ + guint source_id; + + GSList *fds; + +#if LIBCURL_VERSION_NUM >= 0x070f04 + /** + * Did CURL give us a timeout? If yes, then we need to call + * curl_multi_perform(), even if there was no event on any + * file descriptor. + */ + bool timeout; + + /** + * The absolute time stamp when the timeout expires. This is + * used in the GSource method check(). + */ + gint64 absolute_timeout; +#endif +} curl; + +static inline GQuark +curl_quark(void) +{ + return g_quark_from_static_string("curl"); +} + +/** + * Find a request by its CURL "easy" handle. + * + * Runs in the I/O thread. No lock needed. + */ +static struct input_curl * +input_curl_find_request(CURL *easy) +{ + assert(io_thread_inside()); + + for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { + struct input_curl *c = (struct input_curl *)i->data; + if (c->easy == easy) + return c; + } + + return NULL; +} + +#if LIBCURL_VERSION_NUM >= 0x071200 + +static gpointer +input_curl_resume(gpointer data) +{ + assert(io_thread_inside()); + + struct input_curl *c = (struct input_curl *)data; + + if (c->paused) { + c->paused = false; + curl_easy_pause(c->easy, CURLPAUSE_CONT); + } + + return NULL; +} + +#endif + +/** + * Calculates the GLib event bit mask for one file descriptor, + * obtained from three #fd_set objects filled by curl_multi_fdset(). + */ +static gushort +input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) +{ + gushort events = 0; + + if (FD_ISSET(fd, rfds)) { + events |= G_IO_IN | G_IO_HUP | G_IO_ERR; + FD_CLR(fd, rfds); + } + + if (FD_ISSET(fd, wfds)) { + events |= G_IO_OUT | G_IO_ERR; + FD_CLR(fd, wfds); + } + + if (FD_ISSET(fd, efds)) { + events |= G_IO_HUP | G_IO_ERR; + FD_CLR(fd, efds); + } + + return events; +} + +/** + * Updates all registered GPollFD objects, unregisters old ones, + * registers new ones. + * + * Runs in the I/O thread. No lock needed. + */ +static void +curl_update_fds(void) +{ + assert(io_thread_inside()); + + fd_set rfds, wfds, efds; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int max_fd; + CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, + &efds, &max_fd); + if (mcode != CURLM_OK) { + g_warning("curl_multi_fdset() failed: %s\n", + curl_multi_strerror(mcode)); + return; + } + + GSList *fds = curl.fds; + curl.fds = NULL; + + while (fds != NULL) { + GPollFD *poll_fd = (GPollFD *)fds->data; + gushort events = input_curl_fd_events(poll_fd->fd, &rfds, + &wfds, &efds); + + assert(poll_fd->events != 0); + + fds = g_slist_remove(fds, poll_fd); + + if (events != poll_fd->events) + g_source_remove_poll(curl.source, poll_fd); + + if (events != 0) { + if (events != poll_fd->events) { + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + } + + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } else { + g_free(poll_fd); + } + } + + for (int fd = 0; fd <= max_fd; ++fd) { + gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); + if (events != 0) { + GPollFD *poll_fd = g_new(GPollFD, 1); + poll_fd->fd = fd; + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } + } +} + +/** + * Runs in the I/O thread. No lock needed. + */ +static bool +input_curl_easy_add(struct input_curl *c, GError **error_r) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy != NULL); + assert(input_curl_find_request(c->easy) == NULL); + + curl.requests = g_slist_prepend(curl.requests, c); + + CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); + if (mcode != CURLM_OK) { + g_set_error(error_r, curl_quark(), mcode, + "curl_multi_add_handle() failed: %s", + curl_multi_strerror(mcode)); + return false; + } + + curl_update_fds(); + + return true; +} + +struct easy_add_params { + struct input_curl *c; + GError **error_r; +}; + +static gpointer +input_curl_easy_add_callback(gpointer data) +{ + const struct easy_add_params *params = + (const struct easy_add_params *)data; + + bool success = input_curl_easy_add(params->c, params->error_r); + return GUINT_TO_POINTER(success); +} + +/** + * Call input_curl_easy_add() in the I/O thread. May be called from + * any thread. Caller must not hold a mutex. + */ +static bool +input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) +{ + assert(c != NULL); + assert(c->easy != NULL); + + struct easy_add_params params = { + c, + error_r, + }; + + gpointer result = + io_thread_call(input_curl_easy_add_callback, ¶ms); + return GPOINTER_TO_UINT(result); +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * Runs in the I/O thread. + */ +static void +input_curl_easy_free(struct input_curl *c) +{ + assert(io_thread_inside()); + assert(c != NULL); + + if (c->easy == NULL) + return; + + curl.requests = g_slist_remove(curl.requests, c); + + curl_multi_remove_handle(curl.multi, c->easy); + curl_easy_cleanup(c->easy); + c->easy = NULL; + + curl_slist_free_all(c->request_headers); + c->request_headers = NULL; + + g_free(c->range); + c->range = NULL; +} + +static gpointer +input_curl_easy_free_callback(gpointer data) +{ + struct input_curl *c = (struct input_curl *)data; + + input_curl_easy_free(c); + curl_update_fds(); + + return NULL; +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The mutex must not be locked. + */ +static void +input_curl_easy_free_indirect(struct input_curl *c) +{ + io_thread_call(input_curl_easy_free_callback, c); + assert(c->easy == NULL); +} + +/** + * Abort and free all HTTP requests. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_abort_all_requests(GError *error) +{ + assert(io_thread_inside()); + assert(error != NULL); + + while (curl.requests != NULL) { + struct input_curl *c = + (struct input_curl *)curl.requests->data; + assert(c->postponed_error == NULL); + + input_curl_easy_free(c); + + g_mutex_lock(c->base.mutex); + c->postponed_error = g_error_copy(error); + c->base.ready = true; + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); + } + + g_error_free(error); + +} + +/** + * A HTTP request is finished. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_request_done(struct input_curl *c, CURLcode result, long status) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy == NULL); + assert(c->postponed_error == NULL); + + g_mutex_lock(c->base.mutex); + + if (result != CURLE_OK) { + c->postponed_error = g_error_new(curl_quark(), result, + "curl failed: %s", + c->error); + } else if (status < 200 || status >= 300) { + c->postponed_error = g_error_new(curl_quark(), 0, + "got HTTP status %ld", + status); + } + + c->base.ready = true; + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); +} + +static void +input_curl_handle_done(CURL *easy_handle, CURLcode result) +{ + struct input_curl *c = input_curl_find_request(easy_handle); + assert(c != NULL); + + long status = 0; + curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); + + input_curl_easy_free(c); + input_curl_request_done(c, result, status); +} + +/** + * Check for finished HTTP responses. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_info_read(void) +{ + assert(io_thread_inside()); + + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read(curl.multi, + &msgs_in_queue)) != NULL) { + if (msg->msg == CURLMSG_DONE) + input_curl_handle_done(msg->easy_handle, msg->data.result); + } +} + +/** + * Give control to CURL. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static bool +input_curl_perform(void) +{ + assert(io_thread_inside()); + + CURLMcode mcode; + + do { + int running_handles; + mcode = curl_multi_perform(curl.multi, &running_handles); + } while (mcode == CURLM_CALL_MULTI_PERFORM); + + if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { + GError *error = g_error_new(curl_quark(), mcode, + "curl_multi_perform() failed: %s", + curl_multi_strerror(mcode)); + input_curl_abort_all_requests(error); + return false; + } + + return true; +} + +/* + * GSource methods + * + */ + +/** + * The GSource prepare() method implementation. + */ +static gboolean +input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) +{ + curl_update_fds(); + +#if LIBCURL_VERSION_NUM >= 0x070f04 + curl.timeout = false; + + long timeout2; + CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); + if (mcode == CURLM_OK) { + if (timeout2 >= 0) + curl.absolute_timeout = g_source_get_time(source) + + timeout2 * 1000; + + if (timeout2 >= 0 && timeout2 < 10) + /* CURL 7.21.1 likes to report "timeout=0", + which means we're running in a busy loop. + Quite a bad idea to waste so much CPU. + Let's use a lower limit of 10ms. */ + timeout2 = 10; + + *timeout_r = timeout2; + + curl.timeout = timeout2 >= 0; + } else + g_warning("curl_multi_timeout() failed: %s\n", + curl_multi_strerror(mcode)); +#else + (void)timeout_r; +#endif + + return false; +} + +/** + * The GSource check() method implementation. + */ +static gboolean +input_curl_source_check(G_GNUC_UNUSED GSource *source) +{ +#if LIBCURL_VERSION_NUM >= 0x070f04 + if (curl.timeout) { + /* when a timeout has expired, we need to call + curl_multi_perform(), even if there was no file + descriptor event */ + + if (g_source_get_time(source) >= curl.absolute_timeout) + return true; + } +#endif + + for (GSList *i = curl.fds; i != NULL; i = i->next) { + GPollFD *poll_fd = (GPollFD *)i->data; + if (poll_fd->revents != 0) + return true; + } + + return false; +} + +/** + * The GSource dispatch() method implementation. The callback isn't + * used, because we're handling all events directly. + */ +static gboolean +input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, + G_GNUC_UNUSED GSourceFunc callback, + G_GNUC_UNUSED gpointer user_data) +{ + if (input_curl_perform()) + input_curl_info_read(); + + return true; +} + +/** + * The vtable for our GSource implementation. Unfortunately, we + * cannot declare it "const", because g_source_new() takes a non-const + * pointer, for whatever reason. + */ +static GSourceFuncs curl_source_funcs = { + input_curl_source_prepare, + input_curl_source_check, + input_curl_source_dispatch, + nullptr, + nullptr, + nullptr, +}; + +/* + * input_plugin methods + * + */ + +static bool +input_curl_init(const struct config_param *param, + G_GNUC_UNUSED GError **error_r) +{ + CURLcode code = curl_global_init(CURL_GLOBAL_ALL); + if (code != CURLE_OK) { + g_set_error(error_r, curl_quark(), code, + "curl_global_init() failed: %s\n", + curl_easy_strerror(code)); + return false; + } + + http_200_aliases = curl_slist_append(http_200_aliases, "ICY 200 OK"); + + proxy = config_get_block_string(param, "proxy", NULL); + proxy_port = config_get_block_unsigned(param, "proxy_port", 0); + proxy_user = config_get_block_string(param, "proxy_user", NULL); + proxy_password = config_get_block_string(param, "proxy_password", + NULL); + + if (proxy == NULL) { + /* deprecated proxy configuration */ + proxy = config_get_string(CONF_HTTP_PROXY_HOST, NULL); + proxy_port = config_get_positive(CONF_HTTP_PROXY_PORT, 0); + proxy_user = config_get_string(CONF_HTTP_PROXY_USER, NULL); + proxy_password = config_get_string(CONF_HTTP_PROXY_PASSWORD, + ""); + } + + curl.multi = curl_multi_init(); + if (curl.multi == NULL) { + g_set_error(error_r, curl_quark(), 0, + "curl_multi_init() failed"); + return false; + } + + curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); + curl.source_id = g_source_attach(curl.source, io_thread_context()); + + return true; +} + +static gpointer +curl_destroy_sources(G_GNUC_UNUSED gpointer data) +{ + g_source_destroy(curl.source); + + return NULL; +} + +static void +input_curl_finish(void) +{ + assert(curl.requests == NULL); + + io_thread_call(curl_destroy_sources, NULL); + + curl_multi_cleanup(curl.multi); + + curl_slist_free_all(http_200_aliases); + + curl_global_cleanup(); +} + +#if LIBCURL_VERSION_NUM >= 0x071200 + +/** + * Determine the total sizes of all buffers, including portions that + * have already been consumed. + * + * The caller must lock the mutex. + */ +G_GNUC_PURE +static size_t +curl_total_buffer_size(const struct input_curl *c) +{ + size_t total = 0; + + for (GList *i = g_queue_peek_head_link(c->buffers); + i != NULL; i = g_list_next(i)) { + struct buffer *buffer = (struct buffer *)i->data; + total += buffer->size; + } + + return total; +} + +#endif + +static void +buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) +{ + struct buffer *buffer = (struct buffer *)data; + + assert(buffer->consumed <= buffer->size); + + g_free(buffer); +} + +static void +input_curl_flush_buffers(struct input_curl *c) +{ + g_queue_foreach(c->buffers, buffer_free_callback, NULL); + g_queue_clear(c->buffers); +} + +/** + * Frees this stream, including the input_stream struct. + */ +static void +input_curl_free(struct input_curl *c) +{ + if (c->tag != NULL) + tag_free(c->tag); + g_free(c->meta_name); + + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); + + g_queue_free(c->buffers); + + if (c->postponed_error != NULL) + g_error_free(c->postponed_error); + + g_free(c->url); + input_stream_deinit(&c->base); + g_free(c); +} + +static bool +input_curl_check(struct input_stream *is, GError **error_r) +{ + struct input_curl *c = (struct input_curl *)is; + + bool success = c->postponed_error == NULL; + if (!success) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + } + + return success; +} + +static struct tag * +input_curl_tag(struct input_stream *is) +{ + struct input_curl *c = (struct input_curl *)is; + struct tag *tag = c->tag; + + c->tag = NULL; + return tag; +} + +static bool +fill_buffer(struct input_curl *c, GError **error_r) +{ + while (c->easy != NULL && g_queue_is_empty(c->buffers)) + g_cond_wait(c->base.cond, c->base.mutex); + + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; + } + + return !g_queue_is_empty(c->buffers); +} + +/** + * Mark a part of the buffer object as consumed. + */ +static struct buffer * +consume_buffer(struct buffer *buffer, size_t length) +{ + assert(buffer != NULL); + assert(buffer->consumed < buffer->size); + + buffer->consumed += length; + if (buffer->consumed < buffer->size) + return buffer; + + assert(buffer->consumed == buffer->size); + + g_free(buffer); + + return NULL; +} + +static size_t +read_from_buffer(struct icy_metadata *icy_metadata, GQueue *buffers, + void *dest0, size_t length) +{ + struct buffer *buffer = (struct buffer *)g_queue_pop_head(buffers); + uint8_t *dest = (uint8_t *)dest0; + size_t nbytes = 0; + + assert(buffer->size > 0); + assert(buffer->consumed < buffer->size); + + if (length > buffer->size - buffer->consumed) + length = buffer->size - buffer->consumed; + + while (true) { + size_t chunk; + + chunk = icy_data(icy_metadata, length); + if (chunk > 0) { + memcpy(dest, buffer->data + buffer->consumed, + chunk); + buffer = consume_buffer(buffer, chunk); + + nbytes += chunk; + dest += chunk; + length -= chunk; + + if (length == 0) + break; + + assert(buffer != NULL); + } + + chunk = icy_meta(icy_metadata, buffer->data + buffer->consumed, + length); + if (chunk > 0) { + buffer = consume_buffer(buffer, chunk); + + length -= chunk; + + if (length == 0) + break; + + assert(buffer != NULL); + } + } + + if (buffer != NULL) + g_queue_push_head(buffers, buffer); + + return nbytes; +} + +static void +copy_icy_tag(struct input_curl *c) +{ + struct tag *tag = icy_tag(&c->icy_metadata); + + if (tag == NULL) + return; + + if (c->tag != NULL) + tag_free(c->tag); + + if (c->meta_name != NULL && !tag_has_type(tag, TAG_NAME)) + tag_add_item(tag, TAG_NAME, c->meta_name); + + c->tag = tag; +} + +static bool +input_curl_available(struct input_stream *is) +{ + struct input_curl *c = (struct input_curl *)is; + + return c->postponed_error != NULL || c->easy == NULL || + !g_queue_is_empty(c->buffers); +} + +static size_t +input_curl_read(struct input_stream *is, void *ptr, size_t size, + GError **error_r) +{ + struct input_curl *c = (struct input_curl *)is; + bool success; + size_t nbytes = 0; + char *dest = (char *)ptr; + + do { + /* fill the buffer */ + + success = fill_buffer(c, error_r); + if (!success) + return 0; + + /* send buffer contents */ + + while (size > 0 && !g_queue_is_empty(c->buffers)) { + size_t copy = read_from_buffer(&c->icy_metadata, c->buffers, + dest + nbytes, size); + + nbytes += copy; + size -= copy; + } + } while (nbytes == 0); + + if (icy_defined(&c->icy_metadata)) + copy_icy_tag(c); + + is->offset += (goffset)nbytes; + +#if LIBCURL_VERSION_NUM >= 0x071200 + if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { + g_mutex_unlock(c->base.mutex); + io_thread_call(input_curl_resume, c); + g_mutex_lock(c->base.mutex); + } +#endif + + return nbytes; +} + +static void +input_curl_close(struct input_stream *is) +{ + struct input_curl *c = (struct input_curl *)is; + + input_curl_free(c); +} + +static bool +input_curl_eof(G_GNUC_UNUSED struct input_stream *is) +{ + struct input_curl *c = (struct input_curl *)is; + + return c->easy == NULL && g_queue_is_empty(c->buffers); +} + +/** called by curl when new data is available */ +static size_t +input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) +{ + struct input_curl *c = (struct input_curl *)stream; + char name[64]; + + size *= nmemb; + + const char *header = (const char *)ptr; + const char *end = header + size; + + const char *value = (const char *)memchr(header, ':', size); + if (value == NULL || (size_t)(value - header) >= sizeof(name)) + return size; + + memcpy(name, header, value - header); + name[value - header] = 0; + + /* skip the colon */ + + ++value; + + /* strip the value */ + + while (value < end && g_ascii_isspace(*value)) + ++value; + + while (end > value && g_ascii_isspace(end[-1])) + --end; + + if (g_ascii_strcasecmp(name, "accept-ranges") == 0) { + /* a stream with icy-metadata is not seekable */ + if (!icy_defined(&c->icy_metadata)) + c->base.seekable = true; + } else if (g_ascii_strcasecmp(name, "content-length") == 0) { + char buffer[64]; + + if ((size_t)(end - header) >= sizeof(buffer)) + return size; + + memcpy(buffer, value, end - value); + buffer[end - value] = 0; + + c->base.size = c->base.offset + g_ascii_strtoull(buffer, NULL, 10); + } else if (g_ascii_strcasecmp(name, "content-type") == 0) { + g_free(c->base.mime); + c->base.mime = g_strndup(value, end - value); + } else if (g_ascii_strcasecmp(name, "icy-name") == 0 || + g_ascii_strcasecmp(name, "ice-name") == 0 || + g_ascii_strcasecmp(name, "x-audiocast-name") == 0) { + g_free(c->meta_name); + c->meta_name = g_strndup(value, end - value); + + if (c->tag != NULL) + tag_free(c->tag); + + c->tag = tag_new(); + tag_add_item(c->tag, TAG_NAME, c->meta_name); + } else if (g_ascii_strcasecmp(name, "icy-metaint") == 0) { + char buffer[64]; + size_t icy_metaint; + + if ((size_t)(end - header) >= sizeof(buffer) || + icy_defined(&c->icy_metadata)) + return size; + + memcpy(buffer, value, end - value); + buffer[end - value] = 0; + + icy_metaint = g_ascii_strtoull(buffer, NULL, 10); + g_debug("icy-metaint=%zu", icy_metaint); + + if (icy_metaint > 0) { + icy_start(&c->icy_metadata, icy_metaint); + + /* a stream with icy-metadata is not + seekable */ + c->base.seekable = false; + } + } + + return size; +} + +/** called by curl when new data is available */ +static size_t +input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) +{ + struct input_curl *c = (struct input_curl *)stream; + + size *= nmemb; + if (size == 0) + return 0; + + g_mutex_lock(c->base.mutex); + +#if LIBCURL_VERSION_NUM >= 0x071200 + if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { + c->paused = true; + g_mutex_unlock(c->base.mutex); + return CURL_WRITEFUNC_PAUSE; + } +#endif + + struct buffer *buffer = (struct buffer *) + g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); + buffer->size = size; + buffer->consumed = 0; + memcpy(buffer->data, ptr, size); + + g_queue_push_tail(c->buffers, buffer); + c->base.ready = true; + + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); + + return size; +} + +static bool +input_curl_easy_init(struct input_curl *c, GError **error_r) +{ + CURLcode code; + + c->easy = curl_easy_init(); + if (c->easy == NULL) { + g_set_error(error_r, curl_quark(), 0, + "curl_easy_init() failed"); + return false; + } + + curl_easy_setopt(c->easy, CURLOPT_USERAGENT, + "Music Player Daemon " VERSION); + curl_easy_setopt(c->easy, CURLOPT_HEADERFUNCTION, + input_curl_headerfunction); + curl_easy_setopt(c->easy, CURLOPT_WRITEHEADER, c); + curl_easy_setopt(c->easy, CURLOPT_WRITEFUNCTION, + input_curl_writefunction); + curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, c); + curl_easy_setopt(c->easy, CURLOPT_HTTP200ALIASES, http_200_aliases); + curl_easy_setopt(c->easy, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(c->easy, CURLOPT_NETRC, 1); + curl_easy_setopt(c->easy, CURLOPT_MAXREDIRS, 5); + curl_easy_setopt(c->easy, CURLOPT_FAILONERROR, true); + curl_easy_setopt(c->easy, CURLOPT_ERRORBUFFER, c->error); + curl_easy_setopt(c->easy, CURLOPT_NOPROGRESS, 1l); + curl_easy_setopt(c->easy, CURLOPT_NOSIGNAL, 1l); + curl_easy_setopt(c->easy, CURLOPT_CONNECTTIMEOUT, 10l); + + if (proxy != NULL) + curl_easy_setopt(c->easy, CURLOPT_PROXY, proxy); + + if (proxy_port > 0) + curl_easy_setopt(c->easy, CURLOPT_PROXYPORT, (long)proxy_port); + + if (proxy_user != NULL && proxy_password != NULL) { + char *proxy_auth_str = + g_strconcat(proxy_user, ":", proxy_password, NULL); + curl_easy_setopt(c->easy, CURLOPT_PROXYUSERPWD, proxy_auth_str); + g_free(proxy_auth_str); + } + + code = curl_easy_setopt(c->easy, CURLOPT_URL, c->url); + if (code != CURLE_OK) { + g_set_error(error_r, curl_quark(), code, + "curl_easy_setopt() failed: %s", + curl_easy_strerror(code)); + return false; + } + + c->request_headers = NULL; + c->request_headers = curl_slist_append(c->request_headers, + "Icy-Metadata: 1"); + curl_easy_setopt(c->easy, CURLOPT_HTTPHEADER, c->request_headers); + + return true; +} + +static bool +input_curl_seek(struct input_stream *is, goffset offset, int whence, + GError **error_r) +{ + struct input_curl *c = (struct input_curl *)is; + bool ret; + + assert(is->ready); + + if (whence == SEEK_SET && offset == is->offset) + /* no-op */ + return true; + + if (!is->seekable) + return false; + + /* calculate the absolute offset */ + + switch (whence) { + case SEEK_SET: + break; + + case SEEK_CUR: + offset += is->offset; + break; + + case SEEK_END: + if (is->size < 0) + /* stream size is not known */ + return false; + + offset += is->size; + break; + + default: + return false; + } + + if (offset < 0) + return false; + + /* check if we can fast-forward the buffer */ + + while (offset > is->offset && !g_queue_is_empty(c->buffers)) { + struct buffer *buffer; + size_t length; + + buffer = (struct buffer *)g_queue_pop_head(c->buffers); + + length = buffer->size - buffer->consumed; + if (offset - is->offset < (goffset)length) + length = offset - is->offset; + + buffer = consume_buffer(buffer, length); + if (buffer != NULL) + g_queue_push_head(c->buffers, buffer); + + is->offset += length; + } + + if (offset == is->offset) + return true; + + /* close the old connection and open a new one */ + + g_mutex_unlock(c->base.mutex); + + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); + + is->offset = offset; + if (is->offset == is->size) { + /* seek to EOF: simulate empty result; avoid + triggering a "416 Requested Range Not Satisfiable" + response */ + return true; + } + + ret = input_curl_easy_init(c, error_r); + if (!ret) + return false; + + /* send the "Range" header */ + + if (is->offset > 0) { + c->range = g_strdup_printf("%lld-", (long long)is->offset); + curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); + } + + c->base.ready = false; + + if (!input_curl_easy_add_indirect(c, error_r)) + return false; + + g_mutex_lock(c->base.mutex); + + while (!c->base.ready) + g_cond_wait(c->base.cond, c->base.mutex); + + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; + } + + return true; +} + +static struct input_stream * +input_curl_open(const char *url, GMutex *mutex, GCond *cond, + GError **error_r) +{ + assert(mutex != NULL); + assert(cond != NULL); + + struct input_curl *c; + + if (strncmp(url, "http://", 7) != 0) + return NULL; + + c = g_new0(struct input_curl, 1); + input_stream_init(&c->base, &input_plugin_curl, url, + mutex, cond); + + c->url = g_strdup(url); + c->buffers = g_queue_new(); + + icy_clear(&c->icy_metadata); + c->tag = NULL; + + c->postponed_error = NULL; + +#if LIBCURL_VERSION_NUM >= 0x071200 + c->paused = false; +#endif + + if (!input_curl_easy_init(c, error_r)) { + input_curl_free(c); + return NULL; + } + + if (!input_curl_easy_add_indirect(c, error_r)) { + input_curl_free(c); + return NULL; + } + + return &c->base; +} + +const struct input_plugin input_plugin_curl = { + "curl", + input_curl_init, + input_curl_finish, + input_curl_open, + input_curl_close, + input_curl_check, + nullptr, + input_curl_tag, + input_curl_available, + input_curl_read, + input_curl_eof, + input_curl_seek, +}; diff --git a/src/input/CurlInputPlugin.hxx b/src/input/CurlInputPlugin.hxx new file mode 100644 index 000000000..20d1309d8 --- /dev/null +++ b/src/input/CurlInputPlugin.hxx @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_CURL_HXX +#define MPD_INPUT_CURL_HXX + +struct input_stream; + +extern const struct input_plugin input_plugin_curl; + +#endif diff --git a/src/input/SoupInputPlugin.cxx b/src/input/SoupInputPlugin.cxx new file mode 100644 index 000000000..f3422d242 --- /dev/null +++ b/src/input/SoupInputPlugin.cxx @@ -0,0 +1,482 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "SoupInputPlugin.hxx" +#include "input_plugin.h" + +extern "C" { +#include "input_internal.h" +#include "io_thread.h" +} + +#include "conf.h" + +extern "C" { +#include +#include +} + +#include +#include + +#undef G_LOG_DOMAIN +#define G_LOG_DOMAIN "input_soup" + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t SOUP_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t SOUP_RESUME_AT = 384 * 1024; + +static SoupURI *soup_proxy; +static SoupSession *soup_session; + +struct input_soup { + struct input_stream base; + + SoupMessage *msg; + + GQueue *buffers; + + size_t current_consumed; + + size_t total_buffered; + + bool alive, pause, eof; + + /** + * Set when the session callback has been invoked, when it is + * safe to free this object. + */ + bool completed; + + GError *postponed_error; +}; + +static inline GQuark +soup_quark(void) +{ + return g_quark_from_static_string("soup"); +} + +static bool +input_soup_init(const struct config_param *param, GError **error_r) +{ + assert(soup_proxy == NULL); + assert(soup_session == NULL); + + g_type_init(); + + const char *proxy = config_get_block_string(param, "proxy", NULL); + + if (proxy != NULL) { + soup_proxy = soup_uri_new(proxy); + if (soup_proxy == NULL) { + g_set_error(error_r, soup_quark(), 0, + "failed to parse proxy setting"); + return false; + } + } + + soup_session = + soup_session_async_new_with_options(SOUP_SESSION_PROXY_URI, + soup_proxy, + SOUP_SESSION_ASYNC_CONTEXT, + io_thread_context(), + NULL); + + return true; +} + +static void +input_soup_finish(void) +{ + assert(soup_session != NULL); + + soup_session_abort(soup_session); + g_object_unref(G_OBJECT(soup_session)); + + if (soup_proxy != NULL) + soup_uri_free(soup_proxy); +} + +/** + * Copy the error from the SoupMessage object to + * input_soup::postponed_error. + * + * @return true if there was no error + */ +static bool +input_soup_copy_error(struct input_soup *s, const SoupMessage *msg) +{ + if (SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) + return true; + + if (msg->status_code == SOUP_STATUS_CANCELLED) + /* failure, but don't generate a GError, because this + status was caused by _close() */ + return false; + + if (s->postponed_error != NULL) + /* there's already a GError, don't overwrite it */ + return false; + + if (SOUP_STATUS_IS_TRANSPORT_ERROR(msg->status_code)) + s->postponed_error = + g_error_new(soup_quark(), msg->status_code, + "HTTP client error: %s", + msg->reason_phrase); + else + s->postponed_error = + g_error_new(soup_quark(), msg->status_code, + "got HTTP status: %d %s", + msg->status_code, msg->reason_phrase); + + return false; +} + +static void +input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, + SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = (struct input_soup *)user_data; + + assert(msg == s->msg); + assert(!s->completed); + + g_mutex_lock(s->base.mutex); + + if (!s->base.ready) + input_soup_copy_error(s, msg); + + s->base.ready = true; + s->alive = false; + s->completed = true; + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static void +input_soup_got_headers(SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = (struct input_soup *)user_data; + + g_mutex_lock(s->base.mutex); + + if (!input_soup_copy_error(s, msg)) { + g_mutex_unlock(s->base.mutex); + + soup_session_cancel_message(soup_session, msg, + SOUP_STATUS_CANCELLED); + return; + } + + s->base.ready = true; + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); + + soup_message_body_set_accumulate(msg->response_body, false); +} + +static void +input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) +{ + struct input_soup *s = (struct input_soup *)user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->base.mutex); + + g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); + s->total_buffered += chunk->length; + + if (s->total_buffered >= SOUP_MAX_BUFFERED && !s->pause) { + s->pause = true; + soup_session_pause_message(soup_session, msg); + } + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static void +input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = (struct input_soup *)user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->base.mutex); + + s->base.ready = true; + s->eof = true; + s->alive = false; + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static bool +input_soup_wait_data(struct input_soup *s) +{ + while (true) { + if (s->eof) + return true; + + if (!s->alive) + return false; + + if (!g_queue_is_empty(s->buffers)) + return true; + + assert(s->current_consumed == 0); + + g_cond_wait(s->base.cond, s->base.mutex); + } +} + +static gpointer +input_soup_queue(gpointer data) +{ + struct input_soup *s = (struct input_soup *)data; + + soup_session_queue_message(soup_session, s->msg, + input_soup_session_callback, s); + + return NULL; +} + +static struct input_stream * +input_soup_open(const char *uri, + GMutex *mutex, GCond *cond, + G_GNUC_UNUSED GError **error_r) +{ + if (strncmp(uri, "http://", 7) != 0) + return NULL; + + struct input_soup *s = g_new(struct input_soup, 1); + input_stream_init(&s->base, &input_plugin_soup, uri, + mutex, cond); + + s->buffers = g_queue_new(); + s->current_consumed = 0; + s->total_buffered = 0; + +#if GCC_CHECK_VERSION(4,6) +#pragma GCC diagnostic push + /* the libsoup macro SOUP_METHOD_GET discards the "const" + attribute of the g_intern_static_string() return value; + don't make the gcc warning fatal: */ +#pragma GCC diagnostic ignored "-Wcast-qual" +#endif + + s->msg = soup_message_new(SOUP_METHOD_GET, uri); + +#if GCC_CHECK_VERSION(4,6) +#pragma GCC diagnostic pop +#endif + + soup_message_set_flags(s->msg, SOUP_MESSAGE_NO_REDIRECT); + + soup_message_headers_append(s->msg->request_headers, "User-Agent", + "Music Player Daemon " VERSION); + + g_signal_connect(s->msg, "got-headers", + G_CALLBACK(input_soup_got_headers), s); + g_signal_connect(s->msg, "got-chunk", + G_CALLBACK(input_soup_got_chunk), s); + g_signal_connect(s->msg, "got-body", + G_CALLBACK(input_soup_got_body), s); + + s->alive = true; + s->pause = false; + s->eof = false; + s->completed = false; + s->postponed_error = NULL; + + io_thread_call(input_soup_queue, s); + + return &s->base; +} + +static gpointer +input_soup_cancel(gpointer data) +{ + struct input_soup *s = (struct input_soup *)data; + + if (!s->completed) + soup_session_cancel_message(soup_session, s->msg, + SOUP_STATUS_CANCELLED); + + return NULL; +} + +static void +input_soup_close(struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + g_mutex_lock(s->base.mutex); + + if (!s->completed) { + /* the messages's session callback hasn't been invoked + yet; cancel it and wait for completion */ + + g_mutex_unlock(s->base.mutex); + + io_thread_call(input_soup_cancel, s); + + g_mutex_lock(s->base.mutex); + while (!s->completed) + g_cond_wait(s->base.cond, s->base.mutex); + } + + g_mutex_unlock(s->base.mutex); + + SoupBuffer *buffer; + while ((buffer = (SoupBuffer *)g_queue_pop_head(s->buffers)) != NULL) + soup_buffer_free(buffer); + g_queue_free(s->buffers); + + if (s->postponed_error != NULL) + g_error_free(s->postponed_error); + + input_stream_deinit(&s->base); + g_free(s); +} + +static bool +input_soup_check(struct input_stream *is, GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + bool success = s->postponed_error == NULL; + if (!success) { + g_propagate_error(error_r, s->postponed_error); + s->postponed_error = NULL; + } + + return success; +} + +static bool +input_soup_available(struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + return s->eof || !s->alive || !g_queue_is_empty(s->buffers); +} + +static size_t +input_soup_read(struct input_stream *is, void *ptr, size_t size, + G_GNUC_UNUSED GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + if (!input_soup_wait_data(s)) { + assert(!s->alive); + + if (s->postponed_error != NULL) { + g_propagate_error(error_r, s->postponed_error); + s->postponed_error = NULL; + } else + g_set_error_literal(error_r, soup_quark(), 0, + "HTTP failure"); + return 0; + } + + char *p0 = (char *)ptr, *p = p0, *p_end = p0 + size; + + while (p < p_end) { + SoupBuffer *buffer = (SoupBuffer *) + g_queue_pop_head(s->buffers); + if (buffer == NULL) { + assert(s->current_consumed == 0); + break; + } + + assert(s->current_consumed < buffer->length); + assert(s->total_buffered >= buffer->length); + + const char *q = buffer->data; + q += s->current_consumed; + + size_t remaining = buffer->length - s->current_consumed; + size_t nbytes = p_end - p; + if (nbytes > remaining) + nbytes = remaining; + + memcpy(p, q, nbytes); + p += nbytes; + + s->current_consumed += remaining; + if (s->current_consumed >= buffer->length) { + /* done with this buffer */ + s->total_buffered -= buffer->length; + soup_buffer_free(buffer); + s->current_consumed = 0; + } else { + /* partial read */ + assert(p == p_end); + + g_queue_push_head(s->buffers, buffer); + } + } + + if (s->pause && s->total_buffered < SOUP_RESUME_AT) { + s->pause = false; + soup_session_unpause_message(soup_session, s->msg); + } + + size_t nbytes = p - p0; + s->base.offset += nbytes; + + return nbytes; +} + +static bool +input_soup_eof(G_GNUC_UNUSED struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + return !s->alive && g_queue_is_empty(s->buffers); +} + +const struct input_plugin input_plugin_soup = { + "soup", + input_soup_init, + input_soup_finish, + input_soup_open, + input_soup_close, + input_soup_check, + nullptr, + nullptr, + input_soup_available, + input_soup_read, + input_soup_eof, + nullptr, +}; diff --git a/src/input/SoupInputPlugin.hxx b/src/input/SoupInputPlugin.hxx new file mode 100644 index 000000000..4c089b39b --- /dev/null +++ b/src/input/SoupInputPlugin.hxx @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_SOUP_HXX +#define MPD_INPUT_SOUP_HXX + +extern const struct input_plugin input_plugin_soup; + +#endif diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c deleted file mode 100644 index 3f191141e..000000000 --- a/src/input/curl_input_plugin.c +++ /dev/null @@ -1,1301 +0,0 @@ -/* - * Copyright (C) 2003-2011 The Music Player Daemon Project - * http://www.musicpd.org - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "config.h" -#include "input/curl_input_plugin.h" -#include "input_internal.h" -#include "input_plugin.h" -#include "conf.h" -#include "tag.h" -#include "icy_metadata.h" -#include "io_thread.h" -#include "glib_compat.h" - -#include - -#if defined(WIN32) - #include -#else - #include -#endif - -#include -#include - -#include -#include - -#undef G_LOG_DOMAIN -#define G_LOG_DOMAIN "input_curl" - -/** - * Do not buffer more than this number of bytes. It should be a - * reasonable limit that doesn't make low-end machines suffer too - * much, but doesn't cause stuttering on high-latency lines. - */ -static const size_t CURL_MAX_BUFFERED = 512 * 1024; - -/** - * Resume the stream at this number of bytes after it has been paused. - */ -static const size_t CURL_RESUME_AT = 384 * 1024; - -/** - * Buffers created by input_curl_writefunction(). - */ -struct buffer { - /** size of the payload */ - size_t size; - - /** how much has been consumed yet? */ - size_t consumed; - - /** the payload */ - unsigned char data[sizeof(long)]; -}; - -struct input_curl { - struct input_stream base; - - /* some buffers which were passed to libcurl, which we have - too free */ - char *url, *range; - struct curl_slist *request_headers; - - /** the curl handles */ - CURL *easy; - - /** the GMainLoop source used to poll all CURL file - descriptors */ - GSource *source; - - /** the source id of #source */ - guint source_id; - - /** a linked list of all registered GPollFD objects */ - GSList *fds; - - /** list of buffers, where input_curl_writefunction() appends - to, and input_curl_read() reads from them */ - GQueue *buffers; - -#if LIBCURL_VERSION_NUM >= 0x071200 - /** - * Is the connection currently paused? That happens when the - * buffer was getting too large. It will be unpaused when the - * buffer is below the threshold again. - */ - bool paused; -#endif - - /** error message provided by libcurl */ - char error[CURL_ERROR_SIZE]; - - /** parser for icy-metadata */ - struct icy_metadata icy_metadata; - - /** the stream name from the icy-name response header */ - char *meta_name; - - /** the tag object ready to be requested via - input_stream_tag() */ - struct tag *tag; - - GError *postponed_error; -}; - -/** libcurl should accept "ICY 200 OK" */ -static struct curl_slist *http_200_aliases; - -/** HTTP proxy settings */ -static const char *proxy, *proxy_user, *proxy_password; -static unsigned proxy_port; - -static struct { - CURLM *multi; - - /** - * A linked list of all active HTTP requests. An active - * request is one that doesn't have the "eof" flag set. - */ - GSList *requests; - - /** - * The GMainLoop source used to poll all CURL file - * descriptors. - */ - GSource *source; - - /** - * The source id of #source. - */ - guint source_id; - - GSList *fds; - -#if LIBCURL_VERSION_NUM >= 0x070f04 - /** - * Did CURL give us a timeout? If yes, then we need to call - * curl_multi_perform(), even if there was no event on any - * file descriptor. - */ - bool timeout; - - /** - * The absolute time stamp when the timeout expires. This is - * used in the GSource method check(). - */ - gint64 absolute_timeout; -#endif -} curl; - -static inline GQuark -curl_quark(void) -{ - return g_quark_from_static_string("curl"); -} - -/** - * Find a request by its CURL "easy" handle. - * - * Runs in the I/O thread. No lock needed. - */ -static struct input_curl * -input_curl_find_request(CURL *easy) -{ - assert(io_thread_inside()); - - for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { - struct input_curl *c = i->data; - if (c->easy == easy) - return c; - } - - return NULL; -} - -#if LIBCURL_VERSION_NUM >= 0x071200 - -static gpointer -input_curl_resume(gpointer data) -{ - assert(io_thread_inside()); - - struct input_curl *c = data; - - if (c->paused) { - c->paused = false; - curl_easy_pause(c->easy, CURLPAUSE_CONT); - } - - return NULL; -} - -#endif - -/** - * Calculates the GLib event bit mask for one file descriptor, - * obtained from three #fd_set objects filled by curl_multi_fdset(). - */ -static gushort -input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) -{ - gushort events = 0; - - if (FD_ISSET(fd, rfds)) { - events |= G_IO_IN | G_IO_HUP | G_IO_ERR; - FD_CLR(fd, rfds); - } - - if (FD_ISSET(fd, wfds)) { - events |= G_IO_OUT | G_IO_ERR; - FD_CLR(fd, wfds); - } - - if (FD_ISSET(fd, efds)) { - events |= G_IO_HUP | G_IO_ERR; - FD_CLR(fd, efds); - } - - return events; -} - -/** - * Updates all registered GPollFD objects, unregisters old ones, - * registers new ones. - * - * Runs in the I/O thread. No lock needed. - */ -static void -curl_update_fds(void) -{ - assert(io_thread_inside()); - - fd_set rfds, wfds, efds; - - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - int max_fd; - CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, - &efds, &max_fd); - if (mcode != CURLM_OK) { - g_warning("curl_multi_fdset() failed: %s\n", - curl_multi_strerror(mcode)); - return; - } - - GSList *fds = curl.fds; - curl.fds = NULL; - - while (fds != NULL) { - GPollFD *poll_fd = fds->data; - gushort events = input_curl_fd_events(poll_fd->fd, &rfds, - &wfds, &efds); - - assert(poll_fd->events != 0); - - fds = g_slist_remove(fds, poll_fd); - - if (events != poll_fd->events) - g_source_remove_poll(curl.source, poll_fd); - - if (events != 0) { - if (events != poll_fd->events) { - poll_fd->events = events; - g_source_add_poll(curl.source, poll_fd); - } - - curl.fds = g_slist_prepend(curl.fds, poll_fd); - } else { - g_free(poll_fd); - } - } - - for (int fd = 0; fd <= max_fd; ++fd) { - gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); - if (events != 0) { - GPollFD *poll_fd = g_new(GPollFD, 1); - poll_fd->fd = fd; - poll_fd->events = events; - g_source_add_poll(curl.source, poll_fd); - curl.fds = g_slist_prepend(curl.fds, poll_fd); - } - } -} - -/** - * Runs in the I/O thread. No lock needed. - */ -static bool -input_curl_easy_add(struct input_curl *c, GError **error_r) -{ - assert(io_thread_inside()); - assert(c != NULL); - assert(c->easy != NULL); - assert(input_curl_find_request(c->easy) == NULL); - - curl.requests = g_slist_prepend(curl.requests, c); - - CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_add_handle() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - - curl_update_fds(); - - return true; -} - -struct easy_add_params { - struct input_curl *c; - GError **error_r; -}; - -static gpointer -input_curl_easy_add_callback(gpointer data) -{ - const struct easy_add_params *params = data; - - bool success = input_curl_easy_add(params->c, params->error_r); - return GUINT_TO_POINTER(success); -} - -/** - * Call input_curl_easy_add() in the I/O thread. May be called from - * any thread. Caller must not hold a mutex. - */ -static bool -input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) -{ - assert(c != NULL); - assert(c->easy != NULL); - - struct easy_add_params params = { - .c = c, - .error_r = error_r, - }; - - gpointer result = - io_thread_call(input_curl_easy_add_callback, ¶ms); - return GPOINTER_TO_UINT(result); -} - -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - * - * Runs in the I/O thread. - */ -static void -input_curl_easy_free(struct input_curl *c) -{ - assert(io_thread_inside()); - assert(c != NULL); - - if (c->easy == NULL) - return; - - curl.requests = g_slist_remove(curl.requests, c); - - curl_multi_remove_handle(curl.multi, c->easy); - curl_easy_cleanup(c->easy); - c->easy = NULL; - - curl_slist_free_all(c->request_headers); - c->request_headers = NULL; - - g_free(c->range); - c->range = NULL; -} - -static gpointer -input_curl_easy_free_callback(gpointer data) -{ - struct input_curl *c = data; - - input_curl_easy_free(c); - curl_update_fds(); - - return NULL; -} - -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - * - * The mutex must not be locked. - */ -static void -input_curl_easy_free_indirect(struct input_curl *c) -{ - io_thread_call(input_curl_easy_free_callback, c); - assert(c->easy == NULL); -} - -/** - * Abort and free all HTTP requests. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static void -input_curl_abort_all_requests(GError *error) -{ - assert(io_thread_inside()); - assert(error != NULL); - - while (curl.requests != NULL) { - struct input_curl *c = curl.requests->data; - assert(c->postponed_error == NULL); - - input_curl_easy_free(c); - - g_mutex_lock(c->base.mutex); - c->postponed_error = g_error_copy(error); - c->base.ready = true; - g_cond_broadcast(c->base.cond); - g_mutex_unlock(c->base.mutex); - } - - g_error_free(error); - -} - -/** - * A HTTP request is finished. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static void -input_curl_request_done(struct input_curl *c, CURLcode result, long status) -{ - assert(io_thread_inside()); - assert(c != NULL); - assert(c->easy == NULL); - assert(c->postponed_error == NULL); - - g_mutex_lock(c->base.mutex); - - if (result != CURLE_OK) { - c->postponed_error = g_error_new(curl_quark(), result, - "curl failed: %s", - c->error); - } else if (status < 200 || status >= 300) { - c->postponed_error = g_error_new(curl_quark(), 0, - "got HTTP status %ld", - status); - } - - c->base.ready = true; - g_cond_broadcast(c->base.cond); - g_mutex_unlock(c->base.mutex); -} - -static void -input_curl_handle_done(CURL *easy_handle, CURLcode result) -{ - struct input_curl *c = input_curl_find_request(easy_handle); - assert(c != NULL); - - long status = 0; - curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); - - input_curl_easy_free(c); - input_curl_request_done(c, result, status); -} - -/** - * Check for finished HTTP responses. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static void -input_curl_info_read(void) -{ - assert(io_thread_inside()); - - CURLMsg *msg; - int msgs_in_queue; - - while ((msg = curl_multi_info_read(curl.multi, - &msgs_in_queue)) != NULL) { - if (msg->msg == CURLMSG_DONE) - input_curl_handle_done(msg->easy_handle, msg->data.result); - } -} - -/** - * Give control to CURL. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static bool -input_curl_perform(void) -{ - assert(io_thread_inside()); - - CURLMcode mcode; - - do { - int running_handles; - mcode = curl_multi_perform(curl.multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM); - - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - GError *error = g_error_new(curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - input_curl_abort_all_requests(error); - return false; - } - - return true; -} - -/* - * GSource methods - * - */ - -/** - * The GSource prepare() method implementation. - */ -static gboolean -input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) -{ - curl_update_fds(); - -#if LIBCURL_VERSION_NUM >= 0x070f04 - curl.timeout = false; - - long timeout2; - CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); - if (mcode == CURLM_OK) { - if (timeout2 >= 0) - curl.absolute_timeout = g_source_get_time(source) - + timeout2 * 1000; - - if (timeout2 >= 0 && timeout2 < 10) - /* CURL 7.21.1 likes to report "timeout=0", - which means we're running in a busy loop. - Quite a bad idea to waste so much CPU. - Let's use a lower limit of 10ms. */ - timeout2 = 10; - - *timeout_r = timeout2; - - curl.timeout = timeout2 >= 0; - } else - g_warning("curl_multi_timeout() failed: %s\n", - curl_multi_strerror(mcode)); -#else - (void)timeout_r; -#endif - - return false; -} - -/** - * The GSource check() method implementation. - */ -static gboolean -input_curl_source_check(G_GNUC_UNUSED GSource *source) -{ -#if LIBCURL_VERSION_NUM >= 0x070f04 - if (curl.timeout) { - /* when a timeout has expired, we need to call - curl_multi_perform(), even if there was no file - descriptor event */ - - if (g_source_get_time(source) >= curl.absolute_timeout) - return true; - } -#endif - - for (GSList *i = curl.fds; i != NULL; i = i->next) { - GPollFD *poll_fd = i->data; - if (poll_fd->revents != 0) - return true; - } - - return false; -} - -/** - * The GSource dispatch() method implementation. The callback isn't - * used, because we're handling all events directly. - */ -static gboolean -input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, - G_GNUC_UNUSED GSourceFunc callback, - G_GNUC_UNUSED gpointer user_data) -{ - if (input_curl_perform()) - input_curl_info_read(); - - return true; -} - -/** - * The vtable for our GSource implementation. Unfortunately, we - * cannot declare it "const", because g_source_new() takes a non-const - * pointer, for whatever reason. - */ -static GSourceFuncs curl_source_funcs = { - .prepare = input_curl_source_prepare, - .check = input_curl_source_check, - .dispatch = input_curl_source_dispatch, -}; - -/* - * input_plugin methods - * - */ - -static bool -input_curl_init(const struct config_param *param, - G_GNUC_UNUSED GError **error_r) -{ - CURLcode code = curl_global_init(CURL_GLOBAL_ALL); - if (code != CURLE_OK) { - g_set_error(error_r, curl_quark(), code, - "curl_global_init() failed: %s\n", - curl_easy_strerror(code)); - return false; - } - - http_200_aliases = curl_slist_append(http_200_aliases, "ICY 200 OK"); - - proxy = config_get_block_string(param, "proxy", NULL); - proxy_port = config_get_block_unsigned(param, "proxy_port", 0); - proxy_user = config_get_block_string(param, "proxy_user", NULL); - proxy_password = config_get_block_string(param, "proxy_password", - NULL); - - if (proxy == NULL) { - /* deprecated proxy configuration */ - proxy = config_get_string(CONF_HTTP_PROXY_HOST, NULL); - proxy_port = config_get_positive(CONF_HTTP_PROXY_PORT, 0); - proxy_user = config_get_string(CONF_HTTP_PROXY_USER, NULL); - proxy_password = config_get_string(CONF_HTTP_PROXY_PASSWORD, - ""); - } - - curl.multi = curl_multi_init(); - if (curl.multi == NULL) { - g_set_error(error_r, curl_quark(), 0, - "curl_multi_init() failed"); - return false; - } - - curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); - curl.source_id = g_source_attach(curl.source, io_thread_context()); - - return true; -} - -static gpointer -curl_destroy_sources(G_GNUC_UNUSED gpointer data) -{ - g_source_destroy(curl.source); - - return NULL; -} - -static void -input_curl_finish(void) -{ - assert(curl.requests == NULL); - - io_thread_call(curl_destroy_sources, NULL); - - curl_multi_cleanup(curl.multi); - - curl_slist_free_all(http_200_aliases); - - curl_global_cleanup(); -} - -#if LIBCURL_VERSION_NUM >= 0x071200 - -/** - * Determine the total sizes of all buffers, including portions that - * have already been consumed. - * - * The caller must lock the mutex. - */ -G_GNUC_PURE -static size_t -curl_total_buffer_size(const struct input_curl *c) -{ - size_t total = 0; - - for (GList *i = g_queue_peek_head_link(c->buffers); - i != NULL; i = g_list_next(i)) { - struct buffer *buffer = i->data; - total += buffer->size; - } - - return total; -} - -#endif - -static void -buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) -{ - struct buffer *buffer = data; - - assert(buffer->consumed <= buffer->size); - - g_free(buffer); -} - -static void -input_curl_flush_buffers(struct input_curl *c) -{ - g_queue_foreach(c->buffers, buffer_free_callback, NULL); - g_queue_clear(c->buffers); -} - -/** - * Frees this stream, including the input_stream struct. - */ -static void -input_curl_free(struct input_curl *c) -{ - if (c->tag != NULL) - tag_free(c->tag); - g_free(c->meta_name); - - input_curl_easy_free_indirect(c); - input_curl_flush_buffers(c); - - g_queue_free(c->buffers); - - if (c->postponed_error != NULL) - g_error_free(c->postponed_error); - - g_free(c->url); - input_stream_deinit(&c->base); - g_free(c); -} - -static bool -input_curl_check(struct input_stream *is, GError **error_r) -{ - struct input_curl *c = (struct input_curl *)is; - - bool success = c->postponed_error == NULL; - if (!success) { - g_propagate_error(error_r, c->postponed_error); - c->postponed_error = NULL; - } - - return success; -} - -static struct tag * -input_curl_tag(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - struct tag *tag = c->tag; - - c->tag = NULL; - return tag; -} - -static bool -fill_buffer(struct input_curl *c, GError **error_r) -{ - while (c->easy != NULL && g_queue_is_empty(c->buffers)) - g_cond_wait(c->base.cond, c->base.mutex); - - if (c->postponed_error != NULL) { - g_propagate_error(error_r, c->postponed_error); - c->postponed_error = NULL; - return false; - } - - return !g_queue_is_empty(c->buffers); -} - -/** - * Mark a part of the buffer object as consumed. - */ -static struct buffer * -consume_buffer(struct buffer *buffer, size_t length) -{ - assert(buffer != NULL); - assert(buffer->consumed < buffer->size); - - buffer->consumed += length; - if (buffer->consumed < buffer->size) - return buffer; - - assert(buffer->consumed == buffer->size); - - g_free(buffer); - - return NULL; -} - -static size_t -read_from_buffer(struct icy_metadata *icy_metadata, GQueue *buffers, - void *dest0, size_t length) -{ - struct buffer *buffer = g_queue_pop_head(buffers); - uint8_t *dest = dest0; - size_t nbytes = 0; - - assert(buffer->size > 0); - assert(buffer->consumed < buffer->size); - - if (length > buffer->size - buffer->consumed) - length = buffer->size - buffer->consumed; - - while (true) { - size_t chunk; - - chunk = icy_data(icy_metadata, length); - if (chunk > 0) { - memcpy(dest, buffer->data + buffer->consumed, - chunk); - buffer = consume_buffer(buffer, chunk); - - nbytes += chunk; - dest += chunk; - length -= chunk; - - if (length == 0) - break; - - assert(buffer != NULL); - } - - chunk = icy_meta(icy_metadata, buffer->data + buffer->consumed, - length); - if (chunk > 0) { - buffer = consume_buffer(buffer, chunk); - - length -= chunk; - - if (length == 0) - break; - - assert(buffer != NULL); - } - } - - if (buffer != NULL) - g_queue_push_head(buffers, buffer); - - return nbytes; -} - -static void -copy_icy_tag(struct input_curl *c) -{ - struct tag *tag = icy_tag(&c->icy_metadata); - - if (tag == NULL) - return; - - if (c->tag != NULL) - tag_free(c->tag); - - if (c->meta_name != NULL && !tag_has_type(tag, TAG_NAME)) - tag_add_item(tag, TAG_NAME, c->meta_name); - - c->tag = tag; -} - -static bool -input_curl_available(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - - return c->postponed_error != NULL || c->easy == NULL || - !g_queue_is_empty(c->buffers); -} - -static size_t -input_curl_read(struct input_stream *is, void *ptr, size_t size, - GError **error_r) -{ - struct input_curl *c = (struct input_curl *)is; - bool success; - size_t nbytes = 0; - char *dest = ptr; - - do { - /* fill the buffer */ - - success = fill_buffer(c, error_r); - if (!success) - return 0; - - /* send buffer contents */ - - while (size > 0 && !g_queue_is_empty(c->buffers)) { - size_t copy = read_from_buffer(&c->icy_metadata, c->buffers, - dest + nbytes, size); - - nbytes += copy; - size -= copy; - } - } while (nbytes == 0); - - if (icy_defined(&c->icy_metadata)) - copy_icy_tag(c); - - is->offset += (goffset)nbytes; - -#if LIBCURL_VERSION_NUM >= 0x071200 - if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { - g_mutex_unlock(c->base.mutex); - io_thread_call(input_curl_resume, c); - g_mutex_lock(c->base.mutex); - } -#endif - - return nbytes; -} - -static void -input_curl_close(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - - input_curl_free(c); -} - -static bool -input_curl_eof(G_GNUC_UNUSED struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - - return c->easy == NULL && g_queue_is_empty(c->buffers); -} - -/** called by curl when new data is available */ -static size_t -input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) -{ - struct input_curl *c = (struct input_curl *)stream; - const char *header = ptr, *end, *value; - char name[64]; - - size *= nmemb; - end = header + size; - - value = memchr(header, ':', size); - if (value == NULL || (size_t)(value - header) >= sizeof(name)) - return size; - - memcpy(name, header, value - header); - name[value - header] = 0; - - /* skip the colon */ - - ++value; - - /* strip the value */ - - while (value < end && g_ascii_isspace(*value)) - ++value; - - while (end > value && g_ascii_isspace(end[-1])) - --end; - - if (g_ascii_strcasecmp(name, "accept-ranges") == 0) { - /* a stream with icy-metadata is not seekable */ - if (!icy_defined(&c->icy_metadata)) - c->base.seekable = true; - } else if (g_ascii_strcasecmp(name, "content-length") == 0) { - char buffer[64]; - - if ((size_t)(end - header) >= sizeof(buffer)) - return size; - - memcpy(buffer, value, end - value); - buffer[end - value] = 0; - - c->base.size = c->base.offset + g_ascii_strtoull(buffer, NULL, 10); - } else if (g_ascii_strcasecmp(name, "content-type") == 0) { - g_free(c->base.mime); - c->base.mime = g_strndup(value, end - value); - } else if (g_ascii_strcasecmp(name, "icy-name") == 0 || - g_ascii_strcasecmp(name, "ice-name") == 0 || - g_ascii_strcasecmp(name, "x-audiocast-name") == 0) { - g_free(c->meta_name); - c->meta_name = g_strndup(value, end - value); - - if (c->tag != NULL) - tag_free(c->tag); - - c->tag = tag_new(); - tag_add_item(c->tag, TAG_NAME, c->meta_name); - } else if (g_ascii_strcasecmp(name, "icy-metaint") == 0) { - char buffer[64]; - size_t icy_metaint; - - if ((size_t)(end - header) >= sizeof(buffer) || - icy_defined(&c->icy_metadata)) - return size; - - memcpy(buffer, value, end - value); - buffer[end - value] = 0; - - icy_metaint = g_ascii_strtoull(buffer, NULL, 10); - g_debug("icy-metaint=%zu", icy_metaint); - - if (icy_metaint > 0) { - icy_start(&c->icy_metadata, icy_metaint); - - /* a stream with icy-metadata is not - seekable */ - c->base.seekable = false; - } - } - - return size; -} - -/** called by curl when new data is available */ -static size_t -input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) -{ - struct input_curl *c = (struct input_curl *)stream; - struct buffer *buffer; - - size *= nmemb; - if (size == 0) - return 0; - - g_mutex_lock(c->base.mutex); - -#if LIBCURL_VERSION_NUM >= 0x071200 - if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { - c->paused = true; - g_mutex_unlock(c->base.mutex); - return CURL_WRITEFUNC_PAUSE; - } -#endif - - buffer = g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); - buffer->size = size; - buffer->consumed = 0; - memcpy(buffer->data, ptr, size); - - g_queue_push_tail(c->buffers, buffer); - c->base.ready = true; - - g_cond_broadcast(c->base.cond); - g_mutex_unlock(c->base.mutex); - - return size; -} - -static bool -input_curl_easy_init(struct input_curl *c, GError **error_r) -{ - CURLcode code; - - c->easy = curl_easy_init(); - if (c->easy == NULL) { - g_set_error(error_r, curl_quark(), 0, - "curl_easy_init() failed"); - return false; - } - - curl_easy_setopt(c->easy, CURLOPT_USERAGENT, - "Music Player Daemon " VERSION); - curl_easy_setopt(c->easy, CURLOPT_HEADERFUNCTION, - input_curl_headerfunction); - curl_easy_setopt(c->easy, CURLOPT_WRITEHEADER, c); - curl_easy_setopt(c->easy, CURLOPT_WRITEFUNCTION, - input_curl_writefunction); - curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, c); - curl_easy_setopt(c->easy, CURLOPT_HTTP200ALIASES, http_200_aliases); - curl_easy_setopt(c->easy, CURLOPT_FOLLOWLOCATION, 1); - curl_easy_setopt(c->easy, CURLOPT_NETRC, 1); - curl_easy_setopt(c->easy, CURLOPT_MAXREDIRS, 5); - curl_easy_setopt(c->easy, CURLOPT_FAILONERROR, true); - curl_easy_setopt(c->easy, CURLOPT_ERRORBUFFER, c->error); - curl_easy_setopt(c->easy, CURLOPT_NOPROGRESS, 1l); - curl_easy_setopt(c->easy, CURLOPT_NOSIGNAL, 1l); - curl_easy_setopt(c->easy, CURLOPT_CONNECTTIMEOUT, 10l); - - if (proxy != NULL) - curl_easy_setopt(c->easy, CURLOPT_PROXY, proxy); - - if (proxy_port > 0) - curl_easy_setopt(c->easy, CURLOPT_PROXYPORT, (long)proxy_port); - - if (proxy_user != NULL && proxy_password != NULL) { - char *proxy_auth_str = - g_strconcat(proxy_user, ":", proxy_password, NULL); - curl_easy_setopt(c->easy, CURLOPT_PROXYUSERPWD, proxy_auth_str); - g_free(proxy_auth_str); - } - - code = curl_easy_setopt(c->easy, CURLOPT_URL, c->url); - if (code != CURLE_OK) { - g_set_error(error_r, curl_quark(), code, - "curl_easy_setopt() failed: %s", - curl_easy_strerror(code)); - return false; - } - - c->request_headers = NULL; - c->request_headers = curl_slist_append(c->request_headers, - "Icy-Metadata: 1"); - curl_easy_setopt(c->easy, CURLOPT_HTTPHEADER, c->request_headers); - - return true; -} - -static bool -input_curl_seek(struct input_stream *is, goffset offset, int whence, - GError **error_r) -{ - struct input_curl *c = (struct input_curl *)is; - bool ret; - - assert(is->ready); - - if (whence == SEEK_SET && offset == is->offset) - /* no-op */ - return true; - - if (!is->seekable) - return false; - - /* calculate the absolute offset */ - - switch (whence) { - case SEEK_SET: - break; - - case SEEK_CUR: - offset += is->offset; - break; - - case SEEK_END: - if (is->size < 0) - /* stream size is not known */ - return false; - - offset += is->size; - break; - - default: - return false; - } - - if (offset < 0) - return false; - - /* check if we can fast-forward the buffer */ - - while (offset > is->offset && !g_queue_is_empty(c->buffers)) { - struct buffer *buffer; - size_t length; - - buffer = (struct buffer *)g_queue_pop_head(c->buffers); - - length = buffer->size - buffer->consumed; - if (offset - is->offset < (goffset)length) - length = offset - is->offset; - - buffer = consume_buffer(buffer, length); - if (buffer != NULL) - g_queue_push_head(c->buffers, buffer); - - is->offset += length; - } - - if (offset == is->offset) - return true; - - /* close the old connection and open a new one */ - - g_mutex_unlock(c->base.mutex); - - input_curl_easy_free_indirect(c); - input_curl_flush_buffers(c); - - is->offset = offset; - if (is->offset == is->size) { - /* seek to EOF: simulate empty result; avoid - triggering a "416 Requested Range Not Satisfiable" - response */ - return true; - } - - ret = input_curl_easy_init(c, error_r); - if (!ret) - return false; - - /* send the "Range" header */ - - if (is->offset > 0) { - c->range = g_strdup_printf("%lld-", (long long)is->offset); - curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); - } - - c->base.ready = false; - - if (!input_curl_easy_add_indirect(c, error_r)) - return false; - - g_mutex_lock(c->base.mutex); - - while (!c->base.ready) - g_cond_wait(c->base.cond, c->base.mutex); - - if (c->postponed_error != NULL) { - g_propagate_error(error_r, c->postponed_error); - c->postponed_error = NULL; - return false; - } - - return true; -} - -static struct input_stream * -input_curl_open(const char *url, GMutex *mutex, GCond *cond, - GError **error_r) -{ - assert(mutex != NULL); - assert(cond != NULL); - - struct input_curl *c; - - if (strncmp(url, "http://", 7) != 0) - return NULL; - - c = g_new0(struct input_curl, 1); - input_stream_init(&c->base, &input_plugin_curl, url, - mutex, cond); - - c->url = g_strdup(url); - c->buffers = g_queue_new(); - - icy_clear(&c->icy_metadata); - c->tag = NULL; - - c->postponed_error = NULL; - -#if LIBCURL_VERSION_NUM >= 0x071200 - c->paused = false; -#endif - - if (!input_curl_easy_init(c, error_r)) { - input_curl_free(c); - return NULL; - } - - if (!input_curl_easy_add_indirect(c, error_r)) { - input_curl_free(c); - return NULL; - } - - return &c->base; -} - -const struct input_plugin input_plugin_curl = { - .name = "curl", - .init = input_curl_init, - .finish = input_curl_finish, - - .open = input_curl_open, - .close = input_curl_close, - .check = input_curl_check, - .tag = input_curl_tag, - .available = input_curl_available, - .read = input_curl_read, - .eof = input_curl_eof, - .seek = input_curl_seek, -}; diff --git a/src/input/curl_input_plugin.h b/src/input/curl_input_plugin.h deleted file mode 100644 index c6e71bf40..000000000 --- a/src/input/curl_input_plugin.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (C) 2003-2011 The Music Player Daemon Project - * http://www.musicpd.org - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef MPD_INPUT_CURL_H -#define MPD_INPUT_CURL_H - -struct input_stream; - -extern const struct input_plugin input_plugin_curl; - -#endif diff --git a/src/input/soup_input_plugin.c b/src/input/soup_input_plugin.c deleted file mode 100644 index fc903b48c..000000000 --- a/src/input/soup_input_plugin.c +++ /dev/null @@ -1,473 +0,0 @@ -/* - * Copyright (C) 2003-2011 The Music Player Daemon Project - * http://www.musicpd.org - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#include "config.h" -#include "input/soup_input_plugin.h" -#include "input_internal.h" -#include "input_plugin.h" -#include "io_thread.h" -#include "conf.h" - -#include -#include - -#include -#include - -#undef G_LOG_DOMAIN -#define G_LOG_DOMAIN "input_soup" - -/** - * Do not buffer more than this number of bytes. It should be a - * reasonable limit that doesn't make low-end machines suffer too - * much, but doesn't cause stuttering on high-latency lines. - */ -static const size_t SOUP_MAX_BUFFERED = 512 * 1024; - -/** - * Resume the stream at this number of bytes after it has been paused. - */ -static const size_t SOUP_RESUME_AT = 384 * 1024; - -static SoupURI *soup_proxy; -static SoupSession *soup_session; - -struct input_soup { - struct input_stream base; - - SoupMessage *msg; - - GQueue *buffers; - - size_t current_consumed; - - size_t total_buffered; - - bool alive, pause, eof; - - /** - * Set when the session callback has been invoked, when it is - * safe to free this object. - */ - bool completed; - - GError *postponed_error; -}; - -static inline GQuark -soup_quark(void) -{ - return g_quark_from_static_string("soup"); -} - -static bool -input_soup_init(const struct config_param *param, GError **error_r) -{ - assert(soup_proxy == NULL); - assert(soup_session == NULL); - - g_type_init(); - - const char *proxy = config_get_block_string(param, "proxy", NULL); - - if (proxy != NULL) { - soup_proxy = soup_uri_new(proxy); - if (soup_proxy == NULL) { - g_set_error(error_r, soup_quark(), 0, - "failed to parse proxy setting"); - return false; - } - } - - soup_session = - soup_session_async_new_with_options(SOUP_SESSION_PROXY_URI, - soup_proxy, - SOUP_SESSION_ASYNC_CONTEXT, - io_thread_context(), - NULL); - - return true; -} - -static void -input_soup_finish(void) -{ - assert(soup_session != NULL); - - soup_session_abort(soup_session); - g_object_unref(G_OBJECT(soup_session)); - - if (soup_proxy != NULL) - soup_uri_free(soup_proxy); -} - -/** - * Copy the error from the SoupMessage object to - * input_soup::postponed_error. - * - * @return true if there was no error - */ -static bool -input_soup_copy_error(struct input_soup *s, const SoupMessage *msg) -{ - if (SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) - return true; - - if (msg->status_code == SOUP_STATUS_CANCELLED) - /* failure, but don't generate a GError, because this - status was caused by _close() */ - return false; - - if (s->postponed_error != NULL) - /* there's already a GError, don't overwrite it */ - return false; - - if (SOUP_STATUS_IS_TRANSPORT_ERROR(msg->status_code)) - s->postponed_error = - g_error_new(soup_quark(), msg->status_code, - "HTTP client error: %s", - msg->reason_phrase); - else - s->postponed_error = - g_error_new(soup_quark(), msg->status_code, - "got HTTP status: %d %s", - msg->status_code, msg->reason_phrase); - - return false; -} - -static void -input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, - SoupMessage *msg, gpointer user_data) -{ - struct input_soup *s = user_data; - - assert(msg == s->msg); - assert(!s->completed); - - g_mutex_lock(s->base.mutex); - - if (!s->base.ready) - input_soup_copy_error(s, msg); - - s->base.ready = true; - s->alive = false; - s->completed = true; - - g_cond_broadcast(s->base.cond); - g_mutex_unlock(s->base.mutex); -} - -static void -input_soup_got_headers(SoupMessage *msg, gpointer user_data) -{ - struct input_soup *s = user_data; - - g_mutex_lock(s->base.mutex); - - if (!input_soup_copy_error(s, msg)) { - g_mutex_unlock(s->base.mutex); - - soup_session_cancel_message(soup_session, msg, - SOUP_STATUS_CANCELLED); - return; - } - - s->base.ready = true; - g_cond_broadcast(s->base.cond); - g_mutex_unlock(s->base.mutex); - - soup_message_body_set_accumulate(msg->response_body, false); -} - -static void -input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) -{ - struct input_soup *s = user_data; - - assert(msg == s->msg); - - g_mutex_lock(s->base.mutex); - - g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); - s->total_buffered += chunk->length; - - if (s->total_buffered >= SOUP_MAX_BUFFERED && !s->pause) { - s->pause = true; - soup_session_pause_message(soup_session, msg); - } - - g_cond_broadcast(s->base.cond); - g_mutex_unlock(s->base.mutex); -} - -static void -input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) -{ - struct input_soup *s = user_data; - - assert(msg == s->msg); - - g_mutex_lock(s->base.mutex); - - s->base.ready = true; - s->eof = true; - s->alive = false; - - g_cond_broadcast(s->base.cond); - g_mutex_unlock(s->base.mutex); -} - -static bool -input_soup_wait_data(struct input_soup *s) -{ - while (true) { - if (s->eof) - return true; - - if (!s->alive) - return false; - - if (!g_queue_is_empty(s->buffers)) - return true; - - assert(s->current_consumed == 0); - - g_cond_wait(s->base.cond, s->base.mutex); - } -} - -static gpointer -input_soup_queue(gpointer data) -{ - struct input_soup *s = data; - - soup_session_queue_message(soup_session, s->msg, - input_soup_session_callback, s); - - return NULL; -} - -static struct input_stream * -input_soup_open(const char *uri, - GMutex *mutex, GCond *cond, - G_GNUC_UNUSED GError **error_r) -{ - if (strncmp(uri, "http://", 7) != 0) - return NULL; - - struct input_soup *s = g_new(struct input_soup, 1); - input_stream_init(&s->base, &input_plugin_soup, uri, - mutex, cond); - - s->buffers = g_queue_new(); - s->current_consumed = 0; - s->total_buffered = 0; - -#if GCC_CHECK_VERSION(4,6) -#pragma GCC diagnostic push - /* the libsoup macro SOUP_METHOD_GET discards the "const" - attribute of the g_intern_static_string() return value; - don't make the gcc warning fatal: */ -#pragma GCC diagnostic ignored "-Wcast-qual" -#endif - - s->msg = soup_message_new(SOUP_METHOD_GET, uri); - -#if GCC_CHECK_VERSION(4,6) -#pragma GCC diagnostic pop -#endif - - soup_message_set_flags(s->msg, SOUP_MESSAGE_NO_REDIRECT); - - soup_message_headers_append(s->msg->request_headers, "User-Agent", - "Music Player Daemon " VERSION); - - g_signal_connect(s->msg, "got-headers", - G_CALLBACK(input_soup_got_headers), s); - g_signal_connect(s->msg, "got-chunk", - G_CALLBACK(input_soup_got_chunk), s); - g_signal_connect(s->msg, "got-body", - G_CALLBACK(input_soup_got_body), s); - - s->alive = true; - s->pause = false; - s->eof = false; - s->completed = false; - s->postponed_error = NULL; - - io_thread_call(input_soup_queue, s); - - return &s->base; -} - -static gpointer -input_soup_cancel(gpointer data) -{ - struct input_soup *s = data; - - if (!s->completed) - soup_session_cancel_message(soup_session, s->msg, - SOUP_STATUS_CANCELLED); - - return NULL; -} - -static void -input_soup_close(struct input_stream *is) -{ - struct input_soup *s = (struct input_soup *)is; - - g_mutex_lock(s->base.mutex); - - if (!s->completed) { - /* the messages's session callback hasn't been invoked - yet; cancel it and wait for completion */ - - g_mutex_unlock(s->base.mutex); - - io_thread_call(input_soup_cancel, s); - - g_mutex_lock(s->base.mutex); - while (!s->completed) - g_cond_wait(s->base.cond, s->base.mutex); - } - - g_mutex_unlock(s->base.mutex); - - SoupBuffer *buffer; - while ((buffer = g_queue_pop_head(s->buffers)) != NULL) - soup_buffer_free(buffer); - g_queue_free(s->buffers); - - if (s->postponed_error != NULL) - g_error_free(s->postponed_error); - - input_stream_deinit(&s->base); - g_free(s); -} - -static bool -input_soup_check(struct input_stream *is, GError **error_r) -{ - struct input_soup *s = (struct input_soup *)is; - - bool success = s->postponed_error == NULL; - if (!success) { - g_propagate_error(error_r, s->postponed_error); - s->postponed_error = NULL; - } - - return success; -} - -static bool -input_soup_available(struct input_stream *is) -{ - struct input_soup *s = (struct input_soup *)is; - - return s->eof || !s->alive || !g_queue_is_empty(s->buffers); -} - -static size_t -input_soup_read(struct input_stream *is, void *ptr, size_t size, - G_GNUC_UNUSED GError **error_r) -{ - struct input_soup *s = (struct input_soup *)is; - - if (!input_soup_wait_data(s)) { - assert(!s->alive); - - if (s->postponed_error != NULL) { - g_propagate_error(error_r, s->postponed_error); - s->postponed_error = NULL; - } else - g_set_error_literal(error_r, soup_quark(), 0, - "HTTP failure"); - return 0; - } - - char *p0 = ptr, *p = p0, *p_end = p0 + size; - - while (p < p_end) { - SoupBuffer *buffer = g_queue_pop_head(s->buffers); - if (buffer == NULL) { - assert(s->current_consumed == 0); - break; - } - - assert(s->current_consumed < buffer->length); - assert(s->total_buffered >= buffer->length); - - const char *q = buffer->data; - q += s->current_consumed; - - size_t remaining = buffer->length - s->current_consumed; - size_t nbytes = p_end - p; - if (nbytes > remaining) - nbytes = remaining; - - memcpy(p, q, nbytes); - p += nbytes; - - s->current_consumed += remaining; - if (s->current_consumed >= buffer->length) { - /* done with this buffer */ - s->total_buffered -= buffer->length; - soup_buffer_free(buffer); - s->current_consumed = 0; - } else { - /* partial read */ - assert(p == p_end); - - g_queue_push_head(s->buffers, buffer); - } - } - - if (s->pause && s->total_buffered < SOUP_RESUME_AT) { - s->pause = false; - soup_session_unpause_message(soup_session, s->msg); - } - - size_t nbytes = p - p0; - s->base.offset += nbytes; - - return nbytes; -} - -static bool -input_soup_eof(G_GNUC_UNUSED struct input_stream *is) -{ - struct input_soup *s = (struct input_soup *)is; - - return !s->alive && g_queue_is_empty(s->buffers); -} - -const struct input_plugin input_plugin_soup = { - .name = "soup", - .init = input_soup_init, - .finish = input_soup_finish, - - .open = input_soup_open, - .close = input_soup_close, - .check = input_soup_check, - .available = input_soup_available, - .read = input_soup_read, - .eof = input_soup_eof, -}; diff --git a/src/input/soup_input_plugin.h b/src/input/soup_input_plugin.h deleted file mode 100644 index 689b2d971..000000000 --- a/src/input/soup_input_plugin.h +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2003-2011 The Music Player Daemon Project - * http://www.musicpd.org - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef MPD_INPUT_SOUP_H -#define MPD_INPUT_SOUP_H - -extern const struct input_plugin input_plugin_soup; - -#endif -- cgit v1.2.3