aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am1
-rw-r--r--NEWS1
-rw-r--r--configure.ac2
-rw-r--r--src/mixer/pulse_mixer_plugin.c335
-rw-r--r--src/output/pulse_output_plugin.c654
-rw-r--r--src/output/pulse_output_plugin.h58
-rw-r--r--test/read_mixer.c13
-rw-r--r--test/run_output.c6
8 files changed, 787 insertions, 283 deletions
diff --git a/Makefile.am b/Makefile.am
index ad39b8f3e..dfeb5386a 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -115,6 +115,7 @@ mpd_headers = \
src/mapper.h \
src/output/httpd_client.h \
src/output/httpd_internal.h \
+ src/output/pulse_output_plugin.h \
src/page.h \
src/pcm_buffer.h \
src/pcm_utils.h \
diff --git a/NEWS b/NEWS
index 3f5f5f896..4906fa3f3 100644
--- a/NEWS
+++ b/NEWS
@@ -31,6 +31,7 @@ ver 0.16 (20??/??/??)
- openal: new output plugin
- pulse: announce "media.role=music"
- pulse: renamed context to "Music Player Daemon"
+ - pulse: connect to server on MPD startup, implement pause
* mixers:
- removed support for legacy mixer configuration
- reimplemented software volume as mixer+filter plugin
diff --git a/configure.ac b/configure.ac
index 885115971..150e81aa0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -726,7 +726,7 @@ AC_ARG_ENABLE(pulse,
[enable support for the PulseAudio sound server]),,
enable_pulse=auto)
-MPD_AUTO_PKG(pulse, PULSE, [libpulse-simple],
+MPD_AUTO_PKG(pulse, PULSE, [libpulse],
[PulseAudio output plugin], [libpulse not found])
if test x$enable_pulse = xyes; then
AC_DEFINE([HAVE_PULSE], 1,
diff --git a/src/mixer/pulse_mixer_plugin.c b/src/mixer/pulse_mixer_plugin.c
index ecc0fc75b..b33ef80ae 100644
--- a/src/mixer/pulse_mixer_plugin.c
+++ b/src/mixer/pulse_mixer_plugin.c
@@ -18,12 +18,20 @@
*/
#include "mixer_api.h"
+#include "output/pulse_output_plugin.h"
#include "conf.h"
+#include "event_pipe.h"
#include <glib.h>
-#include <pulse/volume.h>
-#include <pulse/pulseaudio.h>
+#include <pulse/thread-mainloop.h>
+#include <pulse/context.h>
+#include <pulse/introspect.h>
+#include <pulse/stream.h>
+#include <pulse/subscribe.h>
+#include <pulse/error.h>
+
+#include <assert.h>
#include <string.h>
#undef G_LOG_DOMAIN
@@ -32,15 +40,9 @@
struct pulse_mixer {
struct mixer base;
- const char *server;
- const char *sink;
- const char *output_name;
+ struct pulse_output *output;
- uint32_t index;
bool online;
-
- struct pa_context *context;
- struct pa_threaded_mainloop *mainloop;
struct pa_cvolume volume;
};
@@ -54,175 +56,159 @@ pulse_mixer_quark(void)
return g_quark_from_static_string("pulse_mixer");
}
-/**
- * \brief waits for a pulseaudio operation to finish, frees it and
- * unlocks the mainloop
- * \param operation the operation to wait for
- * \return true if operation has finished normally (DONE state),
- * false otherwise
- */
-static bool
-pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop,
- struct pa_operation *operation)
+static void
+pulse_mixer_offline(struct pulse_mixer *pm)
{
- pa_operation_state_t state;
-
- assert(mainloop != NULL);
- assert(operation != NULL);
-
- state = pa_operation_get_state(operation);
- while (state == PA_OPERATION_RUNNING) {
- pa_threaded_mainloop_wait(mainloop);
- state = pa_operation_get_state(operation);
- }
+ if (!pm->online)
+ return;
- pa_operation_unref(operation);
+ pm->online = false;
- return state == PA_OPERATION_DONE;
+ event_pipe_emit(PIPE_EVENT_MIXER);
}
+/**
+ * Callback invoked by pulse_mixer_update(). Receives the new mixer
+ * value.
+ */
static void
-sink_input_cb(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i,
- int eol, void *userdata)
+pulse_mixer_volume_cb(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i,
+ int eol, void *userdata)
{
-
struct pulse_mixer *pm = userdata;
- if (eol) {
- g_debug("eol error sink_input_cb");
+ if (eol)
return;
- }
if (i == NULL) {
- g_debug("Sink input callback failure");
+ pulse_mixer_offline(pm);
return;
}
- g_debug("sink input cb %s, index %d ",i->name,i->index);
+ pm->online = true;
+ pm->volume = i->volume;
- if (strcmp(i->name,pm->output_name) == 0) {
- pm->index = i->index;
- pm->online = true;
- pm->volume = i->volume;
- } else
- g_debug("bad name");
+ event_pipe_emit(PIPE_EVENT_MIXER);
}
static void
-sink_input_vol(G_GNUC_UNUSED pa_context *context, const pa_sink_input_info *i,
- int eol, void *userdata)
+pulse_mixer_update(struct pulse_mixer *pm)
{
+ pa_operation *o;
- struct pulse_mixer *pm = userdata;
+ assert(pm->output->stream != NULL);
- if (eol) {
- g_debug("eol error sink_input_vol");
+ if (pm->output->context == NULL)
return;
- }
- if (i == NULL) {
- g_debug("Sink input callback failure");
+ o = pa_context_get_sink_input_info(pm->output->context,
+ pa_stream_get_index(pm->output->stream),
+ pulse_mixer_volume_cb, pm);
+ if (o == NULL) {
+ g_warning("pa_context_get_sink_input_info() failed: %s",
+ pa_strerror(pa_context_errno(pm->output->context)));
+ pulse_mixer_offline(pm);
return;
}
- g_debug("sink input vol %s, index %d ", i->name, i->index);
+ pa_operation_unref(o);
+}
- pm->volume = i->volume;
+static void
+pulse_mixer_handle_sink_input(struct pulse_mixer *pm,
+ pa_subscription_event_type_t t,
+ uint32_t idx)
+{
+ if (pm->output->stream == NULL) {
+ pulse_mixer_offline(pm);
+ return;
+ }
+
+ if (idx != pa_stream_get_index(pm->output->stream))
+ return;
- pa_threaded_mainloop_signal(pm->mainloop, 0);
+ if (t == PA_SUBSCRIPTION_EVENT_NEW ||
+ t == PA_SUBSCRIPTION_EVENT_CHANGE)
+ pulse_mixer_update(pm);
}
static void
-subscribe_cb(pa_context *c, pa_subscription_event_type_t t,
+pulse_mixer_subscribe_cb(G_GNUC_UNUSED pa_context *c, pa_subscription_event_type_t t,
uint32_t idx, void *userdata)
{
-
struct pulse_mixer *pm = userdata;
- g_debug("subscribe call back");
-
switch (t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) {
case PA_SUBSCRIPTION_EVENT_SINK_INPUT:
- if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) ==
- PA_SUBSCRIPTION_EVENT_REMOVE &&
- pm->index == idx)
- pm->online = false;
- else {
- pa_operation *o;
-
- o = pa_context_get_sink_input_info(c, idx,
- sink_input_cb, pm);
- if (o == NULL) {
- g_debug("pa_context_get_sink_input_info() failed");
- return;
- }
-
- pa_operation_unref(o);
- }
-
+ pulse_mixer_handle_sink_input(pm,
+ t & PA_SUBSCRIPTION_EVENT_TYPE_MASK,
+ idx);
break;
}
}
static void
-context_state_cb(pa_context *context, void *userdata)
+pulxe_mixer_context_state_cb(pa_context *context, void *userdata)
{
struct pulse_mixer *pm = userdata;
+ pa_operation *o;
- switch (pa_context_get_state(context)) {
- case PA_CONTEXT_READY: {
- pa_operation *o;
+ /* pass event to the output's callback function */
+ pulse_output_context_state_cb(context, pm->output);
- pa_context_set_subscribe_callback(context, subscribe_cb, pm);
+ if (pa_context_get_state(context) == PA_CONTEXT_READY) {
+ /* subscribe to sink_input events after the connection
+ has been established */
o = pa_context_subscribe(context,
(pa_subscription_mask_t)PA_SUBSCRIPTION_MASK_SINK_INPUT,
NULL, NULL);
if (o == NULL) {
- g_debug("pa_context_subscribe() failed");
+ g_warning("pa_context_subscribe() failed: %s",
+ pa_strerror(pa_context_errno(context)));
return;
}
pa_operation_unref(o);
- o = pa_context_get_sink_input_info_list(context,
- sink_input_cb, pm);
- if (o == NULL) {
- g_debug("pa_context_get_sink_input_info_list() failed");
- return;
- }
-
- pa_operation_unref(o);
-
- pa_threaded_mainloop_signal(pm->mainloop, 0);
- break;
- }
-
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
- case PA_CONTEXT_TERMINATED:
- case PA_CONTEXT_FAILED:
- pa_threaded_mainloop_signal(pm->mainloop, 0);
- break;
+ if (pm->output->stream != NULL)
+ pulse_mixer_update(pm);
}
}
-
static struct mixer *
-pulse_mixer_init(G_GNUC_UNUSED void *ao, const struct config_param *param,
- G_GNUC_UNUSED GError **error_r)
+pulse_mixer_init(void *ao, G_GNUC_UNUSED const struct config_param *param,
+ GError **error_r)
{
- struct pulse_mixer *pm = g_new(struct pulse_mixer,1);
+ struct pulse_mixer *pm;
+
+ if (ao == NULL) {
+ g_set_error(error_r, pulse_mixer_quark(), 0,
+ "The pulse mixer cannot work without the audio output");
+ return false;
+ }
+
+ pm = g_new(struct pulse_mixer,1);
mixer_init(&pm->base, &pulse_mixer_plugin);
+ pm->output = ao;
pm->online = false;
- pm->server = config_get_block_string(param, "server", NULL);
- pm->sink = config_get_block_string(param, "sink", NULL);
- pm->output_name = config_get_block_string(param, "name", NULL);
+ pa_threaded_mainloop_lock(pm->output->mainloop);
+
+ /* register callbacks (override the output's context state
+ callback) */
+
+ pa_context_set_state_callback(pm->output->context,
+ pulxe_mixer_context_state_cb, pm);
+ pa_context_set_subscribe_callback(pm->output->context,
+ pulse_mixer_subscribe_cb, pm);
+
+ /* check the current state now (we might have missed the first
+ events!) */
+ pulxe_mixer_context_state_cb(pm->output->context, pm);
+
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
return &pm->base;
}
@@ -232,79 +218,35 @@ pulse_mixer_finish(struct mixer *data)
{
struct pulse_mixer *pm = (struct pulse_mixer *) data;
- g_free(pm);
-}
+ /* restore callbacks */
-static bool
-pulse_mixer_setup(struct pulse_mixer *pm, GError **error_r)
-{
- pa_context_set_state_callback(pm->context, context_state_cb, pm);
+ pa_threaded_mainloop_lock(pm->output->mainloop);
- if (pa_context_connect(pm->context, pm->server,
- (pa_context_flags_t)0, NULL) < 0) {
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "pa_context_connect() has failed");
- return false;
+ if (pm->output->context != NULL) {
+ pa_context_set_state_callback(pm->output->context,
+ pulse_output_context_state_cb,
+ pm->output);
+ pa_context_set_subscribe_callback(pm->output->context,
+ NULL, NULL);
}
- pa_threaded_mainloop_lock(pm->mainloop);
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
- if (pa_threaded_mainloop_start(pm->mainloop) < 0) {
- pa_threaded_mainloop_unlock(pm->mainloop);
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "pa_threaded_mainloop_start() has failed");
- return false;
- }
-
- pa_threaded_mainloop_wait(pm->mainloop);
-
- if (pa_context_get_state(pm->context) != PA_CONTEXT_READY) {
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "failed to connect: %s",
- pa_strerror(pa_context_errno(pm->context)));
- pa_threaded_mainloop_unlock(pm->mainloop);
- return false;
- }
+ /* free resources */
- pa_threaded_mainloop_unlock(pm->mainloop);
-
- return true;
+ g_free(pm);
}
static bool
-pulse_mixer_open(struct mixer *data, GError **error_r)
+pulse_mixer_open(struct mixer *data, G_GNUC_UNUSED GError **error_r)
{
struct pulse_mixer *pm = (struct pulse_mixer *) data;
- g_debug("pulse mixer open");
-
- pm->index = 0;
- pm->online = false;
-
- pm->mainloop = pa_threaded_mainloop_new();
- if (pm->mainloop == NULL) {
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "pa_threaded_mainloop_new() has failed");
- return false;
- }
-
- pm->context = pa_context_new(pa_threaded_mainloop_get_api(pm->mainloop),
- "Mixer mpd");
- if (pm->context == NULL) {
- pa_threaded_mainloop_stop(pm->mainloop);
- pa_threaded_mainloop_free(pm->mainloop);
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "pa_context_new() has failed");
- return false;
- }
-
- if (!pulse_mixer_setup(pm, error_r)) {
- pa_threaded_mainloop_stop(pm->mainloop);
- pa_context_disconnect(pm->context);
- pa_context_unref(pm->context);
- pa_threaded_mainloop_free(pm->mainloop);
- return false;
- }
+ pa_threaded_mainloop_lock(pm->output->mainloop);
+ if (pm->output->stream != NULL &&
+ pa_stream_get_state(pm->output->stream) == PA_STREAM_READY)
+ pulse_mixer_update(pm);
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
return true;
}
@@ -314,49 +256,22 @@ pulse_mixer_close(struct mixer *data)
{
struct pulse_mixer *pm = (struct pulse_mixer *) data;
- pa_threaded_mainloop_stop(pm->mainloop);
- pa_context_disconnect(pm->context);
- pa_context_unref(pm->context);
- pa_threaded_mainloop_free(pm->mainloop);
-
- pm->online = false;
+ pulse_mixer_offline(pm);
}
static int
-pulse_mixer_get_volume(struct mixer *mixer, GError **error_r)
+pulse_mixer_get_volume(struct mixer *mixer, G_GNUC_UNUSED GError **error_r)
{
struct pulse_mixer *pm = (struct pulse_mixer *) mixer;
int ret;
- pa_operation *o;
- pa_threaded_mainloop_lock(pm->mainloop);
-
- if (!pm->online) {
- pa_threaded_mainloop_unlock(pm->mainloop);
- return false;
- }
-
- o = pa_context_get_sink_input_info(pm->context, pm->index,
- sink_input_vol, pm);
- if (o == NULL) {
- pa_threaded_mainloop_unlock(pm->mainloop);
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "pa_context_get_sink_input_info() has failed");
- return false;
- }
-
- if (!pulse_wait_for_operation(pm->mainloop, o)) {
- pa_threaded_mainloop_unlock(pm->mainloop);
- g_set_error(error_r, pulse_mixer_quark(), 0,
- "failed to read PulseAudio volume");
- return false;
- }
+ pa_threaded_mainloop_lock(pm->output->mainloop);
ret = pm->online
? (int)((100*(pa_cvolume_avg(&pm->volume)+1))/PA_VOLUME_NORM)
: -1;
- pa_threaded_mainloop_unlock(pm->mainloop);
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
return ret;
}
@@ -368,10 +283,11 @@ pulse_mixer_set_volume(struct mixer *mixer, unsigned volume, GError **error_r)
struct pa_cvolume cvolume;
pa_operation *o;
- pa_threaded_mainloop_lock(pm->mainloop);
+ pa_threaded_mainloop_lock(pm->output->mainloop);
- if (!pm->online) {
- pa_threaded_mainloop_unlock(pm->mainloop);
+ if (!pm->online || pm->output->stream == NULL ||
+ pm->output->context == NULL) {
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
g_set_error(error_r, pulse_mixer_quark(), 0, "disconnected");
return false;
}
@@ -379,9 +295,10 @@ pulse_mixer_set_volume(struct mixer *mixer, unsigned volume, GError **error_r)
pa_cvolume_set(&cvolume, pm->volume.channels,
(pa_volume_t)volume * PA_VOLUME_NORM / 100 + 0.5);
- o = pa_context_set_sink_input_volume(pm->context, pm->index,
+ o = pa_context_set_sink_input_volume(pm->output->context,
+ pa_stream_get_index(pm->output->stream),
&cvolume, NULL, NULL);
- pa_threaded_mainloop_unlock(pm->mainloop);
+ pa_threaded_mainloop_unlock(pm->output->mainloop);
if (o == NULL) {
g_set_error(error_r, pulse_mixer_quark(), 0,
"failed to set PulseAudio volume");
diff --git a/src/output/pulse_output_plugin.c b/src/output/pulse_output_plugin.c
index 1b1c27575..c24a356f2 100644
--- a/src/output/pulse_output_plugin.c
+++ b/src/output/pulse_output_plugin.c
@@ -17,22 +17,20 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#include "pulse_output_plugin.h"
#include "output_api.h"
#include "mixer_list.h"
#include <glib.h>
-#include <pulse/simple.h>
-#include <pulse/error.h>
-#define MPD_PULSE_NAME "Music Player Daemon"
+#include <pulse/thread-mainloop.h>
+#include <pulse/context.h>
+#include <pulse/stream.h>
+#include <pulse/error.h>
-struct pulse_data {
- const char *name;
- const char *server;
- const char *sink;
+#include <assert.h>
- pa_simple *s;
-};
+#define MPD_PULSE_NAME "Music Player Daemon"
/**
* The quark used for GError.domain.
@@ -43,76 +41,299 @@ pulse_output_quark(void)
return g_quark_from_static_string("pulse_output");
}
-static struct pulse_data *pulse_new_data(void)
+/**
+ * \brief waits for a pulseaudio operation to finish, frees it and
+ * unlocks the mainloop
+ * \param operation the operation to wait for
+ * \return true if operation has finished normally (DONE state),
+ * false otherwise
+ */
+static bool
+pulse_wait_for_operation(struct pa_threaded_mainloop *mainloop,
+ struct pa_operation *operation)
+{
+ pa_operation_state_t state;
+
+ assert(mainloop != NULL);
+ assert(operation != NULL);
+
+ state = pa_operation_get_state(operation);
+ while (state == PA_OPERATION_RUNNING) {
+ pa_threaded_mainloop_wait(mainloop);
+ state = pa_operation_get_state(operation);
+ }
+
+ pa_operation_unref(operation);
+
+ return state == PA_OPERATION_DONE;
+}
+
+/**
+ * Callback function for stream operation. It just sends a signal to
+ * the caller thread, to wake pulse_wait_for_operation() up.
+ */
+static void
+pulse_output_stream_success_cb(G_GNUC_UNUSED pa_stream *s,
+ G_GNUC_UNUSED int success, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+}
+
+void
+pulse_output_context_state_cb(struct pa_context *context, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ switch (pa_context_get_state(context)) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* the caller thread might be waiting for these
+ states */
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+ break;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+}
+
+/**
+ * Attempt to connect asynchronously to the PulseAudio server.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_connect(struct pulse_output *po, GError **error_r)
{
- struct pulse_data *ret;
+ int error;
- ret = g_new(struct pulse_data, 1);
+ error = pa_context_connect(po->context, po->server,
+ (pa_context_flags_t)0, NULL);
+ if (error < 0) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_context_connect() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
- ret->server = NULL;
- ret->sink = NULL;
+ return true;
+}
- return ret;
+/**
+ * Create, set up and connect a context.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_setup_context(struct pulse_output *po, GError **error_r)
+{
+ po->context = pa_context_new(pa_threaded_mainloop_get_api(po->mainloop),
+ MPD_PULSE_NAME);
+ if (po->context == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_context_new() has failed");
+ return false;
+ }
+
+ pa_context_set_state_callback(po->context,
+ pulse_output_context_state_cb, po);
+
+ if (!pulse_output_connect(po, error_r)) {
+ pa_context_unref(po->context);
+ return false;
+ }
+
+ return true;
}
-static void pulse_free_data(struct pulse_data *pd)
+/**
+ * Frees and clears the context.
+ */
+static void
+pulse_output_delete_context(struct pulse_output *po)
{
- g_free(pd);
+ pa_context_disconnect(po->context);
+ pa_context_unref(po->context);
+ po->context = NULL;
}
static void *
-pulse_init(G_GNUC_UNUSED const struct audio_format *audio_format,
- const struct config_param *param, G_GNUC_UNUSED GError **error)
+pulse_output_init(G_GNUC_UNUSED const struct audio_format *audio_format,
+ const struct config_param *param, GError **error_r)
{
- struct pulse_data *pd;
+ struct pulse_output *po;
g_setenv("PULSE_PROP_media.role", "music", true);
- pd = pulse_new_data();
- pd->name = config_get_block_string(param, "name", "mpd_pulse");
- pd->server = config_get_block_string(param, "server", NULL);
- pd->sink = config_get_block_string(param, "sink", NULL);
+ po = g_new(struct pulse_output, 1);
+ po->name = config_get_block_string(param, "name", "mpd_pulse");
+ po->server = config_get_block_string(param, "server", NULL);
+ po->sink = config_get_block_string(param, "sink", NULL);
- return pd;
+ /* create the libpulse mainloop and start the thread */
+
+ po->mainloop = pa_threaded_mainloop_new();
+ if (po->mainloop == NULL) {
+ g_free(po);
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_threaded_mainloop_new() has failed");
+ return NULL;
+ }
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_threaded_mainloop_start(po->mainloop) < 0) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ pa_threaded_mainloop_free(po->mainloop);
+ g_free(po);
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_threaded_mainloop_start() has failed");
+ return false;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ /* create the libpulse context and connect it */
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (!pulse_output_setup_context(po, error_r)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ pa_threaded_mainloop_stop(po->mainloop);
+ pa_threaded_mainloop_free(po->mainloop);
+ g_free(po);
+ return NULL;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ return po;
}
-static void pulse_finish(void *data)
+static void
+pulse_output_finish(void *data)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
+
+ pa_threaded_mainloop_stop(po->mainloop);
+ if (po->context != NULL)
+ pulse_output_delete_context(po);
+ pa_threaded_mainloop_free(po->mainloop);
- pulse_free_data(pd);
+ g_free(po);
}
-static bool pulse_test_default_device(void)
+/**
+ * Check if the context is (already) connected, and waits if not. If
+ * the context has been disconnected, retry to connect.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_wait_connection(struct pulse_output *po, GError **error_r)
{
- pa_simple *s;
- pa_sample_spec ss;
- int error;
+ pa_context_state_t state;
- ss.format = PA_SAMPLE_S16NE;
- ss.rate = 44100;
- ss.channels = 2;
-
- s = pa_simple_new(NULL, MPD_PULSE_NAME, PA_STREAM_PLAYBACK, NULL,
- MPD_PULSE_NAME, &ss, NULL, NULL, &error);
- if (!s) {
- g_message("Cannot connect to default PulseAudio server: %s\n",
- pa_strerror(error));
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (po->context == NULL && !pulse_output_setup_context(po, error_r))
return false;
+
+ while (true) {
+ state = pa_context_get_state(po->context);
+ switch (state) {
+ case PA_CONTEXT_READY:
+ /* nothing to do */
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return true;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* failure */
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "failed to connect: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pulse_output_delete_context(po);
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return false;
+
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ /* wait some more */
+ pa_threaded_mainloop_wait(po->mainloop);
+ break;
+ }
}
+}
- pa_simple_free(s);
+static void
+pulse_output_stream_state_cb(pa_stream *stream, void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ switch (pa_stream_get_state(stream)) {
+ case PA_STREAM_READY:
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ pa_threaded_mainloop_signal(po->mainloop, 0);
+ break;
+
+ case PA_STREAM_UNCONNECTED:
+ case PA_STREAM_CREATING:
+ break;
+ }
+}
- return true;
+static void
+pulse_output_stream_write_cb(G_GNUC_UNUSED pa_stream *stream, size_t nbytes,
+ void *userdata)
+{
+ struct pulse_output *po = userdata;
+
+ po->writable = nbytes;
+ pa_threaded_mainloop_signal(po->mainloop, 0);
}
static bool
-pulse_open(void *data, struct audio_format *audio_format, GError **error_r)
+pulse_output_open(void *data, struct audio_format *audio_format,
+ GError **error_r)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
pa_sample_spec ss;
int error;
+ if (po->context != NULL) {
+ switch (pa_context_get_state(po->context)) {
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ /* the connection was closed meanwhile; delete
+ it, and pulse_output_wait_connection() will
+ reopen it */
+ pulse_output_delete_context(po);
+ break;
+
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+ }
+
+ if (!pulse_output_wait_connection(po, error_r))
+ return false;
+
/* MPD doesn't support the other pulseaudio sample formats, so
we just force MPD to send us everything as 16 bit */
audio_format->bits = 16;
@@ -121,45 +342,243 @@ pulse_open(void *data, struct audio_format *audio_format, GError **error_r)
ss.rate = audio_format->sample_rate;
ss.channels = audio_format->channels;
- pd->s = pa_simple_new(pd->server, MPD_PULSE_NAME, PA_STREAM_PLAYBACK,
- pd->sink, pd->name,
- &ss, NULL, NULL,
- &error);
- if (!pd->s) {
- g_set_error(error_r, pulse_output_quark(), error,
- "Cannot connect to PulseAudio server: %s",
- pa_strerror(error));
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* create a stream .. */
+
+ po->stream = pa_stream_new(po->context, po->name, &ss, NULL);
+ if (po->stream == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_new() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return false;
+ }
+
+ pa_stream_set_state_callback(po->stream,
+ pulse_output_stream_state_cb, po);
+ pa_stream_set_write_callback(po->stream,
+ pulse_output_stream_write_cb, po);
+
+ /* .. and connect it (asynchronously) */
+
+ error = pa_stream_connect_playback(po->stream, po->sink,
+ NULL, 0, NULL, NULL);
+ if (error < 0) {
+ pa_stream_unref(po->stream);
+ po->stream = NULL;
+
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_connect_playback() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
return false;
}
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+#if !PA_CHECK_VERSION(0,9,11)
+ po->pause = false;
+#endif
+
return true;
}
-static void pulse_cancel(void *data)
+static void
+pulse_output_close(void *data)
{
- struct pulse_data *pd = data;
- int error;
+ struct pulse_output *po = data;
+ pa_operation *o;
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) == PA_STREAM_READY) {
+ o = pa_stream_drain(po->stream,
+ pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_warning("pa_stream_drain() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ } else
+ pulse_wait_for_operation(po->mainloop, o);
+ }
+
+ pa_stream_disconnect(po->stream);
+ pa_stream_unref(po->stream);
+ po->stream = NULL;
- if (pa_simple_flush(pd->s, &error) < 0)
- g_warning("Flush failed in PulseAudio output \"%s\": %s\n",
- pd->name, pa_strerror(error));
+ if (po->context != NULL &&
+ pa_context_get_state(po->context) != PA_CONTEXT_READY)
+ pulse_output_delete_context(po);
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+}
+
+/**
+ * Check if the stream is (already) connected, and waits for a signal
+ * if not. The mainloop must be locked before calling this function.
+ *
+ * @return the current stream state
+ */
+static pa_stream_state_t
+pulse_output_check_stream(struct pulse_output *po)
+{
+ pa_stream_state_t state = pa_stream_get_state(po->stream);
+
+ switch (state) {
+ case PA_STREAM_READY:
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ case PA_STREAM_UNCONNECTED:
+ break;
+
+ case PA_STREAM_CREATING:
+ pa_threaded_mainloop_wait(po->mainloop);
+ state = pa_stream_get_state(po->stream);
+ break;
+ }
+
+ return state;
+}
+
+/**
+ * Check if the stream is (already) connected, and waits if not. The
+ * mainloop must be locked before calling this function.
+ *
+ * @return true on success, false on error
+ */
+static bool
+pulse_output_wait_stream(struct pulse_output *po, GError **error_r)
+{
+ pa_stream_state_t state = pa_stream_get_state(po->stream);
+
+ switch (state) {
+ case PA_STREAM_READY:
+ return true;
+
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ case PA_STREAM_UNCONNECTED:
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "disconnected");
+ return false;
+
+ case PA_STREAM_CREATING:
+ break;
+ }
+
+ do {
+ state = pulse_output_check_stream(po);
+ } while (state == PA_STREAM_CREATING);
+
+ if (state != PA_STREAM_READY) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "failed to connect the stream: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+ return true;
}
-static void pulse_close(void *data)
+/**
+ * Determines whether the stream is paused. On libpulse older than
+ * 0.9.11, it uses a custom pause flag.
+ */
+static bool
+pulse_output_stream_is_paused(struct pulse_output *po)
{
- struct pulse_data *pd = data;
+ assert(po->stream != NULL);
- pa_simple_drain(pd->s, NULL);
- pa_simple_free(pd->s);
+#if !defined(PA_CHECK_VERSION) || !PA_CHECK_VERSION(0,9,11)
+ return po->pause;
+#else
+ return pa_stream_is_corked(po->stream);
+#endif
+}
+
+/**
+ * Sets cork mode on the stream.
+ */
+static bool
+pulse_output_stream_pause(struct pulse_output *po, bool pause,
+ GError **error_r)
+{
+ pa_operation *o;
+
+ assert(po->stream != NULL);
+
+ o = pa_stream_cork(po->stream, pause,
+ pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_cork() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+ if (!pulse_wait_for_operation(po->mainloop, o)) {
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "pa_stream_cork() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ return false;
+ }
+
+#if !PA_CHECK_VERSION(0,9,11)
+ po->pause = pause;
+#endif
+ return true;
}
static size_t
-pulse_play(void *data, const void *chunk, size_t size, GError **error_r)
+pulse_output_play(void *data, const void *chunk, size_t size, GError **error_r)
{
- struct pulse_data *pd = data;
+ struct pulse_output *po = data;
int error;
- if (pa_simple_write(pd->s, chunk, size, &error) < 0) {
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* check if the stream is (already) connected */
+
+ if (!pulse_output_wait_stream(po, error_r)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return 0;
+ }
+
+ assert(po->context != NULL);
+
+ /* unpause if previously paused */
+
+ if (pulse_output_stream_is_paused(po) &&
+ !pulse_output_stream_pause(po, false, error_r))
+ return 0;
+
+ /* wait until the server allows us to write */
+
+ while (po->writable == 0) {
+ pa_threaded_mainloop_wait(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_set_error(error_r, pulse_output_quark(), 0,
+ "disconnected");
+ return false;
+ }
+ }
+
+ /* now write */
+
+ if (size > po->writable)
+ /* don't send more than possible */
+ size = po->writable;
+
+ po->writable -= size;
+
+ error = pa_stream_write(po->stream, chunk, size, NULL,
+ 0, PA_SEEK_RELATIVE);
+ pa_threaded_mainloop_unlock(po->mainloop);
+ if (error < 0) {
g_set_error(error_r, pulse_output_quark(), error,
"%s", pa_strerror(error));
return 0;
@@ -168,16 +587,105 @@ pulse_play(void *data, const void *chunk, size_t size, GError **error_r)
return size;
}
+static void
+pulse_output_cancel(void *data)
+{
+ struct pulse_output *po = data;
+ pa_operation *o;
+
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ if (pa_stream_get_state(po->stream) != PA_STREAM_READY) {
+ /* no need to flush when the stream isn't connected
+ yet */
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return;
+ }
+
+ assert(po->context != NULL);
+
+ o = pa_stream_flush(po->stream, pulse_output_stream_success_cb, po);
+ if (o == NULL) {
+ g_warning("pa_stream_flush() has failed: %s",
+ pa_strerror(pa_context_errno(po->context)));
+ pa_threaded_mainloop_unlock(po->mainloop);
+ return;
+ }
+
+ pulse_wait_for_operation(po->mainloop, o);
+ pa_threaded_mainloop_unlock(po->mainloop);
+}
+
+static bool
+pulse_output_pause(void *data)
+{
+ struct pulse_output *po = data;
+ GError *error = NULL;
+
+ assert(po->stream != NULL);
+
+ pa_threaded_mainloop_lock(po->mainloop);
+
+ /* check if the stream is (already/still) connected */
+
+ if (!pulse_output_wait_stream(po, &error)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_warning("%s", error->message);
+ g_error_free(error);
+ return false;
+ }
+
+ assert(po->context != NULL);
+
+ /* cork the stream */
+
+ if (pulse_output_stream_is_paused(po)) {
+ /* already paused; due to a MPD API limitation, we
+ have to sleep a little bit here, to avoid hogging
+ the CPU */
+
+ g_usleep(50000);
+ } else if (!pulse_output_stream_pause(po, true, &error)) {
+ pa_threaded_mainloop_unlock(po->mainloop);
+ g_warning("%s", error->message);
+ g_error_free(error);
+ return false;
+ }
+
+ pa_threaded_mainloop_unlock(po->mainloop);
+
+ return true;
+}
+
+static bool
+pulse_output_test_default_device(void)
+{
+ struct pulse_output *po;
+ bool success;
+
+ po = pulse_output_init(NULL, NULL, NULL);
+ if (po == NULL)
+ return false;
+
+ success = pulse_output_wait_connection(po, NULL);
+ pulse_output_finish(po);
+
+ return success;
+}
+
const struct audio_output_plugin pulse_output_plugin = {
.name = "pulse",
- .test_default_device = pulse_test_default_device,
- .init = pulse_init,
- .finish = pulse_finish,
- .open = pulse_open,
- .play = pulse_play,
- .cancel = pulse_cancel,
- .close = pulse_close,
+ .test_default_device = pulse_output_test_default_device,
+ .init = pulse_output_init,
+ .finish = pulse_output_finish,
+ .open = pulse_output_open,
+ .play = pulse_output_play,
+ .cancel = pulse_output_cancel,
+ .pause = pulse_output_pause,
+ .close = pulse_output_close,
.mixer_plugin = &pulse_mixer_plugin,
};
diff --git a/src/output/pulse_output_plugin.h b/src/output/pulse_output_plugin.h
new file mode 100644
index 000000000..fc2a7d4d5
--- /dev/null
+++ b/src/output/pulse_output_plugin.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2003-2009 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_PULSE_OUTPUT_PLUGIN_H
+#define MPD_PULSE_OUTPUT_PLUGIN_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#if !defined(PA_CHECK_VERSION)
+/**
+ * This macro was implemented in libpulse 0.9.16.
+ */
+#define PA_CHECK_VERSION(a,b,c) false
+#endif
+
+struct pa_operation;
+
+struct pulse_output {
+ const char *name;
+ const char *server;
+ const char *sink;
+
+ struct pa_threaded_mainloop *mainloop;
+ struct pa_context *context;
+ struct pa_stream *stream;
+
+ size_t writable;
+
+#if !PA_CHECK_VERSION(0,9,11)
+ /**
+ * We need this variable because pa_stream_is_corked() wasn't
+ * added before 0.9.11.
+ */
+ bool pause;
+#endif
+};
+
+void
+pulse_output_context_state_cb(struct pa_context *context, void *userdata);
+
+#endif
diff --git a/test/read_mixer.c b/test/read_mixer.c
index fdf6b7fe1..1bf40bd5b 100644
--- a/test/read_mixer.c
+++ b/test/read_mixer.c
@@ -21,6 +21,8 @@
#include "mixer_list.h"
#include "filter_registry.h"
#include "pcm_volume.h"
+#include "output/pulse_output_plugin.h"
+#include "event_pipe.h"
#include <glib.h>
@@ -28,6 +30,17 @@
#include <string.h>
#include <unistd.h>
+void
+pulse_output_context_state_cb(G_GNUC_UNUSED struct pa_context *context,
+ G_GNUC_UNUSED void *userdata)
+{
+}
+
+void
+event_pipe_emit(G_GNUC_UNUSED enum pipe_event event)
+{
+}
+
const struct filter_plugin *
filter_plugin_by_name(G_GNUC_UNUSED const char *name)
{
diff --git a/test/run_output.c b/test/run_output.c
index 5ab9625e8..3731b6c09 100644
--- a/test/run_output.c
+++ b/test/run_output.c
@@ -24,6 +24,7 @@
#include "audio_parser.h"
#include "filter_registry.h"
#include "pcm_convert.h"
+#include "event_pipe.h"
#include <glib.h>
@@ -31,6 +32,11 @@
#include <string.h>
#include <unistd.h>
+void
+event_pipe_emit(G_GNUC_UNUSED enum pipe_event event)
+{
+}
+
void pcm_convert_init(G_GNUC_UNUSED struct pcm_convert_state *state)
{
}