diff options
author | Max Kellermann <max@duempel.org> | 2011-09-14 21:46:41 +0200 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2011-09-16 21:22:13 +0200 |
commit | 754f26a97c816781e80500d98f2515ae97836145 (patch) | |
tree | caa7dbaa879b29d018a4559524390670ad33a605 /src/decoder | |
parent | 29241c4f835797f635816a9f37528aa981f722b5 (diff) | |
download | mpd-754f26a97c816781e80500d98f2515ae97836145.tar.gz mpd-754f26a97c816781e80500d98f2515ae97836145.tar.xz mpd-754f26a97c816781e80500d98f2515ae97836145.zip |
input_stream: non-blocking I/O
Add GMutex, GCond attributes which will be used by callers to
conditionally wait on the stream.
Remove the (now-useless) plugin method buffer(), wait on GCond
instead. Lock the input_stream before each method call. Do the same
with the playlist plugins.
Diffstat (limited to '')
-rw-r--r-- | src/decoder/audiofile_decoder_plugin.c | 4 | ||||
-rw-r--r-- | src/decoder/faad_decoder_plugin.c | 4 | ||||
-rw-r--r-- | src/decoder/ffmpeg_decoder_plugin.c | 5 | ||||
-rw-r--r-- | src/decoder/flac_decoder_plugin.c | 9 | ||||
-rw-r--r-- | src/decoder/mad_decoder_plugin.c | 2 | ||||
-rw-r--r-- | src/decoder/modplug_decoder_plugin.c | 2 | ||||
-rw-r--r-- | src/decoder/mp4ff_decoder_plugin.c | 3 | ||||
-rw-r--r-- | src/decoder/mpcdec_decoder_plugin.c | 2 | ||||
-rw-r--r-- | src/decoder/pcm_decoder_plugin.c | 5 | ||||
-rw-r--r-- | src/decoder/sndfile_decoder_plugin.c | 4 | ||||
-rw-r--r-- | src/decoder/vorbis_decoder_plugin.c | 4 | ||||
-rw-r--r-- | src/decoder/wavpack_decoder_plugin.c | 12 | ||||
-rw-r--r-- | src/decoder_api.c | 19 | ||||
-rw-r--r-- | src/decoder_thread.c | 44 |
14 files changed, 68 insertions, 51 deletions
diff --git a/src/decoder/audiofile_decoder_plugin.c b/src/decoder/audiofile_decoder_plugin.c index c862168f8..0cf54bc60 100644 --- a/src/decoder/audiofile_decoder_plugin.c +++ b/src/decoder/audiofile_decoder_plugin.c @@ -53,7 +53,7 @@ audiofile_file_read(AFvirtualfile *vfile, void *data, size_t length) GError *error = NULL; size_t nbytes; - nbytes = input_stream_read(is, data, length, &error); + nbytes = input_stream_lock_read(is, data, length, &error); if (nbytes == 0 && error != NULL) { g_warning("%s", error->message); g_error_free(error); @@ -90,7 +90,7 @@ audiofile_file_seek(AFvirtualfile *vfile, long offset, int is_relative) { struct input_stream *is = (struct input_stream *) vfile->closure; int whence = (is_relative ? SEEK_CUR : SEEK_SET); - if (input_stream_seek(is, offset, whence, NULL)) { + if (input_stream_lock_seek(is, offset, whence, NULL)) { return is->offset; } else { return -1; diff --git a/src/decoder/faad_decoder_plugin.c b/src/decoder/faad_decoder_plugin.c index 02c72a4a1..91aa5392a 100644 --- a/src/decoder/faad_decoder_plugin.c +++ b/src/decoder/faad_decoder_plugin.c @@ -205,7 +205,7 @@ faad_song_duration(struct decoder_buffer *buffer, struct input_stream *is) /* obtain the duration from the ADTS header */ float song_length = adts_song_duration(buffer); - input_stream_seek(is, tagsize, SEEK_SET, NULL); + input_stream_lock_seek(is, tagsize, SEEK_SET, NULL); data = decoder_buffer_read(buffer, &length); if (data != NULL) @@ -406,7 +406,7 @@ faad_stream_decode(struct decoder *mpd_decoder, struct input_stream *is) faacDecSetConfiguration(decoder, config); while (!decoder_buffer_is_full(buffer) && - !input_stream_eof(is) && + !input_stream_lock_eof(is) && decoder_get_command(mpd_decoder) == DECODE_COMMAND_NONE) { adts_find_frame(buffer); decoder_buffer_fill(buffer); diff --git a/src/decoder/ffmpeg_decoder_plugin.c b/src/decoder/ffmpeg_decoder_plugin.c index b4f1f0b51..2923c1400 100644 --- a/src/decoder/ffmpeg_decoder_plugin.c +++ b/src/decoder/ffmpeg_decoder_plugin.c @@ -105,7 +105,7 @@ mpd_ffmpeg_stream_seek(void *opaque, int64_t pos, int whence) if (whence == AVSEEK_SIZE) return stream->input->size; - if (!input_stream_seek(stream->input, pos, whence, NULL)) + if (!input_stream_lock_seek(stream->input, pos, whence, NULL)) return -1; return stream->input->offset; @@ -320,7 +320,8 @@ ffmpeg_probe(struct decoder *decoder, struct input_stream *is) unsigned char *buffer = g_malloc(BUFFER_SIZE); size_t nbytes = decoder_read(decoder, is, buffer, BUFFER_SIZE); - if (nbytes <= PADDING || !input_stream_seek(is, 0, SEEK_SET, NULL)) { + if (nbytes <= PADDING || + !input_stream_lock_seek(is, 0, SEEK_SET, NULL)) { g_free(buffer); return NULL; } diff --git a/src/decoder/flac_decoder_plugin.c b/src/decoder/flac_decoder_plugin.c index ca9cd5968..2a826f862 100644 --- a/src/decoder/flac_decoder_plugin.c +++ b/src/decoder/flac_decoder_plugin.c @@ -50,7 +50,7 @@ flac_read_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd, if (r == 0) { if (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE || - input_stream_eof(data->input_stream)) + input_stream_lock_eof(data->input_stream)) return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM; else return FLAC__STREAM_DECODER_READ_STATUS_ABORT; @@ -68,7 +68,8 @@ flac_seek_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd, if (!data->input_stream->seekable) return FLAC__STREAM_DECODER_SEEK_STATUS_UNSUPPORTED; - if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL)) + if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET, + NULL)) return FLAC__STREAM_DECODER_SEEK_STATUS_ERROR; return FLAC__STREAM_DECODER_SEEK_STATUS_OK; @@ -109,7 +110,7 @@ flac_eof_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd, void *fdata) return (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE && decoder_get_command(data->decoder) != DECODE_COMMAND_SEEK) || - input_stream_eof(data->input_stream); + input_stream_lock_eof(data->input_stream); } static void @@ -449,7 +450,7 @@ oggflac_decode(struct decoder *decoder, struct input_stream *input_stream) /* rewind the stream, because ogg_stream_type_detect() has moved it */ - input_stream_seek(input_stream, 0, SEEK_SET, NULL); + input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL); flac_decode_internal(decoder, input_stream, true); } diff --git a/src/decoder/mad_decoder_plugin.c b/src/decoder/mad_decoder_plugin.c index 8f77052f7..8bf3f6546 100644 --- a/src/decoder/mad_decoder_plugin.c +++ b/src/decoder/mad_decoder_plugin.c @@ -168,7 +168,7 @@ mp3_data_init(struct mp3_data *data, struct decoder *decoder, static bool mp3_seek(struct mp3_data *data, long offset) { - if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL)) + if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET, NULL)) return false; mad_stream_buffer(&data->stream, data->input_buffer, 0); diff --git a/src/decoder/modplug_decoder_plugin.c b/src/decoder/modplug_decoder_plugin.c index 341b00927..9345dd240 100644 --- a/src/decoder/modplug_decoder_plugin.c +++ b/src/decoder/modplug_decoder_plugin.c @@ -62,7 +62,7 @@ static GByteArray *mod_loadfile(struct decoder *decoder, struct input_stream *is while (true) { ret = decoder_read(decoder, is, data, MODPLUG_READ_BLOCK); if (ret == 0) { - if (input_stream_eof(is)) + if (input_stream_lock_eof(is)) /* end of file */ break; diff --git a/src/decoder/mp4ff_decoder_plugin.c b/src/decoder/mp4ff_decoder_plugin.c index 38ae5793a..6475211a4 100644 --- a/src/decoder/mp4ff_decoder_plugin.c +++ b/src/decoder/mp4ff_decoder_plugin.c @@ -102,7 +102,8 @@ mp4_seek(void *user_data, uint64_t position) { struct mp4ff_input_stream *mis = user_data; - return input_stream_seek(mis->input_stream, position, SEEK_SET, NULL) + return input_stream_lock_seek(mis->input_stream, position, SEEK_SET, + NULL) ? 0 : -1; } diff --git a/src/decoder/mpcdec_decoder_plugin.c b/src/decoder/mpcdec_decoder_plugin.c index f31dcdb99..7864c0790 100644 --- a/src/decoder/mpcdec_decoder_plugin.c +++ b/src/decoder/mpcdec_decoder_plugin.c @@ -61,7 +61,7 @@ mpc_seek_cb(cb_first_arg, mpc_int32_t offset) { struct mpc_decoder_data *data = (struct mpc_decoder_data *) cb_data; - return input_stream_seek(data->is, offset, SEEK_SET, NULL); + return input_stream_lock_seek(data->is, offset, SEEK_SET, NULL); } static mpc_int32_t diff --git a/src/decoder/pcm_decoder_plugin.c b/src/decoder/pcm_decoder_plugin.c index c8340ab67..24ad93cb6 100644 --- a/src/decoder/pcm_decoder_plugin.c +++ b/src/decoder/pcm_decoder_plugin.c @@ -52,7 +52,7 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is) size_t nbytes = decoder_read(decoder, is, buffer, sizeof(buffer)); - if (nbytes == 0 && input_stream_eof(is)) + if (nbytes == 0 && input_stream_lock_eof(is)) break; cmd = nbytes > 0 @@ -62,7 +62,8 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is) if (cmd == DECODE_COMMAND_SEEK) { goffset offset = (goffset)(time_to_size * decoder_seek_where(decoder)); - if (input_stream_seek(is, offset, SEEK_SET, &error)) { + if (input_stream_lock_seek(is, offset, SEEK_SET, + &error)) { decoder_command_finished(decoder); } else { g_warning("seeking failed: %s", error->message); diff --git a/src/decoder/sndfile_decoder_plugin.c b/src/decoder/sndfile_decoder_plugin.c index dbe9bf067..25952dfd5 100644 --- a/src/decoder/sndfile_decoder_plugin.c +++ b/src/decoder/sndfile_decoder_plugin.c @@ -40,7 +40,7 @@ sndfile_vio_seek(sf_count_t offset, int whence, void *user_data) struct input_stream *is = user_data; bool success; - success = input_stream_seek(is, offset, whence, NULL); + success = input_stream_lock_seek(is, offset, whence, NULL); if (!success) return -1; @@ -54,7 +54,7 @@ sndfile_vio_read(void *ptr, sf_count_t count, void *user_data) GError *error = NULL; size_t nbytes; - nbytes = input_stream_read(is, ptr, count, &error); + nbytes = input_stream_lock_read(is, ptr, count, &error); if (nbytes == 0 && error != NULL) { g_warning("%s", error->message); g_error_free(error); diff --git a/src/decoder/vorbis_decoder_plugin.c b/src/decoder/vorbis_decoder_plugin.c index c130005a7..f73a41895 100644 --- a/src/decoder/vorbis_decoder_plugin.c +++ b/src/decoder/vorbis_decoder_plugin.c @@ -80,7 +80,7 @@ static int ogg_seek_cb(void *data, ogg_int64_t offset, int whence) return vis->seekable && (!vis->decoder || decoder_get_command(vis->decoder) != DECODE_COMMAND_STOP) && - input_stream_seek(vis->input_stream, offset, whence, NULL) + input_stream_lock_seek(vis->input_stream, offset, whence, NULL) ? 0 : -1; } @@ -290,7 +290,7 @@ vorbis_stream_decode(struct decoder *decoder, /* rewind the stream, because ogg_stream_type_detect() has moved it */ - input_stream_seek(input_stream, 0, SEEK_SET, NULL); + input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL); if (!vorbis_is_open(&vis, &vf, decoder, input_stream)) return; diff --git a/src/decoder/wavpack_decoder_plugin.c b/src/decoder/wavpack_decoder_plugin.c index 200bf6455..bfb13b2a4 100644 --- a/src/decoder/wavpack_decoder_plugin.c +++ b/src/decoder/wavpack_decoder_plugin.c @@ -390,13 +390,15 @@ wavpack_input_get_pos(void *id) static int wavpack_input_set_pos_abs(void *id, uint32_t pos) { - return input_stream_seek(wpin(id)->is, pos, SEEK_SET, NULL) ? 0 : -1; + return input_stream_lock_seek(wpin(id)->is, pos, SEEK_SET, NULL) + ? 0 : -1; } static int wavpack_input_set_pos_rel(void *id, int32_t delta, int mode) { - return input_stream_seek(wpin(id)->is, delta, mode, NULL) ? 0 : -1; + return input_stream_lock_seek(wpin(id)->is, delta, mode, NULL) + ? 0 : -1; } static int @@ -447,6 +449,7 @@ wavpack_input_init(struct wavpack_input *isp, struct decoder *decoder, static struct input_stream * wavpack_open_wvc(struct decoder *decoder, const char *uri, + GMutex *mutex, GCond *cond, struct wavpack_input *wpi) { struct input_stream *is_wvc; @@ -462,7 +465,7 @@ wavpack_open_wvc(struct decoder *decoder, const char *uri, return false; wvc_url = g_strconcat(uri, "c", NULL); - is_wvc = input_stream_open(wvc_url, NULL); + is_wvc = input_stream_open(wvc_url, mutex, cond, NULL); g_free(wvc_url); if (is_wvc == NULL) @@ -499,7 +502,8 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is) struct wavpack_input isp, isp_wvc; bool can_seek = is->seekable; - is_wvc = wavpack_open_wvc(decoder, is->uri, &isp_wvc); + is_wvc = wavpack_open_wvc(decoder, is->uri, is->mutex, is->cond, + &isp_wvc); if (is_wvc != NULL) { open_flags |= OPEN_WVC; can_seek &= is_wvc->seekable; diff --git a/src/decoder_api.c b/src/decoder_api.c index 20e976e69..1f2075638 100644 --- a/src/decoder_api.c +++ b/src/decoder_api.c @@ -183,8 +183,19 @@ size_t decoder_read(struct decoder *decoder, if (length == 0) return 0; - if (decoder_check_cancel_read(decoder)) - return 0; + input_stream_lock(is); + + while (true) { + if (decoder_check_cancel_read(decoder)) { + input_stream_unlock(is); + return 0; + } + + if (input_stream_available(is)) + break; + + g_cond_wait(is->cond, is->mutex); + } nbytes = input_stream_read(is, buffer, length, &error); assert(nbytes == 0 || error == NULL); @@ -195,6 +206,8 @@ size_t decoder_read(struct decoder *decoder, g_error_free(error); } + input_stream_unlock(is); + return nbytes; } @@ -241,7 +254,7 @@ update_stream_tag(struct decoder *decoder, struct input_stream *is) struct tag *tag; tag = is != NULL - ? input_stream_tag(is) + ? input_stream_lock_tag(is) : NULL; if (tag == NULL) { tag = decoder->song_tag; diff --git a/src/decoder_thread.c b/src/decoder_thread.c index 320a04638..dff4ca08f 100644 --- a/src/decoder_thread.c +++ b/src/decoder_thread.c @@ -41,18 +41,6 @@ #undef G_LOG_DOMAIN #define G_LOG_DOMAIN "decoder_thread" -static enum decoder_command -decoder_lock_get_command(struct decoder_control *dc) -{ - enum decoder_command command; - - decoder_lock(dc); - command = dc->command; - decoder_unlock(dc); - - return command; -} - /** * Marks the current decoder command as "finished" and notifies the * player thread. @@ -86,7 +74,7 @@ decoder_input_stream_open(struct decoder_control *dc, const char *uri) GError *error = NULL; struct input_stream *is; - is = input_stream_open(uri, &error); + is = input_stream_open(uri, dc->mutex, dc->cond, &error); if (is == NULL) { if (error != NULL) { g_warning("%s", error->message); @@ -99,19 +87,27 @@ decoder_input_stream_open(struct decoder_control *dc, const char *uri) /* wait for the input stream to become ready; its metadata will be available then */ + decoder_lock(dc); + + input_stream_update(is); while (!is->ready && - decoder_lock_get_command(dc) != DECODE_COMMAND_STOP) { - int ret; + dc->command != DECODE_COMMAND_STOP) { + decoder_wait(dc); - ret = input_stream_buffer(is, &error); - if (ret < 0) { - input_stream_close(is); - g_warning("%s", error->message); - g_error_free(error); - return NULL; - } + input_stream_update(is); } + if (!input_stream_check(is, &error)) { + decoder_unlock(dc); + + g_warning("%s", error->message); + g_error_free(error); + + return NULL; + } + + decoder_unlock(dc); + return is; } @@ -132,11 +128,11 @@ decoder_stream_decode(const struct decoder_plugin *plugin, if (decoder->dc->command == DECODE_COMMAND_STOP) return true; - decoder_unlock(decoder->dc); - /* rewind the stream, so each plugin gets a fresh start */ input_stream_seek(input_stream, 0, SEEK_SET, NULL); + decoder_unlock(decoder->dc); + decoder_plugin_stream_decode(plugin, decoder, input_stream); decoder_lock(decoder->dc); |