aboutsummaryrefslogtreecommitdiffstats
path: root/src/decoder
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2011-09-14 21:46:41 +0200
committerMax Kellermann <max@duempel.org>2011-09-16 21:22:13 +0200
commit754f26a97c816781e80500d98f2515ae97836145 (patch)
treecaa7dbaa879b29d018a4559524390670ad33a605 /src/decoder
parent29241c4f835797f635816a9f37528aa981f722b5 (diff)
downloadmpd-754f26a97c816781e80500d98f2515ae97836145.tar.gz
mpd-754f26a97c816781e80500d98f2515ae97836145.tar.xz
mpd-754f26a97c816781e80500d98f2515ae97836145.zip
input_stream: non-blocking I/O
Add GMutex, GCond attributes which will be used by callers to conditionally wait on the stream. Remove the (now-useless) plugin method buffer(), wait on GCond instead. Lock the input_stream before each method call. Do the same with the playlist plugins.
Diffstat (limited to '')
-rw-r--r--src/decoder/audiofile_decoder_plugin.c4
-rw-r--r--src/decoder/faad_decoder_plugin.c4
-rw-r--r--src/decoder/ffmpeg_decoder_plugin.c5
-rw-r--r--src/decoder/flac_decoder_plugin.c9
-rw-r--r--src/decoder/mad_decoder_plugin.c2
-rw-r--r--src/decoder/modplug_decoder_plugin.c2
-rw-r--r--src/decoder/mp4ff_decoder_plugin.c3
-rw-r--r--src/decoder/mpcdec_decoder_plugin.c2
-rw-r--r--src/decoder/pcm_decoder_plugin.c5
-rw-r--r--src/decoder/sndfile_decoder_plugin.c4
-rw-r--r--src/decoder/vorbis_decoder_plugin.c4
-rw-r--r--src/decoder/wavpack_decoder_plugin.c12
-rw-r--r--src/decoder_api.c19
-rw-r--r--src/decoder_thread.c44
14 files changed, 68 insertions, 51 deletions
diff --git a/src/decoder/audiofile_decoder_plugin.c b/src/decoder/audiofile_decoder_plugin.c
index c862168f8..0cf54bc60 100644
--- a/src/decoder/audiofile_decoder_plugin.c
+++ b/src/decoder/audiofile_decoder_plugin.c
@@ -53,7 +53,7 @@ audiofile_file_read(AFvirtualfile *vfile, void *data, size_t length)
GError *error = NULL;
size_t nbytes;
- nbytes = input_stream_read(is, data, length, &error);
+ nbytes = input_stream_lock_read(is, data, length, &error);
if (nbytes == 0 && error != NULL) {
g_warning("%s", error->message);
g_error_free(error);
@@ -90,7 +90,7 @@ audiofile_file_seek(AFvirtualfile *vfile, long offset, int is_relative)
{
struct input_stream *is = (struct input_stream *) vfile->closure;
int whence = (is_relative ? SEEK_CUR : SEEK_SET);
- if (input_stream_seek(is, offset, whence, NULL)) {
+ if (input_stream_lock_seek(is, offset, whence, NULL)) {
return is->offset;
} else {
return -1;
diff --git a/src/decoder/faad_decoder_plugin.c b/src/decoder/faad_decoder_plugin.c
index 02c72a4a1..91aa5392a 100644
--- a/src/decoder/faad_decoder_plugin.c
+++ b/src/decoder/faad_decoder_plugin.c
@@ -205,7 +205,7 @@ faad_song_duration(struct decoder_buffer *buffer, struct input_stream *is)
/* obtain the duration from the ADTS header */
float song_length = adts_song_duration(buffer);
- input_stream_seek(is, tagsize, SEEK_SET, NULL);
+ input_stream_lock_seek(is, tagsize, SEEK_SET, NULL);
data = decoder_buffer_read(buffer, &length);
if (data != NULL)
@@ -406,7 +406,7 @@ faad_stream_decode(struct decoder *mpd_decoder, struct input_stream *is)
faacDecSetConfiguration(decoder, config);
while (!decoder_buffer_is_full(buffer) &&
- !input_stream_eof(is) &&
+ !input_stream_lock_eof(is) &&
decoder_get_command(mpd_decoder) == DECODE_COMMAND_NONE) {
adts_find_frame(buffer);
decoder_buffer_fill(buffer);
diff --git a/src/decoder/ffmpeg_decoder_plugin.c b/src/decoder/ffmpeg_decoder_plugin.c
index b4f1f0b51..2923c1400 100644
--- a/src/decoder/ffmpeg_decoder_plugin.c
+++ b/src/decoder/ffmpeg_decoder_plugin.c
@@ -105,7 +105,7 @@ mpd_ffmpeg_stream_seek(void *opaque, int64_t pos, int whence)
if (whence == AVSEEK_SIZE)
return stream->input->size;
- if (!input_stream_seek(stream->input, pos, whence, NULL))
+ if (!input_stream_lock_seek(stream->input, pos, whence, NULL))
return -1;
return stream->input->offset;
@@ -320,7 +320,8 @@ ffmpeg_probe(struct decoder *decoder, struct input_stream *is)
unsigned char *buffer = g_malloc(BUFFER_SIZE);
size_t nbytes = decoder_read(decoder, is, buffer, BUFFER_SIZE);
- if (nbytes <= PADDING || !input_stream_seek(is, 0, SEEK_SET, NULL)) {
+ if (nbytes <= PADDING ||
+ !input_stream_lock_seek(is, 0, SEEK_SET, NULL)) {
g_free(buffer);
return NULL;
}
diff --git a/src/decoder/flac_decoder_plugin.c b/src/decoder/flac_decoder_plugin.c
index ca9cd5968..2a826f862 100644
--- a/src/decoder/flac_decoder_plugin.c
+++ b/src/decoder/flac_decoder_plugin.c
@@ -50,7 +50,7 @@ flac_read_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd,
if (r == 0) {
if (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE ||
- input_stream_eof(data->input_stream))
+ input_stream_lock_eof(data->input_stream))
return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
else
return FLAC__STREAM_DECODER_READ_STATUS_ABORT;
@@ -68,7 +68,8 @@ flac_seek_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd,
if (!data->input_stream->seekable)
return FLAC__STREAM_DECODER_SEEK_STATUS_UNSUPPORTED;
- if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL))
+ if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET,
+ NULL))
return FLAC__STREAM_DECODER_SEEK_STATUS_ERROR;
return FLAC__STREAM_DECODER_SEEK_STATUS_OK;
@@ -109,7 +110,7 @@ flac_eof_cb(G_GNUC_UNUSED const FLAC__StreamDecoder *fd, void *fdata)
return (decoder_get_command(data->decoder) != DECODE_COMMAND_NONE &&
decoder_get_command(data->decoder) != DECODE_COMMAND_SEEK) ||
- input_stream_eof(data->input_stream);
+ input_stream_lock_eof(data->input_stream);
}
static void
@@ -449,7 +450,7 @@ oggflac_decode(struct decoder *decoder, struct input_stream *input_stream)
/* rewind the stream, because ogg_stream_type_detect() has
moved it */
- input_stream_seek(input_stream, 0, SEEK_SET, NULL);
+ input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL);
flac_decode_internal(decoder, input_stream, true);
}
diff --git a/src/decoder/mad_decoder_plugin.c b/src/decoder/mad_decoder_plugin.c
index 8f77052f7..8bf3f6546 100644
--- a/src/decoder/mad_decoder_plugin.c
+++ b/src/decoder/mad_decoder_plugin.c
@@ -168,7 +168,7 @@ mp3_data_init(struct mp3_data *data, struct decoder *decoder,
static bool mp3_seek(struct mp3_data *data, long offset)
{
- if (!input_stream_seek(data->input_stream, offset, SEEK_SET, NULL))
+ if (!input_stream_lock_seek(data->input_stream, offset, SEEK_SET, NULL))
return false;
mad_stream_buffer(&data->stream, data->input_buffer, 0);
diff --git a/src/decoder/modplug_decoder_plugin.c b/src/decoder/modplug_decoder_plugin.c
index 341b00927..9345dd240 100644
--- a/src/decoder/modplug_decoder_plugin.c
+++ b/src/decoder/modplug_decoder_plugin.c
@@ -62,7 +62,7 @@ static GByteArray *mod_loadfile(struct decoder *decoder, struct input_stream *is
while (true) {
ret = decoder_read(decoder, is, data, MODPLUG_READ_BLOCK);
if (ret == 0) {
- if (input_stream_eof(is))
+ if (input_stream_lock_eof(is))
/* end of file */
break;
diff --git a/src/decoder/mp4ff_decoder_plugin.c b/src/decoder/mp4ff_decoder_plugin.c
index 38ae5793a..6475211a4 100644
--- a/src/decoder/mp4ff_decoder_plugin.c
+++ b/src/decoder/mp4ff_decoder_plugin.c
@@ -102,7 +102,8 @@ mp4_seek(void *user_data, uint64_t position)
{
struct mp4ff_input_stream *mis = user_data;
- return input_stream_seek(mis->input_stream, position, SEEK_SET, NULL)
+ return input_stream_lock_seek(mis->input_stream, position, SEEK_SET,
+ NULL)
? 0 : -1;
}
diff --git a/src/decoder/mpcdec_decoder_plugin.c b/src/decoder/mpcdec_decoder_plugin.c
index f31dcdb99..7864c0790 100644
--- a/src/decoder/mpcdec_decoder_plugin.c
+++ b/src/decoder/mpcdec_decoder_plugin.c
@@ -61,7 +61,7 @@ mpc_seek_cb(cb_first_arg, mpc_int32_t offset)
{
struct mpc_decoder_data *data = (struct mpc_decoder_data *) cb_data;
- return input_stream_seek(data->is, offset, SEEK_SET, NULL);
+ return input_stream_lock_seek(data->is, offset, SEEK_SET, NULL);
}
static mpc_int32_t
diff --git a/src/decoder/pcm_decoder_plugin.c b/src/decoder/pcm_decoder_plugin.c
index c8340ab67..24ad93cb6 100644
--- a/src/decoder/pcm_decoder_plugin.c
+++ b/src/decoder/pcm_decoder_plugin.c
@@ -52,7 +52,7 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is)
size_t nbytes = decoder_read(decoder, is,
buffer, sizeof(buffer));
- if (nbytes == 0 && input_stream_eof(is))
+ if (nbytes == 0 && input_stream_lock_eof(is))
break;
cmd = nbytes > 0
@@ -62,7 +62,8 @@ pcm_stream_decode(struct decoder *decoder, struct input_stream *is)
if (cmd == DECODE_COMMAND_SEEK) {
goffset offset = (goffset)(time_to_size *
decoder_seek_where(decoder));
- if (input_stream_seek(is, offset, SEEK_SET, &error)) {
+ if (input_stream_lock_seek(is, offset, SEEK_SET,
+ &error)) {
decoder_command_finished(decoder);
} else {
g_warning("seeking failed: %s", error->message);
diff --git a/src/decoder/sndfile_decoder_plugin.c b/src/decoder/sndfile_decoder_plugin.c
index dbe9bf067..25952dfd5 100644
--- a/src/decoder/sndfile_decoder_plugin.c
+++ b/src/decoder/sndfile_decoder_plugin.c
@@ -40,7 +40,7 @@ sndfile_vio_seek(sf_count_t offset, int whence, void *user_data)
struct input_stream *is = user_data;
bool success;
- success = input_stream_seek(is, offset, whence, NULL);
+ success = input_stream_lock_seek(is, offset, whence, NULL);
if (!success)
return -1;
@@ -54,7 +54,7 @@ sndfile_vio_read(void *ptr, sf_count_t count, void *user_data)
GError *error = NULL;
size_t nbytes;
- nbytes = input_stream_read(is, ptr, count, &error);
+ nbytes = input_stream_lock_read(is, ptr, count, &error);
if (nbytes == 0 && error != NULL) {
g_warning("%s", error->message);
g_error_free(error);
diff --git a/src/decoder/vorbis_decoder_plugin.c b/src/decoder/vorbis_decoder_plugin.c
index c130005a7..f73a41895 100644
--- a/src/decoder/vorbis_decoder_plugin.c
+++ b/src/decoder/vorbis_decoder_plugin.c
@@ -80,7 +80,7 @@ static int ogg_seek_cb(void *data, ogg_int64_t offset, int whence)
return vis->seekable &&
(!vis->decoder || decoder_get_command(vis->decoder) != DECODE_COMMAND_STOP) &&
- input_stream_seek(vis->input_stream, offset, whence, NULL)
+ input_stream_lock_seek(vis->input_stream, offset, whence, NULL)
? 0 : -1;
}
@@ -290,7 +290,7 @@ vorbis_stream_decode(struct decoder *decoder,
/* rewind the stream, because ogg_stream_type_detect() has
moved it */
- input_stream_seek(input_stream, 0, SEEK_SET, NULL);
+ input_stream_lock_seek(input_stream, 0, SEEK_SET, NULL);
if (!vorbis_is_open(&vis, &vf, decoder, input_stream))
return;
diff --git a/src/decoder/wavpack_decoder_plugin.c b/src/decoder/wavpack_decoder_plugin.c
index 200bf6455..bfb13b2a4 100644
--- a/src/decoder/wavpack_decoder_plugin.c
+++ b/src/decoder/wavpack_decoder_plugin.c
@@ -390,13 +390,15 @@ wavpack_input_get_pos(void *id)
static int
wavpack_input_set_pos_abs(void *id, uint32_t pos)
{
- return input_stream_seek(wpin(id)->is, pos, SEEK_SET, NULL) ? 0 : -1;
+ return input_stream_lock_seek(wpin(id)->is, pos, SEEK_SET, NULL)
+ ? 0 : -1;
}
static int
wavpack_input_set_pos_rel(void *id, int32_t delta, int mode)
{
- return input_stream_seek(wpin(id)->is, delta, mode, NULL) ? 0 : -1;
+ return input_stream_lock_seek(wpin(id)->is, delta, mode, NULL)
+ ? 0 : -1;
}
static int
@@ -447,6 +449,7 @@ wavpack_input_init(struct wavpack_input *isp, struct decoder *decoder,
static struct input_stream *
wavpack_open_wvc(struct decoder *decoder, const char *uri,
+ GMutex *mutex, GCond *cond,
struct wavpack_input *wpi)
{
struct input_stream *is_wvc;
@@ -462,7 +465,7 @@ wavpack_open_wvc(struct decoder *decoder, const char *uri,
return false;
wvc_url = g_strconcat(uri, "c", NULL);
- is_wvc = input_stream_open(wvc_url, NULL);
+ is_wvc = input_stream_open(wvc_url, mutex, cond, NULL);
g_free(wvc_url);
if (is_wvc == NULL)
@@ -499,7 +502,8 @@ wavpack_streamdecode(struct decoder * decoder, struct input_stream *is)
struct wavpack_input isp, isp_wvc;
bool can_seek = is->seekable;
- is_wvc = wavpack_open_wvc(decoder, is->uri, &isp_wvc);
+ is_wvc = wavpack_open_wvc(decoder, is->uri, is->mutex, is->cond,
+ &isp_wvc);
if (is_wvc != NULL) {
open_flags |= OPEN_WVC;
can_seek &= is_wvc->seekable;
diff --git a/src/decoder_api.c b/src/decoder_api.c
index 20e976e69..1f2075638 100644
--- a/src/decoder_api.c
+++ b/src/decoder_api.c
@@ -183,8 +183,19 @@ size_t decoder_read(struct decoder *decoder,
if (length == 0)
return 0;
- if (decoder_check_cancel_read(decoder))
- return 0;
+ input_stream_lock(is);
+
+ while (true) {
+ if (decoder_check_cancel_read(decoder)) {
+ input_stream_unlock(is);
+ return 0;
+ }
+
+ if (input_stream_available(is))
+ break;
+
+ g_cond_wait(is->cond, is->mutex);
+ }
nbytes = input_stream_read(is, buffer, length, &error);
assert(nbytes == 0 || error == NULL);
@@ -195,6 +206,8 @@ size_t decoder_read(struct decoder *decoder,
g_error_free(error);
}
+ input_stream_unlock(is);
+
return nbytes;
}
@@ -241,7 +254,7 @@ update_stream_tag(struct decoder *decoder, struct input_stream *is)
struct tag *tag;
tag = is != NULL
- ? input_stream_tag(is)
+ ? input_stream_lock_tag(is)
: NULL;
if (tag == NULL) {
tag = decoder->song_tag;
diff --git a/src/decoder_thread.c b/src/decoder_thread.c
index 320a04638..dff4ca08f 100644
--- a/src/decoder_thread.c
+++ b/src/decoder_thread.c
@@ -41,18 +41,6 @@
#undef G_LOG_DOMAIN
#define G_LOG_DOMAIN "decoder_thread"
-static enum decoder_command
-decoder_lock_get_command(struct decoder_control *dc)
-{
- enum decoder_command command;
-
- decoder_lock(dc);
- command = dc->command;
- decoder_unlock(dc);
-
- return command;
-}
-
/**
* Marks the current decoder command as "finished" and notifies the
* player thread.
@@ -86,7 +74,7 @@ decoder_input_stream_open(struct decoder_control *dc, const char *uri)
GError *error = NULL;
struct input_stream *is;
- is = input_stream_open(uri, &error);
+ is = input_stream_open(uri, dc->mutex, dc->cond, &error);
if (is == NULL) {
if (error != NULL) {
g_warning("%s", error->message);
@@ -99,19 +87,27 @@ decoder_input_stream_open(struct decoder_control *dc, const char *uri)
/* wait for the input stream to become ready; its metadata
will be available then */
+ decoder_lock(dc);
+
+ input_stream_update(is);
while (!is->ready &&
- decoder_lock_get_command(dc) != DECODE_COMMAND_STOP) {
- int ret;
+ dc->command != DECODE_COMMAND_STOP) {
+ decoder_wait(dc);
- ret = input_stream_buffer(is, &error);
- if (ret < 0) {
- input_stream_close(is);
- g_warning("%s", error->message);
- g_error_free(error);
- return NULL;
- }
+ input_stream_update(is);
}
+ if (!input_stream_check(is, &error)) {
+ decoder_unlock(dc);
+
+ g_warning("%s", error->message);
+ g_error_free(error);
+
+ return NULL;
+ }
+
+ decoder_unlock(dc);
+
return is;
}
@@ -132,11 +128,11 @@ decoder_stream_decode(const struct decoder_plugin *plugin,
if (decoder->dc->command == DECODE_COMMAND_STOP)
return true;
- decoder_unlock(decoder->dc);
-
/* rewind the stream, so each plugin gets a fresh start */
input_stream_seek(input_stream, 0, SEEK_SET, NULL);
+ decoder_unlock(decoder->dc);
+
decoder_plugin_stream_decode(plugin, decoder, input_stream);
decoder_lock(decoder->dc);