diff options
author | Eric Wong <normalperson@yhbt.net> | 2008-08-20 01:31:51 -0700 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2008-08-20 01:31:51 -0700 |
commit | 1b197e65232e1a51b853db53985e8eda61f1b196 (patch) | |
tree | 2d85ff67ba79e32256a20d2d2bee7222352007fd /src/outputBuffer.c | |
parent | 5a0216af3dc7c9dedc4dacb708191f0fd380bb73 (diff) | |
parent | 508ae1c18d3bdc99a1bb06181762e5ec859cf072 (diff) | |
download | mpd-1b197e65232e1a51b853db53985e8eda61f1b196.tar.gz mpd-1b197e65232e1a51b853db53985e8eda61f1b196.tar.xz mpd-1b197e65232e1a51b853db53985e8eda61f1b196.zip |
Merge branch 'core-rewrite' of git://git.musicpd.org/normalperson/mpd
* 'core-rewrite' of git://git.musicpd.org/normalperson/mpd:
Remove ob_wait_sync and cleanup triggering in playlist
fix output buffer deadlock when daemonizing
log.c: thread-safety for warning log
core rewrite (decode,player,outputBuffer,playlist)
Diffstat (limited to '')
-rw-r--r-- | src/outputBuffer.c | 631 |
1 files changed, 445 insertions, 186 deletions
diff --git a/src/outputBuffer.c b/src/outputBuffer.c index b0cfc00df..aca84d07f 100644 --- a/src/outputBuffer.c +++ b/src/outputBuffer.c @@ -20,252 +20,511 @@ #include "utils.h" #include "normalize.h" -#include "playerData.h" - -void ob_init(unsigned int size) +#include "ringbuf.h" +#include "condition.h" +#include "song.h" +#include "main_notify.h" +#include "player_error.h" +#include "log.h" +#include "action_status.h" + +/* typically have 2048-4096 of these structs, so pack tightly */ +struct ob_chunk { + mpd_uint16 len; /* 0: skip this chunk */ + mpd_uint16 bit_rate; + float time; + mpd_uint8 seq; /* see seq_ok() for explanation */ + char data[CHUNK_SIZE]; +}; + +enum ob_xfade_state { + XFADE_DISABLED = 0, + XFADE_ENABLED +}; + +static struct condition ob_action_cond = STATIC_COND_INITIALIZER; +static struct condition ob_halt_cond = STATIC_COND_INITIALIZER; +static struct condition ob_seq_cond = STATIC_COND_INITIALIZER; + +struct output_buffer { + struct ringbuf *index; /* index for chunks */ + struct ob_chunk *chunks; + size_t nr_bpp; /* nr (chunks) buffered before play */ + enum ob_state state; /* protected by ob_action_cond */ + enum ob_action action; /* protected by ob_action_cond */ + enum ob_xfade_state xfade_state; /* thread-internal */ + int sw_vol; + int bit_rate; + float total_time; + float elapsed_time; + AudioFormat audio_format; + size_t xfade_cur; + size_t xfade_max; + float xfade_time; + void *conv_buf; + size_t conv_buf_len; + pthread_t thread; + ConvState conv_state; + unsigned int seq_drop; + unsigned int seq_player; /* only gets changed by ob.thread */ + mpd_uint8 seq_decoder; /* only gets changed by dc.thread */ + struct ringbuf preseek_index; + enum ob_state preseek_state; + mpd_uint16 *preseek_len; +}; + +static struct output_buffer ob; + +#include "outputBuffer_xfade.h" +#include "outputBuffer_accessors.h" + +static enum action_status ob_do_stop(void); +static void stop_playback(void) { - assert(size > 0); - - memset(&ob.convState, 0, sizeof(ConvState)); - ob.chunks = xmalloc(size * sizeof(*ob.chunks)); - ob.size = size; - ob.begin = 0; - ob.end = 0; - ob.lazy = 0; - ob.chunks[0].chunkSize = 0; + assert(pthread_equal(pthread_self(), ob.thread)); + cond_enter(&ob_action_cond); + ob_do_stop(); + cond_leave(&ob_action_cond); } -void ob_free(void) +void ob_trigger_action(enum ob_action action) { - assert(ob.chunks != NULL); - free(ob.chunks); + /* + * This can be called by both dc.thread and main_thread, but only one + * action can be in progress at once. So we use this private mutex + * to protect against simultaneous invocations stepping over + * each other + */ + static pthread_mutex_t trigger_lock = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_lock(&trigger_lock); + DEBUG(__FILE__": %d action: %d\n", __LINE__, action); + assert(!pthread_equal(pthread_self(), ob.thread)); + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + cond_enter(&ob_action_cond); + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + assert(ob.action == OB_ACTION_NONE); + + if (pthread_equal(pthread_self(), dc.thread)) + assert(action == OB_ACTION_PLAY || + action == OB_ACTION_SEEK_START || + action == OB_ACTION_SEEK_FINISH); + else + assert(action != OB_ACTION_PLAY && + action != OB_ACTION_SEEK_START && + action != OB_ACTION_SEEK_FINISH); + ob.action = action; + do { + switch (ob.state) { + case OB_STATE_PAUSE: + case OB_STATE_STOP: + case OB_STATE_SEEK: + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + cond_signal_sync(&ob_halt_cond); + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + break; + default: break; + } + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + cond_wait(&ob_action_cond); + } while (ob.action != OB_ACTION_NONE); + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + assert(ob.action == OB_ACTION_NONE); + cond_leave(&ob_action_cond); + DEBUG(__FILE__":%s %d\n", __func__, __LINE__); + pthread_mutex_unlock(&trigger_lock); } -void ob_clear(void) +static enum action_status ob_finalize_action(void) { - ob.end = ob.begin; - ob.chunks[ob.end].chunkSize = 0; + assert(pthread_equal(pthread_self(), ob.thread)); + ob.action = OB_ACTION_NONE; + /* DEBUG(__FILE__ ":%s signaling %d\n", __func__, __LINE__); */ + cond_signal(&ob_action_cond); + cond_leave(&ob_action_cond); + return AS_COMPLETE; } -/** return the index of the chunk after i */ -static inline unsigned successor(unsigned i) +/* marks all buffered chunks with sequence number matching `seq' as invalid */ +static enum action_status ob_do_drop(void) { - assert(i <= ob.size); - - ++i; - return i == ob.size ? 0 : i; + struct iovec vec[2]; + long i; + unsigned int seq_drop; + + cond_enter(&ob_seq_cond); + seq_drop = ob.seq_drop; + /* drop the audio that we've already pushed to the device, too */ + if (seq_drop == ob.seq_player) + dropBufferedAudio(); + cond_leave(&ob_seq_cond); + + assert(pthread_equal(pthread_self(), ob.thread)); + for (i = (long)ringbuf_get_read_vector(ob.index, vec); --i >= 0; ) { + struct ob_chunk *c = get_chunk(vec, i); + assert(c); + if (c->seq == seq_drop) + c->len = 0; + } + return ob_finalize_action(); } -/** - * Mark the tail chunk as "full" and wake up the player if is waiting - * for the decoder. - */ -static void output_buffer_expand(unsigned i) +static void close_audio_devices(void); +static enum action_status ob_do_pause(void) { - int was_empty = !ob.lazy || ob_is_empty(); - - assert(i == (ob.end + 1) % ob.size); - assert(i != ob.end); - - ob.end = i; - ob.chunks[i].chunkSize = 0; - if (was_empty) - /* if the buffer was empty, the player thread might be - waiting for us; wake it up now that another decoded - buffer has become available. */ - decoder_wakeup_player(); + assert(pthread_equal(pthread_self(), ob.thread)); + ob.xfade_state = XFADE_DISABLED; + /* + * This will eventually set certain outputs (like shout) into 'pause' + * state where it'll just play silence instead of disconnecting + * listeners + */ + close_audio_devices(); + ob.state = OB_STATE_PAUSE; + return AS_INPROGRESS; } -void ob_flush(void) +static void reader_reset_buffer(void) { - ob_chunk *chunk = ob_get_chunk(ob.end); - - if (chunk->chunkSize > 0) { - unsigned int next = successor(ob.end); - if (next == ob.begin) - /* all buffers are full; we have to wait for - the player to free one, so don't flush - right now */ - return; - - output_buffer_expand(next); + struct iovec vec[2]; + size_t nr; + long i; + + assert(pthread_equal(pthread_self(), ob.thread)); + nr = ringbuf_get_read_vector(ob.index, vec); + for (i = nr; --i >= 0; ) { + struct ob_chunk *c = get_chunk(vec, i); + assert(c); + c->len = 0; } + ringbuf_read_advance(ob.index, nr); } -void ob_set_lazy(int lazy) +static void ob_seq_player_set(unsigned int seq_num) { - ob.lazy = lazy; + cond_enter(&ob_seq_cond); + ob.seq_player = seq_num; + cond_signal(&ob_seq_cond); + cond_leave(&ob_seq_cond); } -int ob_is_empty(void) +static enum action_status ob_do_reset(void) { - return ob.begin == ob.end; + assert(pthread_equal(pthread_self(), ob.thread)); + ob.elapsed_time = 0; + ob.total_time = 0; + reader_reset_buffer(); + ob.xfade_state = XFADE_DISABLED; + ob_seq_player_set((unsigned int)ob.seq_decoder); + return ob_finalize_action(); } -void ob_shift(void) +static enum action_status ob_do_stop(void) { - assert(ob.begin != ob.end); - assert(ob.begin < ob.size); - - ob.begin = successor(ob.begin); + assert(pthread_equal(pthread_self(), ob.thread)); + if (ob.state == OB_STATE_STOP) + return AS_INPROGRESS; + ob.state = OB_STATE_STOP; + return ob_do_reset(); } -unsigned int ob_relative(const unsigned i) +/* + * we need to reset the buffer *before* we seek because the decoder + * _may_ try to flush out the last remnants of the previously decoded audio, + * so we need to ensure there is space available for that + */ +static enum action_status ob_do_seek_start(void) { - if (i >= ob.begin) - return i - ob.begin; - else - return i + ob.size - ob.begin; + int i; + + assert(pthread_equal(pthread_self(), ob.thread)); + + /* preserve pre-seek ringbuf and state information */ + memcpy(&ob.preseek_index, ob.index, sizeof(struct ringbuf)); + for (i = ob.preseek_index.size; --i >= 0; ) + ob.preseek_len[i] = ob.chunks[i].len; + ob.preseek_state = ob.state; + ob.state = OB_STATE_SEEK; + reader_reset_buffer(); + return AS_INPROGRESS; } -unsigned ob_available(void) +static enum action_status ob_do_seek_finish(void) { - return ob_relative(ob.end); + assert(pthread_equal(pthread_self(), ob.thread)); + assert(ob.state == OB_STATE_SEEK); + ob.state = ob.preseek_state; + if (dc.seek_where < 0) { + int i; + assert(dc.seek_where == DC_SEEK_MISMATCH || + dc.seek_where == DC_SEEK_ERROR); + + /* restore the old ringbuf index if we failed to seek */ + memcpy(ob.index, &ob.preseek_index, sizeof(struct ringbuf)); + for (i = ob.preseek_index.size; --i >= 0; ) + ob.chunks[i].len = ob.preseek_len[i]; + } else { + assert(dc.seek_where >= 0); + ob.xfade_state = XFADE_DISABLED; + ob.elapsed_time = dc.seek_where; + ob.total_time = dc.total_time; + reader_reset_buffer(); + dropBufferedAudio(); + ob_seq_player_set((unsigned int)ob.seq_decoder); + } + return ob_finalize_action(); } -int ob_absolute(const unsigned relative) +static enum action_status ob_take_action(void) { - unsigned i, max; - - max = ob.end; - if (max < ob.begin) - max += ob.size; - i = (unsigned)ob.begin + relative; - if (i >= max) - return -1; + assert(pthread_equal(pthread_self(), ob.thread)); + if (mpd_likely(ob.action == OB_ACTION_NONE)) + return AS_COMPLETE; + DEBUG(__FILE__": %s %d\n", __func__, __LINE__); + cond_enter(&ob_action_cond); + DEBUG(__FILE__": %s %d action: %d\n", __func__, __LINE__, ob.action); + switch (ob.action) { + case OB_ACTION_NONE: return ob_finalize_action(); + case OB_ACTION_PLAY: ob.state = OB_STATE_PLAY; break; + case OB_ACTION_DROP: return ob_do_drop(); + case OB_ACTION_SEEK_START: return ob_do_seek_start(); + case OB_ACTION_SEEK_FINISH: return ob_do_seek_finish(); + case OB_ACTION_PAUSE_SET: + if (ob.state == OB_STATE_PLAY) + return ob_do_pause(); + ob.state = OB_STATE_PAUSE; + break; + case OB_ACTION_PAUSE_UNSET: + if (ob.state == OB_STATE_PAUSE) + ob.state = OB_STATE_PLAY; + break; + case OB_ACTION_PAUSE_FLIP: + switch (ob.state) { + case OB_STATE_PLAY: return ob_do_pause(); + case OB_STATE_PAUSE: ob.state = OB_STATE_PLAY; break; + default: break; + } + break; + case OB_ACTION_STOP: return ob_do_stop(); + case OB_ACTION_RESET: return ob_do_reset(); + case OB_ACTION_QUIT: + close_audio_devices(); + ob.state = OB_STATE_QUIT; + return AS_INPROGRESS; + } + return ob_finalize_action(); +} - if (i >= ob.size) - i -= ob.size; +/* + * looks up the chunk given by index `i', returns NULL if `i' is beyond + * the end of the buffer. This allows us to treat our chunks array + * like an infinite, rotating buffer. The first available chunk + * is always indexed as `0', the second one as `1', and so on... + */ +static struct ob_chunk *get_chunk(struct iovec vec[2], size_t i) +{ + if (vec[0].iov_len > i) + return &ob.chunks[vec[0].iov_base + i - ob.index->buf]; + if (vec[1].iov_base) { + assert(vec[0].iov_len > 0); + i -= vec[0].iov_len; + if (vec[1].iov_len > i) + return &ob.chunks[vec[1].iov_base + i - ob.index->buf]; + } + return NULL; +} - return (int)i; +static void prevent_buffer_underrun(void) +{ + static const char silence[CHUNK_SIZE]; + if (playAudio(silence, sizeof(silence)) < 0) + stop_playback(); } -ob_chunk * ob_get_chunk(const unsigned i) +/* causes ob_do_drop() to be called (and waits for completion) */ +void ob_drop_audio(enum ob_drop_type type) { - assert(i < ob.size); + assert(!pthread_equal(pthread_self(), ob.thread)); + assert(!pthread_equal(pthread_self(), dc.thread)); + assert(dc.state == DC_STATE_STOP); /* not needed, just a good idea */ + cond_enter(&ob_seq_cond); + switch (type) { + case OB_DROP_DECODED: ob.seq_drop = ob.seq_decoder; break; + case OB_DROP_PLAYING: ob.seq_drop = ob.seq_player; break; + } + cond_leave(&ob_seq_cond); + /* DEBUG("dropping %u\n", ob.seq_drop); */ + ob_trigger_action(OB_ACTION_DROP); + /* DEBUG("done dropping %u\n", ob.seq_drop); */ +} - return &ob.chunks[i]; +/* call this exactly once before decoding each song */ +void ob_advance_sequence(void) +{ + assert(pthread_equal(pthread_self(), dc.thread)); + DEBUG(__FILE__": %s %d\n", __func__, __LINE__); + cond_enter(&ob_seq_cond); + ++ob.seq_decoder; + cond_leave(&ob_seq_cond); + DEBUG(__FILE__": %s %d\n", __func__, __LINE__); + DEBUG("ob.seq_decoder: %d\n", ob.seq_decoder); } -/** - * Return the tail chunk which has room for additional data. If there - * is no room in the queue, this function blocks until the player - * thread has finished playing its current chunk. - * - * @return the positive index of the new chunk; OUTPUT_BUFFER_DC_SEEK - * if another thread requested seeking; OUTPUT_BUFFER_DC_STOP if - * another thread requested stopping the decoder. +/* + * Returns true if output buffer is playing the song we're decoding */ -static int tailChunk(InputStream * inStream, - int seekable, float data_time, mpd_uint16 bitRate) +int ob_synced(void) { - unsigned int next; - ob_chunk *chunk; - - chunk = ob_get_chunk(ob.end); - assert(chunk->chunkSize <= sizeof(chunk->data)); - if (chunk->chunkSize == sizeof(chunk->data)) { - /* this chunk is full; allocate a new chunk */ - next = successor(ob.end); - while (ob.begin == next) { - /* all chunks are full of decoded data; wait - for the player to free one */ - - if (dc.stop) - return OUTPUT_BUFFER_DC_STOP; - - if (dc.seek) { - if (seekable) { - return OUTPUT_BUFFER_DC_SEEK; - } else { - dc.seekError = 1; - dc.seek = 0; - decoder_wakeup_player(); - } - } - if (!inStream || bufferInputStream(inStream) <= 0) { - decoder_sleep(); - } - } + int ret; + assert(!pthread_equal(pthread_self(), dc.thread)); + assert(!pthread_equal(pthread_self(), ob.thread)); + /* assert(pthread_equal(pthread_self(), main_thread)); */ + cond_enter(&ob_seq_cond); + ret = (ob.seq_decoder == ob.seq_player); + cond_leave(&ob_seq_cond); + return ret; +} + +static void new_song_chunk(struct ob_chunk *a) +{ + assert(pthread_equal(pthread_self(), ob.thread)); + ob.xfade_state = XFADE_DISABLED; + ob.total_time = dc.total_time; + /* DEBUG("ob.total_time: %f\n", ob.total_time); */ + ob_seq_player_set((unsigned int)a->seq); + wakeup_main_task(); /* sync playlist */ +} + +#include "outputBuffer_audio.h" - output_buffer_expand(next); - chunk = ob_get_chunk(next); - assert(chunk->chunkSize == 0); +static void play_next_chunk(void) +{ + struct iovec vec[2]; + struct ob_chunk *a; + size_t nr; + static float last_time; + + assert(pthread_equal(pthread_self(), ob.thread)); + + nr = ringbuf_get_read_vector(ob.index, vec); + if (mpd_unlikely(!nr && + (dc.state == DC_STATE_STOP) && + ! playlist_playing())) { + stop_playback(); + return; + } + if (nr < ((ob.xfade_time <= 0) ? ob.nr_bpp : xfade_chunks_needed(vec))) + { + prevent_buffer_underrun(); + return; } - if (chunk->chunkSize == 0) { - /* if the chunk is empty, nobody has set bitRate and - times yet */ + a = get_chunk(vec, 0); + assert(a); + if (! a->len) + goto out; - chunk->bitRate = bitRate; - chunk->times = data_time; + if (nr > 1 && ob.xfade_state == XFADE_ENABLED) { + struct ob_chunk *b = get_chunk(vec, ob.xfade_max); + xfade_mix(a, b); } - return ob.end; + last_time = ob.elapsed_time = a->time; + ob.bit_rate = a->bit_rate; + + if (mpd_unlikely(ob.seq_player != a->seq)) { + if (open_audio_devices(1) < 0) + return; + new_song_chunk(a); + } + /* pcm_volumeChange(a->data, a->len, &ob.audio_format, ob.sw_vol); */ + if (playAudio(a->data, a->len) < 0) + stop_playback(); + a->len = 0; /* mark the chunk as empty for ob_send() */ +out: + ringbuf_read_advance(ob.index, 1); + + /* unblock ob_send() if it was waiting on a full buffer */ + dc_try_unhalt(); } -int ob_send(InputStream * inStream, - int seekable, void *dataIn, - size_t dataInLen, float data_time, mpd_uint16 bitRate, - ReplayGainInfo * replayGainInfo) +static void * ob_task(mpd_unused void *arg) { - size_t dataToSend; - char *data; - size_t datalen; - static char *convBuffer; - static size_t convBufferLen; - ob_chunk *chunk = NULL; - - if (cmpAudioFormat(&(ob.audioFormat), &(dc.audioFormat)) == 0) { - data = dataIn; - datalen = dataInLen; - } else { - datalen = pcm_sizeOfConvBuffer(&(dc.audioFormat), dataInLen, - &(ob.audioFormat)); - if (datalen > convBufferLen) { - if (convBuffer != NULL) - free(convBuffer); - convBuffer = xmalloc(datalen); - convBufferLen = datalen; + enum action_status as; + + assert(pthread_equal(pthread_self(), ob.thread)); + cond_enter(&ob_halt_cond); + while (1) { + as = ob_take_action(); + switch (ob.state) { + case OB_STATE_PLAY: + assert(as == AS_COMPLETE); + if (open_audio_devices(0) >= 0) + play_next_chunk(); + break; + case OB_STATE_STOP: + case OB_STATE_PAUSE: + case OB_STATE_SEEK: + assert(as != AS_DEFERRED); + if (as == AS_INPROGRESS) + ob_finalize_action(); + cond_wait(&ob_halt_cond); + break; + case OB_STATE_QUIT: goto out; } - data = convBuffer; - datalen = pcm_convertAudioFormat(&(dc.audioFormat), dataIn, - dataInLen, &(ob.audioFormat), - data, &(ob.convState)); } +out: + cond_leave(&ob_halt_cond); + assert(ob.state == OB_STATE_QUIT); + assert(as == AS_INPROGRESS); + ob_finalize_action(); + return NULL; +} - if (replayGainInfo && (replayGainState != REPLAYGAIN_OFF)) - doReplayGain(replayGainInfo, data, datalen, &ob.audioFormat); - else if (normalizationEnabled) - normalizeData(data, datalen, &ob.audioFormat); - - while (datalen) { - int chunk_index = tailChunk(inStream, seekable, - data_time, bitRate); - if (chunk_index < 0) - return chunk_index; - - chunk = ob_get_chunk(chunk_index); - - dataToSend = sizeof(chunk->data) - chunk->chunkSize; - if (dataToSend > datalen) - dataToSend = datalen; +#include "outputBuffer_config_init.h" - memcpy(chunk->data + chunk->chunkSize, data, dataToSend); - chunk->chunkSize += dataToSend; - datalen -= dataToSend; - data += dataToSend; - } - - if (chunk != NULL && chunk->chunkSize == sizeof(chunk->data)) - ob_flush(); +void ob_seek_start(void) +{ + assert(pthread_equal(pthread_self(), dc.thread)); + assert(dc.seek_where >= 0); + ob_trigger_action(OB_ACTION_SEEK_START); +} - return 0; +void ob_seek_finish(void) +{ + assert(pthread_equal(pthread_self(), dc.thread)); + ob_trigger_action(OB_ACTION_SEEK_FINISH); } -void ob_skip(unsigned num) +/* + * if there are any partially written chunk, flush them out to + * the output process _before_ decoding the next track + */ +void ob_flush(void) { - int i = ob_absolute(num); - if (i >= 0) - ob.begin = i; + struct iovec vec[2]; + + assert(pthread_equal(pthread_self(), dc.thread)); + /* DEBUG(__FILE__":%s %d\n", __func__, __LINE__); */ + + if (ringbuf_get_write_vector(ob.index, vec)) { + /* DEBUG(__FILE__":%s %d\n", __func__, __LINE__); */ + struct ob_chunk *c = get_chunk(vec, 0); + assert(c); + if (c->len) { + assert(ob.seq_decoder == c->seq); + switch (ob.state) { + case OB_STATE_SEEK: + assert(0); + case OB_STATE_PLAY: + case OB_STATE_PAUSE: + ringbuf_write_advance(ob.index, 1); + break; + case OB_STATE_STOP: + case OB_STATE_QUIT: + c->len = 0; + } + } + } } + +#include "outputBuffer_ob_send.h" |