diff options
author | Max Kellermann <max@duempel.org> | 2011-09-14 21:46:41 +0200 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2011-09-16 21:22:13 +0200 |
commit | 754f26a97c816781e80500d98f2515ae97836145 (patch) | |
tree | caa7dbaa879b29d018a4559524390670ad33a605 /src/input/soup_input_plugin.c | |
parent | 29241c4f835797f635816a9f37528aa981f722b5 (diff) | |
download | mpd-754f26a97c816781e80500d98f2515ae97836145.tar.gz mpd-754f26a97c816781e80500d98f2515ae97836145.tar.xz mpd-754f26a97c816781e80500d98f2515ae97836145.zip |
input_stream: non-blocking I/O
Add GMutex, GCond attributes which will be used by callers to
conditionally wait on the stream.
Remove the (now-useless) plugin method buffer(), wait on GCond
instead. Lock the input_stream before each method call. Do the same
with the playlist plugins.
Diffstat (limited to 'src/input/soup_input_plugin.c')
-rw-r--r-- | src/input/soup_input_plugin.c | 102 |
1 files changed, 29 insertions, 73 deletions
diff --git a/src/input/soup_input_plugin.c b/src/input/soup_input_plugin.c index 23665c1a2..dc005a58c 100644 --- a/src/input/soup_input_plugin.c +++ b/src/input/soup_input_plugin.c @@ -46,9 +46,6 @@ static SoupSession *soup_session; struct input_soup { struct input_stream base; - GMutex *mutex; - GCond *cond; - SoupMessage *msg; GQueue *buffers; @@ -124,14 +121,14 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, assert(msg == s->msg); assert(!s->completed); - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); s->base.ready = true; s->alive = false; s->completed = true; - g_cond_broadcast(s->cond); - g_mutex_unlock(s->mutex); + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); } static void @@ -140,7 +137,7 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) struct input_soup *s = user_data; if (!SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) { - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); if (s->postponed_error == NULL) s->postponed_error = @@ -148,7 +145,7 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) "got HTTP status %d", msg->status_code); - g_mutex_unlock(s->mutex); + g_mutex_unlock(s->base.mutex); soup_session_cancel_message(soup_session, msg, SOUP_STATUS_CANCELLED); @@ -157,10 +154,10 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) soup_message_body_set_accumulate(msg->response_body, false); - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); s->base.ready = true; - g_cond_broadcast(s->cond); - g_mutex_unlock(s->mutex); + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); } static void @@ -170,7 +167,7 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) assert(msg == s->msg); - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); s->total_buffered += chunk->length; @@ -180,8 +177,8 @@ input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) soup_session_pause_message(soup_session, msg); } - g_cond_broadcast(s->cond); - g_mutex_unlock(s->mutex); + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); } static void @@ -191,14 +188,14 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) assert(msg == s->msg); - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); s->base.ready = true; s->eof = true; s->alive = false; - g_cond_broadcast(s->cond); - g_mutex_unlock(s->mutex); + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); } static bool @@ -216,7 +213,7 @@ input_soup_wait_data(struct input_soup *s) assert(s->current_consumed == 0); - g_cond_wait(s->cond, s->mutex); + g_cond_wait(s->base.cond, s->base.mutex); } } @@ -232,16 +229,16 @@ input_soup_queue(gpointer data) } static struct input_stream * -input_soup_open(const char *uri, G_GNUC_UNUSED GError **error_r) +input_soup_open(const char *uri, + GMutex *mutex, GCond *cond, + G_GNUC_UNUSED GError **error_r) { if (strncmp(uri, "http://", 7) != 0) return NULL; struct input_soup *s = g_new(struct input_soup, 1); - input_stream_init(&s->base, &input_plugin_soup, uri); - - s->mutex = g_mutex_new(); - s->cond = g_cond_new(); + input_stream_init(&s->base, &input_plugin_soup, uri, + mutex, cond); s->buffers = g_queue_new(); s->current_consumed = 0; @@ -288,25 +285,22 @@ input_soup_close(struct input_stream *is) { struct input_soup *s = (struct input_soup *)is; - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); if (!s->completed) { /* the messages's session callback hasn't been invoked yet; cancel it and wait for completion */ - g_mutex_unlock(s->mutex); + g_mutex_unlock(s->base.mutex); io_thread_call(input_soup_cancel, s); - g_mutex_lock(s->mutex); + g_mutex_lock(s->base.mutex); while (!s->completed) - g_cond_wait(s->cond, s->mutex); + g_cond_wait(s->base.cond, s->base.mutex); } - g_mutex_unlock(s->mutex); - - g_mutex_free(s->mutex); - g_cond_free(s->cond); + g_mutex_unlock(s->base.mutex); SoupBuffer *buffer; while ((buffer = g_queue_pop_head(s->buffers)) != NULL) @@ -325,54 +319,21 @@ input_soup_check(struct input_stream *is, GError **error_r) { struct input_soup *s = (struct input_soup *)is; - g_mutex_lock(s->mutex); - bool success = s->postponed_error == NULL; if (!success) { g_propagate_error(error_r, s->postponed_error); s->postponed_error = NULL; } - g_mutex_unlock(s->mutex); return success; } -static int -input_soup_buffer(struct input_stream *is, GError **error_r) +static bool +input_soup_available(struct input_stream *is) { struct input_soup *s = (struct input_soup *)is; - g_mutex_lock(s->mutex); - - if (s->pause) { - if (s->total_buffered >= SOUP_MAX_BUFFERED) { - g_mutex_unlock(s->mutex); - return 1; - } - - s->pause = false; - soup_session_unpause_message(soup_session, s->msg); - } - - - bool success = input_soup_wait_data(s); - - if (!success) { - if (s->postponed_error != NULL) { - g_propagate_error(error_r, s->postponed_error); - s->postponed_error = NULL; - } else - g_set_error_literal(error_r, soup_quark(), 0, - "HTTP failure"); - } - - g_mutex_unlock(s->mutex); - - if (!success) { - return -1; - } - - return 1; + return s->eof || !s->alive || !g_queue_is_empty(s->buffers); } static size_t @@ -381,8 +342,6 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size, { struct input_soup *s = (struct input_soup *)is; - g_mutex_lock(s->mutex); - if (!input_soup_wait_data(s)) { assert(!s->alive); @@ -392,8 +351,6 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size, } else g_set_error_literal(error_r, soup_quark(), 0, "HTTP failure"); - - g_mutex_unlock(s->mutex); return 0; } @@ -442,7 +399,6 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size, size_t nbytes = p - p0; s->base.offset += nbytes; - g_mutex_unlock(s->mutex); return nbytes; } @@ -462,7 +418,7 @@ const struct input_plugin input_plugin_soup = { .open = input_soup_open, .close = input_soup_close, .check = input_soup_check, - .buffer = input_soup_buffer, + .available = input_soup_available, .read = input_soup_read, .eof = input_soup_eof, }; |