aboutsummaryrefslogtreecommitdiffstats
path: root/src/output/pulse_output_plugin.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/output/pulse_output_plugin.c')
-rw-r--r--src/output/pulse_output_plugin.c654
1 files changed, 581 insertions, 73 deletions
diff --git a/src/output/pulse_output_plugin.c b/src/output/pulse_output_plugin.c
index 1b1c27575..c24a356f2 100644
--- a/src/output/pulse_output_plugin.c
+++ b/src/output/pulse_output_plugin.c
@@ -17,22 +17,20 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#include "pulse_output_plugin.h"
#include "output_api.h"
#include "mixer_list.h"
#include <glib.h>
-#include <pulse/simple.h>
-#include <pulse/error.h>
-#define MPD_PULSE_NAME "Music Player Daemon"
+#include <pulse/thread-mainloop.h>
+#include <pulse/context.h>
+#include <pulse/stream.h>
+#include <pulse/error.h>
-struct pulse_data {
- const char *name;
- const char *server;
- const char *sink;
+#include <assert.h>
- pa_simple *s;
-};
+#define MPD_PULSE_NAME "Music Player Daemon"
/**
* The quark used for GError.domain.
@@ -43,76 +41,299 @@ pulse_output_quark(void)
return g_quark_from_static_string("pulse_output");
}
-static struct pulse_data *pulse_new_data(void)
+/**
+ * \brief waits for a pulseaudio operation to finish, frees it and
+ * unlocks the mainloop
+ * \param operation the operation to wait for
+ * \return true if operation has finished normally (DONE state),
+ * false otherwise
+ */
+static bool
+pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop,
+ struct pa_operation *operation)
+{
+ pa_operation_state_t state;
+
+ assert(mainloop != NULL);
+ assert(operation != NULL);
+
+ state = pa_operation_get_state(operation);
+ while (state == PA_OPERATION_RUNNING) {
+ pa_threaded_mainloop_wait(mainloop);
+ state = pa_operation_get_state(operation);
+ }
+
+ pa_operation_unref(operation);
+
+ return state == PA_OPERATION_DONE;
+}
+
+/**
+ * Callback function for stream operation. It just sends a signal to
+ * the caller thread, to wake pulse_wait_for_operation() up.
+ */
+static void
+pulse_output_stream_success_cb(G_GNUC_UNUSED pa_stream *s,
+ G_GNUC_UNUSED int success, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+}
+
+void
+pulse_output_context_state_cb(struct pa_context *context, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ switch (pa_context_get_state(context)) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* the caller thread might be waiting for these
+ states */
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+ break;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+}
+
+/**
+ * Attempt to connect asynchronously to the PulseAudio server.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_connect(struct pulse_output *po, GError **error_r)
{
- struct pulse_data *ret;
+ int error;
- ret = g_new(struct pulse_data, 1);
+ error = pa_context_connect(po->context, po->server,
+ (pa_context_flags_t)0, NULL);
+ if (error < 0) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_context_connect() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
- ret->server = NULL;
- ret->sink = NULL;
+ return true;
+}
- return ret;
+/**
+ * Create, set up and connect a context.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_setup_context(struct pulse_output *po, GError **error_r)
+{
+ po->context = pa_context_new(pa_threaded_mainloop_get_api(po->mainloop),
+ MPD_PULSE_NAME);
+ if (po->context == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_context_new() has failed");
+ return false;
+ }
+
+ pa_context_set_state_callback(po->context,
+ pulse_output_context_state_cb, po);
+
+ if (!pulse_output_connect(po, error_r)) {
+ pa_context_unref(po->context);
+ return false;
+ }
+
+ return true;
}
-static void pulse_free_data(struct pulse_data *pd)
+/**
+ * Frees and clears the context.
+ */
+static void
+pulse_output_delete_context(struct pulse_output *po)
{
- g_free(pd);
+ pa_context_disconnect(po->context);
+ pa_context_unref(po->context);
+ po->context = NULL;
}
static void *
-pulse_init(G_GNUC_UNUSED const struct audio_format *audio_format,
- const struct config_param *param, G_GNUC_UNUSED GError **error)
+pulse_output_init(G_GNUC_UNUSED const struct audio_format *audio_format,
+ const struct config_param *param, GError **error_r)
{
- struct pulse_data *pd;
+ struct pulse_output *po;
g_setenv("PULSE_PROP_media.role", "music", true);
- pd = pulse_new_data();
- pd->name = config_get_block_string(param, "name", "mpd_pulse");
- pd->server = config_get_block_string(param, "server", NULL);
- pd->sink = config_get_block_string(param, "sink", NULL);
+ po = g_new(struct pulse_output, 1);
+ po->name = config_get_block_string(param, "name", "mpd_pulse");
+ po->server = config_get_block_string(param, "server", NULL);
+ po->sink = config_get_block_string(param, "sink", NULL);
- return pd;
+ /* create the libpulse mainloop and start the thread */
+
+ po->mainloop = pa_threaded_mainloop_new();
+ if (po->mainloop == NULL) {
+ g_free(po);
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_threaded_mainloop_new() has failed");
+ return NULL;
+ }
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_threaded_mainloop_start(po->mainloop) < 0) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ pa_threaded_mainloop_free(po->mainloop);
+ g_free(po);
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_threaded_mainloop_start() has failed");
+ return false;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ /* create the libpulse context and connect it */
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (!pulse_output_setup_context(po, error_r)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ pa_threaded_mainloop_stop(po->mainloop);
+ pa_threaded_mainloop_free(po->mainloop);
+ g_free(po);
+ return NULL;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ return po;
}
-static void pulse_finish(void *data)
+static void
+pulse_output_finish(void *data)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
+
+ pa_threaded_mainloop_stop(po->mainloop);
+ if (po->context != NULL)
+ pulse_output_delete_context(po);
+ pa_threaded_mainloop_free(po->mainloop);
- pulse_free_data(pd);
+ g_free(po);
}
-static bool pulse_test_default_device(void)
+/**
+ * Check if the context is (already) connected, and waits if not. If
+ * the context has been disconnected, retry to connect.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_wait_connection(struct pulse_output *po, GError **error_r)
{
- pa_simple *s;
- pa_sample_spec ss;
- int error;
+ pa_context_state_t state;
- ss.format = PA_SAMPLE_S16NE;
- ss.rate = 44100;
- ss.channels = 2;
-
- s = pa_simple_new(NULL, MPD_PULSE_NAME, PA_STREAM_PLAYBACK, NULL,
- MPD_PULSE_NAME, &ss, NULL, NULL, &error);
- if (!s) {
- g_message("Cannot connect to default PulseAudio server: %s\n",
- pa_strerror(error));
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (po->context == NULL && !pulse_output_setup_context(po, error_r))
return false;
+
+ while (true) {
+ state = pa_context_get_state(po->context);
+ switch (state) {
+ case PA_CONTEXT_READY:
+ /* nothing to do */
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return true;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* failure */
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "failed to connect: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pulse_output_delete_context(po);
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return false;
+
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ /* wait some more */
+ pa_threaded_mainloop_wait(po->mainloop);
+ break;
+ }
}
+}
- pa_simple_free(s);
+static void
+pulse_output_stream_state_cb(pa_stream *stream, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ switch (pa_stream_get_state(stream)) {
+ case PA_STREAM_READY:
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+ break;
+
+ case PA_STREAM_UNCONNECTED:
+ case PA_STREAM_CREATING:
+ break;
+ }
+}
- return true;
+static void
+pulse_output_stream_write_cb(G_GNUC_UNUSED pa_stream *stream, size_t nbytes,
+ void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ po->writable = nbytes;
+ pa_threaded_mainloop_signal(po->mainloop, 0);
}
static bool
-pulse_open(void *data, struct audio_format *audio_format, GError **error_r)
+pulse_output_open(void *data, struct audio_format *audio_format,
+ GError **error_r)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
pa_sample_spec ss;
int error;
+ if (po->context != NULL) {
+ switch (pa_context_get_state(po->context)) {
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* the connection was closed meanwhile; delete
+ it, and pulse_output_wait_connection() will
+ reopen it */
+ pulse_output_delete_context(po);
+ break;
+
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+ }
+
+ if (!pulse_output_wait_connection(po, error_r))
+ return false;
+
/* MPD doesn't support the other pulseaudio sample formats, so
we just force MPD to send us everything as 16 bit */
audio_format->bits = 16;
@@ -121,45 +342,243 @@ pulse_open(void *data, struct audio_format *audio_format, GError **error_r)
ss.rate = audio_format->sample_rate;
ss.channels = audio_format->channels;
- pd->s = pa_simple_new(pd->server, MPD_PULSE_NAME, PA_STREAM_PLAYBACK,
- pd->sink, pd->name,
- &ss, NULL, NULL,
- &error);
- if (!pd->s) {
- g_set_error(error_r, pulse_output_quark(), error,
- "Cannot connect to PulseAudio server: %s",
- pa_strerror(error));
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* create a stream .. */
+
+ po->stream = pa_stream_new(po->context, po->name, &ss, NULL);
+ if (po->stream == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_new() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return false;
+ }
+
+ pa_stream_set_state_callback(po->stream,
+ pulse_output_stream_state_cb, po);
+ pa_stream_set_write_callback(po->stream,
+ pulse_output_stream_write_cb, po);
+
+ /* .. and connect it (asynchronously) */
+
+ error = pa_stream_connect_playback(po->stream, po->sink,
+ NULL, 0, NULL, NULL);
+ if (error < 0) {
+ pa_stream_unref(po->stream);
+ po->stream = NULL;
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_connect_playback() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
return false;
}
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+#if !PA_CHECK_VERSION(0,9,11)
+ po->pause = false;
+#endif
+
return true;
}
-static void pulse_cancel(void *data)
+static void
+pulse_output_close(void *data)
{
- struct pulse_data *pd = data;
- int error;
+ struct pulse_output *po = data;
+ pa_operation *o;
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) == PA_STREAM_READY) {
+ o = pa_stream_drain(po->stream,
+ pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_warning("pa_stream_drain() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ } else
+ pulse_wait_for_operation(po->mainloop, o);
+ }
+
+ pa_stream_disconnect(po->stream);
+ pa_stream_unref(po->stream);
+ po->stream = NULL;
- if (pa_simple_flush(pd->s, &error) < 0)
- g_warning("Flush failed in PulseAudio output \"%s\": %s\n",
- pd->name, pa_strerror(error));
+ if (po->context != NULL &&
+ pa_context_get_state(po->context) != PA_CONTEXT_READY)
+ pulse_output_delete_context(po);
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+}
+
+/**
+ * Check if the stream is (already) connected, and waits for a signal
+ * if not. The mainloop must be locked before calling this function.
+ *
+ * @return the current stream state
+ */
+static pa_stream_state_t
+pulse_output_check_stream(struct pulse_output *po)
+{
+ pa_stream_state_t state = pa_stream_get_state(po->stream);
+
+ switch (state) {
+ case PA_STREAM_READY:
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ case PA_STREAM_UNCONNECTED:
+ break;
+
+ case PA_STREAM_CREATING:
+ pa_threaded_mainloop_wait(po->mainloop);
+ state = pa_stream_get_state(po->stream);
+ break;
+ }
+
+ return state;
+}
+
+/**
+ * Check if the stream is (already) connected, and waits if not. The
+ * mainloop must be locked before calling this function.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_wait_stream(struct pulse_output *po, GError **error_r)
+{
+ pa_stream_state_t state = pa_stream_get_state(po->stream);
+
+ switch (state) {
+ case PA_STREAM_READY:
+ return true;
+
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ case PA_STREAM_UNCONNECTED:
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "disconnected");
+ return false;
+
+ case PA_STREAM_CREATING:
+ break;
+ }
+
+ do {
+ state = pulse_output_check_stream(po);
+ } while (state == PA_STREAM_CREATING);
+
+ if (state != PA_STREAM_READY) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "failed to connect the stream: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+ return true;
}
-static void pulse_close(void *data)
+/**
+ * Determines whether the stream is paused. On libpulse older than
+ * 0.9.11, it uses a custom pause flag.
+ */
+static bool
+pulse_output_stream_is_paused(struct pulse_output *po)
{
- struct pulse_data *pd = data;
+ assert(po->stream != NULL);
- pa_simple_drain(pd->s, NULL);
- pa_simple_free(pd->s);
+#if !defined(PA_CHECK_VERSION) || !PA_CHECK_VERSION(0,9,11)
+ return po->pause;
+#else
+ return pa_stream_is_corked(po->stream);
+#endif
+}
+
+/**
+ * Sets cork mode on the stream.
+ */
+static bool
+pulse_output_stream_pause(struct pulse_output *po, bool pause,
+ GError **error_r)
+{
+ pa_operation *o;
+
+ assert(po->stream != NULL);
+
+ o = pa_stream_cork(po->stream, pause,
+ pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_cork() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+ if (!pulse_wait_for_operation(po->mainloop, o)) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_cork() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+#if !PA_CHECK_VERSION(0,9,11)
+ po->pause = pause;
+#endif
+ return true;
}
static size_t
-pulse_play(void *data, const void *chunk, size_t size, GError **error_r)
+pulse_output_play(void *data, const void *chunk, size_t size, GError **error_r)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
int error;
- if (pa_simple_write(pd->s, chunk, size, &error) < 0) {
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* check if the stream is (already) connected */
+
+ if (!pulse_output_wait_stream(po, error_r)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return 0;
+ }
+
+ assert(po->context != NULL);
+
+ /* unpause if previously paused */
+
+ if (pulse_output_stream_is_paused(po) &&
+ !pulse_output_stream_pause(po, false, error_r))
+ return 0;
+
+ /* wait until the server allows us to write */
+
+ while (po->writable == 0) {
+ pa_threaded_mainloop_wait(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "disconnected");
+ return false;
+ }
+ }
+
+ /* now write */
+
+ if (size > po->writable)
+ /* don't send more than possible */
+ size = po->writable;
+
+ po->writable -= size;
+
+ error = pa_stream_write(po->stream, chunk, size, NULL,
+ 0, PA_SEEK_RELATIVE);
+ pa_threaded_mainloop_unlock(po->mainloop);
+ if (error < 0) {
g_set_error(error_r, pulse_output_quark(), error,
"%s", pa_strerror(error));
return 0;
@@ -168,16 +587,105 @@ pulse_play(void *data, const void *chunk, size_t size, GError **error_r)
return size;
}
+static void
+pulse_output_cancel(void *data)
+{
+ struct pulse_output *po = data;
+ pa_operation *o;
+
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
+ /* no need to flush when the stream isn't connected
+ yet */
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return;
+ }
+
+ assert(po->context != NULL);
+
+ o = pa_stream_flush(po->stream, pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_warning("pa_stream_flush() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return;
+ }
+
+ pulse_wait_for_operation(po->mainloop, o);
+ pa_threaded_mainloop_unlock(po->mainloop);
+}
+
+static bool
+pulse_output_pause(void *data)
+{
+ struct pulse_output *po = data;
+ GError *error = NULL;
+
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* check if the stream is (already/still) connected */
+
+ if (!pulse_output_wait_stream(po, &error)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_warning("%s", error->message);
+ g_error_free(error);
+ return false;
+ }
+
+ assert(po->context != NULL);
+
+ /* cork the stream */
+
+ if (pulse_output_stream_is_paused(po)) {
+ /* already paused; due to a MPD API limitation, we
+ have to sleep a little bit here, to avoid hogging
+ the CPU */
+
+ g_usleep(50000);
+ } else if (!pulse_output_stream_pause(po, true, &error)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_warning("%s", error->message);
+ g_error_free(error);
+ return false;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ return true;
+}
+
+static bool
+pulse_output_test_default_device(void)
+{
+ struct pulse_output *po;
+ bool success;
+
+ po = pulse_output_init(NULL, NULL, NULL);
+ if (po == NULL)
+ return false;
+
+ success = pulse_output_wait_connection(po, NULL);
+ pulse_output_finish(po);
+
+ return success;
+}
+
const struct audio_output_plugin pulse_output_plugin = {
.name = "pulse",
- .test_default_device = pulse_test_default_device,
- .init = pulse_init,
- .finish = pulse_finish,
- .open = pulse_open,
- .play = pulse_play,
- .cancel = pulse_cancel,
- .close = pulse_close,
+ .test_default_device = pulse_output_test_default_device,
+ .init = pulse_output_init,
+ .finish = pulse_output_finish,
+ .open = pulse_output_open,
+ .play = pulse_output_play,
+ .cancel = pulse_output_cancel,
+ .pause = pulse_output_pause,
+ .close = pulse_output_close,
.mixer_plugin = &pulse_mixer_plugin,
};