From e28a0e97b5d2e54684c6452d6d45f64ff1e542d9 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Thu, 13 Aug 2009 23:33:46 +0200 Subject: decoder_control: protect command, state with a mutex Replace decoder_control.notify with decoder_control.mutex and decoder_control.cond. Lock the mutex on all accesses to decoder_control.command and decoder_control.state. --- src/decoder_api.c | 19 +++++++--- src/decoder_control.c | 52 +++++++++++++++++++++------ src/decoder_control.h | 97 +++++++++++++++++++++++++++++++++++++++++++++++--- src/decoder_internal.c | 27 ++++++++++++-- src/decoder_thread.c | 47 ++++++++++++++++++++++-- src/player_thread.c | 30 ++++++++++++---- 6 files changed, 241 insertions(+), 31 deletions(-) diff --git a/src/decoder_api.c b/src/decoder_api.c index 7f66c881e..4cff9916c 100644 --- a/src/decoder_api.c +++ b/src/decoder_api.c @@ -57,7 +57,10 @@ void decoder_initialized(G_GNUC_UNUSED struct decoder * decoder, dc.seekable = seekable; dc.total_time = total_time; + decoder_lock(); dc.state = DECODE_STATE_DECODE; + decoder_unlock(); + notify_signal(&pc.notify); g_debug("audio_format=%u:%u:%u, seekable=%s", @@ -88,6 +91,8 @@ enum decoder_command decoder_get_command(G_GNUC_UNUSED struct decoder * decoder) void decoder_command_finished(G_GNUC_UNUSED struct decoder * decoder) { + decoder_lock(); + assert(dc.command != DECODE_COMMAND_NONE); assert(dc.command != DECODE_COMMAND_SEEK || dc.seek_error || decoder->seeking); @@ -105,6 +110,8 @@ void decoder_command_finished(G_GNUC_UNUSED struct decoder * decoder) } dc.command = DECODE_COMMAND_NONE; + decoder_unlock(); + notify_signal(&pc.notify); } @@ -226,21 +233,23 @@ decoder_data(struct decoder *decoder, { const char *data = _data; GError *error = NULL; + enum decoder_command cmd; assert(dc.state == DECODE_STATE_DECODE); assert(dc.pipe != NULL); assert(length % audio_format_frame_size(&dc.in_audio_format) == 0); - if (dc.command == DECODE_COMMAND_STOP || - dc.command == DECODE_COMMAND_SEEK || + decoder_lock(); + cmd = dc.command; + decoder_unlock(); + + if (cmd == DECODE_COMMAND_STOP || cmd == DECODE_COMMAND_SEEK || length == 0) - return dc.command; + return cmd; /* send stream tags */ if (update_stream_tag(decoder, is)) { - enum decoder_command cmd; - if (decoder->decoder_tag != NULL) { /* merge with tag from decoder plugin */ struct tag *tag; diff --git a/src/decoder_control.c b/src/decoder_control.c index 44bb63e15..3b993431c 100644 --- a/src/decoder_control.c +++ b/src/decoder_control.c @@ -18,6 +18,7 @@ */ #include "decoder_control.h" +#include "notify.h" #include @@ -25,36 +26,63 @@ struct decoder_control dc; void dc_init(void) { - notify_init(&dc.notify); + dc.mutex = g_mutex_new(); + dc.cond = g_cond_new(); + dc.state = DECODE_STATE_STOP; dc.command = DECODE_COMMAND_NONE; } void dc_deinit(void) { - notify_deinit(&dc.notify); + g_cond_free(dc.cond); + g_mutex_free(dc.mutex); } -void -dc_command_wait(struct notify *notify) +static void +dc_command_wait_locked(struct notify *notify) { while (dc.command != DECODE_COMMAND_NONE) { - notify_signal(&dc.notify); + decoder_signal(); + decoder_unlock(); + notify_wait(notify); + + decoder_lock(); } } +void +dc_command_wait(struct notify *notify) +{ + decoder_lock(); + dc_command_wait_locked(notify); + decoder_unlock(); +} + static void -dc_command(struct notify *notify, enum decoder_command cmd) +dc_command_locked(struct notify *notify, enum decoder_command cmd) { dc.command = cmd; - dc_command_wait(notify); + dc_command_wait_locked(notify); +} + +static void +dc_command(struct notify *notify, enum decoder_command cmd) +{ + decoder_lock(); + dc_command_locked(notify, cmd); + decoder_unlock(); } static void dc_command_async(enum decoder_command cmd) { + decoder_lock(); + dc.command = cmd; - notify_signal(&dc.notify); + decoder_signal(); + + decoder_unlock(); } void @@ -80,15 +108,19 @@ dc_start_async(struct song *song) void dc_stop(struct notify *notify) { + decoder_lock(); + if (dc.command != DECODE_COMMAND_NONE) /* Attempt to cancel the current command. If it's too late and the decoder thread is already executing the old command, we'll call STOP again in this function (see below). */ - dc_command(notify, DECODE_COMMAND_STOP); + dc_command_locked(notify, DECODE_COMMAND_STOP); if (dc.state != DECODE_STATE_STOP && dc.state != DECODE_STATE_ERROR) - dc_command(notify, DECODE_COMMAND_STOP); + dc_command_locked(notify, DECODE_COMMAND_STOP); + + decoder_unlock(); } bool diff --git a/src/decoder_control.h b/src/decoder_control.h index 703ea256c..7e861f970 100644 --- a/src/decoder_control.h +++ b/src/decoder_control.h @@ -22,13 +22,16 @@ #include "decoder_command.h" #include "audio_format.h" -#include "notify.h" + +#include #include #define DECODE_TYPE_FILE 0 #define DECODE_TYPE_URL 1 +struct notify; + enum decoder_state { DECODE_STATE_STOP = 0, DECODE_STATE_START, @@ -48,14 +51,25 @@ struct decoder_control { thread isn't running */ GThread *thread; - struct notify notify; + /** + * This lock protects #state and #command. + */ + GMutex *mutex; + + /** + * Trigger this object after you have modified #command. This + * is also used by the decoder thread to notify the caller + * when it has finished a command. + */ + GCond *cond; + + enum decoder_state state; + enum decoder_command command; - volatile enum decoder_state state; - volatile enum decoder_command command; bool quit; bool seek_error; bool seekable; - volatile double seek_where; + double seek_where; /** the format of the song file */ struct audio_format in_audio_format; @@ -80,6 +94,46 @@ void dc_init(void); void dc_deinit(void); +/** + * Locks the #decoder_control object. + */ +static inline void +decoder_lock(void) +{ + g_mutex_lock(dc.mutex); +} + +/** + * Unlocks the #decoder_control object. + */ +static inline void +decoder_unlock(void) +{ + g_mutex_unlock(dc.mutex); +} + +/** + * Waits for a signal on the #decoder_control object. This function + * is only valid in the decoder thread. The object must be locked + * prior to calling this function. + */ +static inline void +decoder_wait(void) +{ + g_cond_wait(dc.cond, dc.mutex); +} + +/** + * Signals the #decoder_control object. This function is only valid + * in the player thread. The object should be locked prior to calling + * this function. + */ +static inline void +decoder_signal(void) +{ + g_cond_signal(dc.cond); +} + static inline bool decoder_is_idle(void) { return (dc.state == DECODE_STATE_STOP || @@ -100,6 +154,39 @@ static inline bool decoder_has_failed(void) return dc.state == DECODE_STATE_ERROR; } +static inline bool decoder_lock_is_idle(void) +{ + bool ret; + + decoder_lock(); + ret = decoder_is_idle(); + decoder_unlock(); + + return ret; +} + +static inline bool decoder_lock_is_starting(void) +{ + bool ret; + + decoder_lock(); + ret = decoder_is_starting(); + decoder_unlock(); + + return ret; +} + +static inline bool decoder_lock_has_failed(void) +{ + bool ret; + + decoder_lock(); + ret = decoder_has_failed(); + decoder_unlock(); + + return ret; +} + static inline struct song * decoder_current_song(void) { diff --git a/src/decoder_internal.c b/src/decoder_internal.c index 4a56fa5f3..1b064d0aa 100644 --- a/src/decoder_internal.c +++ b/src/decoder_internal.c @@ -27,6 +27,24 @@ #include +/** + * This is a wrapper for input_stream_buffer(). It assumes that the + * decoder is currently locked, and temporarily unlocks it while + * calling input_stream_buffer(). We shouldn't hold the lock during a + * potentially blocking operation. + */ +static int +decoder_input_buffer(struct input_stream *is) +{ + int ret; + + decoder_unlock(); + ret = input_stream_buffer(is) > 0; + decoder_lock(); + + return ret; +} + /** * All chunks are full of decoded data; wait for the player to free * one. @@ -38,9 +56,12 @@ need_chunks(struct input_stream *is, bool do_wait) dc.command == DECODE_COMMAND_SEEK) return dc.command; - if ((is == NULL || input_stream_buffer(is) <= 0) && do_wait) { - notify_wait(&dc.notify); + if ((is == NULL || decoder_input_buffer(is) <= 0) && do_wait) { + decoder_wait(); + + decoder_unlock(); notify_signal(&pc.notify); + decoder_lock(); return dc.command; } @@ -63,7 +84,9 @@ decoder_get_chunk(struct decoder *decoder, struct input_stream *is) if (decoder->chunk != NULL) return decoder->chunk; + decoder_lock(); cmd = need_chunks(is, true); + decoder_unlock(); } while (cmd == DECODE_COMMAND_NONE); return NULL; diff --git a/src/decoder_thread.c b/src/decoder_thread.c index 2b1a6299a..be37896c1 100644 --- a/src/decoder_thread.c +++ b/src/decoder_thread.c @@ -49,11 +49,15 @@ decoder_stream_decode(const struct decoder_plugin *plugin, assert(input_stream->ready); assert(dc.state == DECODE_STATE_START); + decoder_unlock(); + /* rewind the stream, so each plugin gets a fresh start */ input_stream_seek(input_stream, 0, SEEK_SET); decoder_plugin_stream_decode(plugin, decoder, input_stream); + decoder_lock(); + assert(dc.state == DECODE_STATE_START || dc.state == DECODE_STATE_DECODE); @@ -73,8 +77,12 @@ decoder_file_decode(const struct decoder_plugin *plugin, assert(path[0] == '/'); assert(dc.state == DECODE_STATE_START); + decoder_unlock(); + decoder_plugin_file_decode(plugin, decoder, path); + decoder_lock(); + assert(dc.state == DECODE_STATE_START || dc.state == DECODE_STATE_DECODE); @@ -103,28 +111,40 @@ static void decoder_run_song(const struct song *song, const char *uri) dc.state = DECODE_STATE_START; dc.command = DECODE_COMMAND_NONE; + + decoder_unlock(); notify_signal(&pc.notify); + decoder_lock(); /* wait for the input stream to become ready; its metadata will be available then */ while (!input_stream.ready) { if (dc.command == DECODE_COMMAND_STOP) { + decoder_unlock(); input_stream_close(&input_stream); + decoder_lock(); dc.state = DECODE_STATE_STOP; return; } + decoder_unlock(); ret = input_stream_buffer(&input_stream); if (ret < 0) { input_stream_close(&input_stream); + decoder_lock(); dc.state = DECODE_STATE_ERROR; return; } + + decoder_lock(); } if (dc.command == DECODE_COMMAND_STOP) { + decoder_unlock(); input_stream_close(&input_stream); + decoder_lock(); + dc.state = DECODE_STATE_STOP; return; } @@ -179,7 +199,10 @@ static void decoder_run_song(const struct song *song, const char *uri) const char *s = uri_get_suffix(uri); while ((plugin = decoder_plugin_from_suffix(s, next++))) { if (plugin->file_decode != NULL) { + decoder_unlock(); input_stream_close(&input_stream); + decoder_lock(); + close_instream = false; ret = decoder_file_decode(plugin, &decoder, uri); @@ -191,7 +214,13 @@ static void decoder_run_song(const struct song *song, const char *uri) been closed before decoder_file_decode() - reopen it */ - if (input_stream_open(&input_stream, uri)) + bool success; + + decoder_unlock(); + success = input_stream_open(&input_stream, uri); + decoder_lock(); + + if (success) close_instream = true; else continue; @@ -205,6 +234,8 @@ static void decoder_run_song(const struct song *song, const char *uri) } } + decoder_unlock(); + pcm_convert_deinit(&decoder.conv_state); /* flush the last chunk */ @@ -223,6 +254,8 @@ static void decoder_run_song(const struct song *song, const char *uri) if (decoder.decoder_tag != NULL) tag_free(decoder.decoder_tag); + decoder_lock(); + dc.state = ret ? DECODE_STATE_STOP : DECODE_STATE_ERROR; } @@ -249,6 +282,8 @@ static void decoder_run(void) static gpointer decoder_task(G_GNUC_UNUSED gpointer arg) { + decoder_lock(); + do { assert(dc.state == DECODE_STATE_STOP || dc.state == DECODE_STATE_ERROR); @@ -259,20 +294,28 @@ static gpointer decoder_task(G_GNUC_UNUSED gpointer arg) decoder_run(); dc.command = DECODE_COMMAND_NONE; + + decoder_unlock(); notify_signal(&pc.notify); + decoder_lock(); break; case DECODE_COMMAND_STOP: dc.command = DECODE_COMMAND_NONE; + + decoder_unlock(); notify_signal(&pc.notify); + decoder_lock(); break; case DECODE_COMMAND_NONE: - notify_wait(&dc.notify); + decoder_wait(); break; } } while (dc.command != DECODE_COMMAND_NONE || !dc.quit); + decoder_unlock(); + return NULL; } diff --git a/src/player_thread.c b/src/player_thread.c index f23faa8b3..b501937b6 100644 --- a/src/player_thread.c +++ b/src/player_thread.c @@ -135,7 +135,7 @@ player_wait_for_decoder(struct player *player) { dc_command_wait(&pc.notify); - if (decoder_has_failed()) { + if (decoder_lock_has_failed()) { assert(dc.next_song == NULL || dc.next_song->url != NULL); pc.errored_song = dc.next_song; pc.error = PLAYER_ERROR_FILE; @@ -174,10 +174,14 @@ player_check_decoder_startup(struct player *player) { assert(player->decoder_starting); + decoder_lock(); + if (decoder_has_failed()) { /* the decoder failed */ assert(dc.next_song == NULL || dc.next_song->url != NULL); + decoder_unlock(); + pc.errored_song = dc.next_song; pc.error = PLAYER_ERROR_FILE; @@ -185,6 +189,8 @@ player_check_decoder_startup(struct player *player) } else if (!decoder_is_starting()) { /* the decoder is ready and ok */ + decoder_unlock(); + if (audio_format_defined(&player->play_audio_format) && !audio_output_all_wait(1)) /* the output devices havn't finished playing @@ -219,6 +225,7 @@ player_check_decoder_startup(struct player *player) } else { /* the decoder is not yet ready; wait some more */ + decoder_unlock(); notify_wait(&pc.notify); return true; @@ -512,13 +519,20 @@ play_next_chunk(struct player *player) music_buffer_return(player_buffer, other_chunk); } else { /* there are not enough decoded chunks yet */ + + decoder_lock(); + if (decoder_is_idle()) { /* the decoder isn't running, abort cross fading */ + decoder_unlock(); + player->xfade = XFADE_DISABLED; } else { /* wait for the decoder */ - notify_signal(&dc.notify); + decoder_signal(); + decoder_unlock(); + notify_wait(&pc.notify); return true; @@ -549,10 +563,12 @@ play_next_chunk(struct player *player) /* this formula should prevent that the decoder gets woken up with each chunk; it is more efficient to make it decode a larger block at a time */ + decoder_lock(); if (!decoder_is_idle() && music_pipe_size(dc.pipe) <= (pc.buffered_before_play + music_buffer_size(player_buffer) * 3) / 4) - notify_signal(&dc.notify); + decoder_signal(); + decoder_unlock(); return true; } @@ -634,7 +650,7 @@ static void do_play(void) prevent stuttering on slow machines */ if (music_pipe_size(player.pipe) < pc.buffered_before_play && - !decoder_is_idle()) { + !decoder_lock_is_idle()) { /* not enough decoded buffer space yet */ if (!player.paused && @@ -669,7 +685,7 @@ static void do_play(void) */ #endif - if (decoder_is_idle() && player.queued) { + if (decoder_lock_is_idle() && player.queued) { /* the decoder has finished the current song; make it decode the next song */ assert(pc.next_song != NULL); @@ -682,7 +698,7 @@ static void do_play(void) if (dc.pipe != NULL && dc.pipe != player.pipe && player.xfade == XFADE_UNKNOWN && - !decoder_is_starting()) { + !decoder_lock_is_starting()) { /* enable cross fading in this song? if yes, calculate how many chunks will be required for it */ @@ -720,7 +736,7 @@ static void do_play(void) if (!player_song_border(&player)) break; - } else if (decoder_is_idle()) { + } else if (decoder_lock_is_idle()) { /* check the size of the pipe again, because the decoder thread may have added something since we last checked */ -- cgit v1.2.3