diff options
Diffstat (limited to '')
-rw-r--r-- | src/input/SoupInputPlugin.cxx | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/src/input/SoupInputPlugin.cxx b/src/input/SoupInputPlugin.cxx new file mode 100644 index 000000000..e9767c20e --- /dev/null +++ b/src/input/SoupInputPlugin.cxx @@ -0,0 +1,492 @@ +/* + * Copyright (C) 2003-2013 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "SoupInputPlugin.hxx" +#include "InputPlugin.hxx" +#include "InputStream.hxx" +#include "InputInternal.hxx" +#include "IOThread.hxx" +#include "event/Loop.hxx" +#include "conf.h" + +extern "C" { +#include <libsoup/soup-uri.h> +#include <libsoup/soup-session-async.h> +} + +#include <assert.h> +#include <string.h> + +#undef G_LOG_DOMAIN +#define G_LOG_DOMAIN "input_soup" + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t SOUP_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t SOUP_RESUME_AT = 384 * 1024; + +static SoupURI *soup_proxy; +static SoupSession *soup_session; + +struct SoupInputStream { + struct input_stream base; + + SoupMessage *msg; + + GQueue *buffers; + + size_t current_consumed; + + size_t total_buffered; + + bool alive, pause, eof; + + /** + * Set when the session callback has been invoked, when it is + * safe to free this object. + */ + bool completed; + + GError *postponed_error; + + SoupInputStream(const char *uri, Mutex &mutex, Cond &cond); + ~SoupInputStream(); + + bool CopyError(const SoupMessage *msg); + + bool WaitData(); + + size_t Read(void *ptr, size_t size, GError **error_r); +}; + +static inline GQuark +soup_quark(void) +{ + return g_quark_from_static_string("soup"); +} + +static bool +input_soup_init(const struct config_param *param, GError **error_r) +{ + assert(soup_proxy == NULL); + assert(soup_session == NULL); + + g_type_init(); + + const char *proxy = config_get_block_string(param, "proxy", NULL); + + if (proxy != NULL) { + soup_proxy = soup_uri_new(proxy); + if (soup_proxy == NULL) { + g_set_error(error_r, soup_quark(), 0, + "failed to parse proxy setting"); + return false; + } + } + + soup_session = + soup_session_async_new_with_options(SOUP_SESSION_PROXY_URI, + soup_proxy, + SOUP_SESSION_ASYNC_CONTEXT, + io_thread_get().GetContext(), + NULL); + + return true; +} + +static void +input_soup_finish(void) +{ + assert(soup_session != NULL); + + soup_session_abort(soup_session); + g_object_unref(G_OBJECT(soup_session)); + + if (soup_proxy != NULL) + soup_uri_free(soup_proxy); +} + +/** + * Copy the error from the SoupMessage object to + * input_soup::postponed_error. + * + * @return true if there was no error + */ +bool +SoupInputStream::CopyError(const SoupMessage *src) +{ + if (SOUP_STATUS_IS_SUCCESSFUL(src->status_code)) + return true; + + if (src->status_code == SOUP_STATUS_CANCELLED) + /* failure, but don't generate a GError, because this + status was caused by _close() */ + return false; + + if (postponed_error != nullptr) + /* there's already a GError, don't overwrite it */ + return false; + + if (SOUP_STATUS_IS_TRANSPORT_ERROR(src->status_code)) + postponed_error = + g_error_new(soup_quark(), src->status_code, + "HTTP client error: %s", + src->reason_phrase); + else + postponed_error = + g_error_new(soup_quark(), src->status_code, + "got HTTP status: %d %s", + src->status_code, src->reason_phrase); + + return false; +} + +static void +input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, + SoupMessage *msg, gpointer user_data) +{ + SoupInputStream *s = (SoupInputStream *)user_data; + + assert(msg == s->msg); + assert(!s->completed); + + const ScopeLock protect(s->base.mutex); + + if (!s->base.ready) + s->CopyError(msg); + + s->base.ready = true; + s->alive = false; + s->completed = true; + + s->base.cond.broadcast(); +} + +static void +input_soup_got_headers(SoupMessage *msg, gpointer user_data) +{ + SoupInputStream *s = (SoupInputStream *)user_data; + + s->base.mutex.lock(); + + if (!s->CopyError(msg)) { + s->base.mutex.unlock(); + + soup_session_cancel_message(soup_session, msg, + SOUP_STATUS_CANCELLED); + return; + } + + s->base.ready = true; + s->base.cond.broadcast(); + s->base.mutex.unlock(); + + soup_message_body_set_accumulate(msg->response_body, false); +} + +static void +input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) +{ + SoupInputStream *s = (SoupInputStream *)user_data; + + assert(msg == s->msg); + + const ScopeLock protect(s->base.mutex); + + g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); + s->total_buffered += chunk->length; + + if (s->total_buffered >= SOUP_MAX_BUFFERED && !s->pause) { + s->pause = true; + soup_session_pause_message(soup_session, msg); + } + + s->base.cond.broadcast(); + s->base.mutex.unlock(); +} + +static void +input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) +{ + SoupInputStream *s = (SoupInputStream *)user_data; + + assert(msg == s->msg); + + const ScopeLock protect(s->base.mutex); + + s->base.ready = true; + s->eof = true; + s->alive = false; + + s->base.cond.broadcast(); + s->base.mutex.unlock(); +} + +inline bool +SoupInputStream::WaitData() +{ + while (true) { + if (eof) + return true; + + if (!alive) + return false; + + if (!g_queue_is_empty(buffers)) + return true; + + assert(current_consumed == 0); + + base.cond.wait(base.mutex); + } +} + +static gpointer +input_soup_queue(gpointer data) +{ + SoupInputStream *s = (SoupInputStream *)data; + + soup_session_queue_message(soup_session, s->msg, + input_soup_session_callback, s); + + return NULL; +} + +SoupInputStream::SoupInputStream(const char *uri, + Mutex &mutex, Cond &cond) + :base(input_plugin_soup, uri, mutex, cond), + buffers(g_queue_new()), + current_consumed(0), total_buffered(0), + alive(false), pause(false), eof(false), completed(false), + postponed_error(nullptr) +{ +#if GCC_CHECK_VERSION(4,6) +#pragma GCC diagnostic push + /* the libsoup macro SOUP_METHOD_GET discards the "const" + attribute of the g_intern_static_string() return value; + don't make the gcc warning fatal: */ +#pragma GCC diagnostic ignored "-Wcast-qual" +#endif + + msg = soup_message_new(SOUP_METHOD_GET, uri); + +#if GCC_CHECK_VERSION(4,6) +#pragma GCC diagnostic pop +#endif + + soup_message_set_flags(msg, SOUP_MESSAGE_NO_REDIRECT); + + soup_message_headers_append(msg->request_headers, "User-Agent", + "Music Player Daemon " VERSION); + + g_signal_connect(msg, "got-headers", + G_CALLBACK(input_soup_got_headers), this); + g_signal_connect(msg, "got-chunk", + G_CALLBACK(input_soup_got_chunk), this); + g_signal_connect(msg, "got-body", + G_CALLBACK(input_soup_got_body), this); + + io_thread_call(input_soup_queue, this); +} + +static struct input_stream * +input_soup_open(const char *uri, + Mutex &mutex, Cond &cond, + G_GNUC_UNUSED GError **error_r) +{ + if (strncmp(uri, "http://", 7) != 0) + return NULL; + + SoupInputStream *s = new SoupInputStream(uri, mutex, cond); + return &s->base; +} + +static gpointer +input_soup_cancel(gpointer data) +{ + SoupInputStream *s = (SoupInputStream *)data; + + if (!s->completed) + soup_session_cancel_message(soup_session, s->msg, + SOUP_STATUS_CANCELLED); + + return NULL; +} + +SoupInputStream::~SoupInputStream() +{ + base.mutex.lock(); + + if (!completed) { + /* the messages's session callback hasn't been invoked + yet; cancel it and wait for completion */ + + base.mutex.unlock(); + + io_thread_call(input_soup_cancel, this); + + base.mutex.lock(); + while (!completed) + base.cond.wait(base.mutex); + } + + base.mutex.unlock(); + + SoupBuffer *buffer; + while ((buffer = (SoupBuffer *)g_queue_pop_head(buffers)) != NULL) + soup_buffer_free(buffer); + g_queue_free(buffers); + + if (postponed_error != NULL) + g_error_free(postponed_error); +} + +static void +input_soup_close(struct input_stream *is) +{ + SoupInputStream *s = (SoupInputStream *)is; + + delete s; +} + +static bool +input_soup_check(struct input_stream *is, GError **error_r) +{ + SoupInputStream *s = (SoupInputStream *)is; + + bool success = s->postponed_error == NULL; + if (!success) { + g_propagate_error(error_r, s->postponed_error); + s->postponed_error = NULL; + } + + return success; +} + +static bool +input_soup_available(struct input_stream *is) +{ + SoupInputStream *s = (SoupInputStream *)is; + + return s->eof || !s->alive || !g_queue_is_empty(s->buffers); +} + +inline size_t +SoupInputStream::Read(void *ptr, size_t size, GError **error_r) +{ + if (!WaitData()) { + assert(!alive); + + if (postponed_error != nullptr) { + g_propagate_error(error_r, postponed_error); + postponed_error = nullptr; + } else + g_set_error_literal(error_r, soup_quark(), 0, + "HTTP failure"); + return 0; + } + + char *p0 = (char *)ptr, *p = p0, *p_end = p0 + size; + + while (p < p_end) { + SoupBuffer *buffer = (SoupBuffer *) + g_queue_pop_head(buffers); + if (buffer == NULL) { + assert(current_consumed == 0); + break; + } + + assert(current_consumed < buffer->length); + assert(total_buffered >= buffer->length); + + const char *q = buffer->data; + q += current_consumed; + + size_t remaining = buffer->length - current_consumed; + size_t nbytes = p_end - p; + if (nbytes > remaining) + nbytes = remaining; + + memcpy(p, q, nbytes); + p += nbytes; + + current_consumed += remaining; + if (current_consumed >= buffer->length) { + /* done with this buffer */ + total_buffered -= buffer->length; + soup_buffer_free(buffer); + current_consumed = 0; + } else { + /* partial read */ + assert(p == p_end); + + g_queue_push_head(buffers, buffer); + } + } + + if (pause && total_buffered < SOUP_RESUME_AT) { + pause = false; + soup_session_unpause_message(soup_session, msg); + } + + size_t nbytes = p - p0; + base.offset += nbytes; + + return nbytes; +} + +static size_t +input_soup_read(struct input_stream *is, void *ptr, size_t size, + GError **error_r) +{ + SoupInputStream *s = (SoupInputStream *)is; + + return s->Read(ptr, size, error_r); +} + +static bool +input_soup_eof(G_GNUC_UNUSED struct input_stream *is) +{ + SoupInputStream *s = (SoupInputStream *)is; + + return !s->alive && g_queue_is_empty(s->buffers); +} + +const struct input_plugin input_plugin_soup = { + "soup", + input_soup_init, + input_soup_finish, + input_soup_open, + input_soup_close, + input_soup_check, + nullptr, + nullptr, + input_soup_available, + input_soup_read, + input_soup_eof, + nullptr, +}; |