aboutsummaryrefslogtreecommitdiffstats
path: root/src/output/HttpdOutputPlugin.cxx
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/output/HttpdOutputPlugin.cxx218
1 files changed, 127 insertions, 91 deletions
diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx
index 369c06937..4cd2b4ae8 100644
--- a/src/output/HttpdOutputPlugin.cxx
+++ b/src/output/HttpdOutputPlugin.cxx
@@ -28,13 +28,12 @@
#include "Page.hxx"
#include "IcyMetaDataServer.hxx"
#include "system/fd_util.h"
-#include "Main.hxx"
+#include "IOThread.hxx"
+#include "event/Call.hxx"
#include "util/Error.hxx"
#include "util/Domain.hxx"
#include "Log.hxx"
-#include <glib.h>
-
#include <assert.h>
#include <sys/types.h>
@@ -51,7 +50,7 @@ const Domain httpd_output_domain("httpd_output");
inline
HttpdOutput::HttpdOutput(EventLoop &_loop)
- :ServerSocket(_loop),
+ :ServerSocket(_loop), DeferredMonitor(_loop),
encoder(nullptr), unflushed_input(0),
metadata(nullptr)
{
@@ -72,8 +71,11 @@ HttpdOutput::Bind(Error &error)
{
open = false;
- const ScopeLock protect(mutex);
- return ServerSocket::Open(error);
+ bool result = false;
+ BlockingCall(GetEventLoop(), [this, &error, &result](){
+ result = ServerSocket::Open(error);
+ });
+ return result;
}
inline void
@@ -81,8 +83,9 @@ HttpdOutput::Unbind()
{
assert(!open);
- const ScopeLock protect(mutex);
- ServerSocket::Close();
+ BlockingCall(GetEventLoop(), [this](){
+ ServerSocket::Close();
+ });
}
inline bool
@@ -130,47 +133,30 @@ HttpdOutput::Configure(const config_param &param, Error &error)
return true;
}
+inline bool
+HttpdOutput::Init(const config_param &param, Error &error)
+{
+ return ao_base_init(&base, &httpd_output_plugin, param, error);
+}
+
static struct audio_output *
httpd_output_init(const config_param &param, Error &error)
{
- HttpdOutput *httpd = new HttpdOutput(*main_loop);
+ HttpdOutput *httpd = new HttpdOutput(io_thread_get());
- if (!ao_base_init(&httpd->base, &httpd_output_plugin, param,
- error)) {
+ audio_output *result = httpd->InitAndConfigure(param, error);
+ if (result == nullptr)
delete httpd;
- return nullptr;
- }
- if (!httpd->Configure(param, error)) {
- ao_base_finish(&httpd->base);
- delete httpd;
- return nullptr;
- }
-
- return &httpd->base;
-}
-
-#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Winvalid-offsetof"
-#endif
-
-static inline constexpr HttpdOutput *
-Cast(audio_output *ao)
-{
- return (HttpdOutput *)((char *)ao - offsetof(HttpdOutput, base));
+ return result;
}
-#if GCC_CHECK_VERSION(4,6) || defined(__clang__)
-#pragma GCC diagnostic pop
-#endif
-
static void
httpd_output_finish(struct audio_output *ao)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
- ao_base_finish(&httpd->base);
+ httpd->Finish();
delete httpd;
}
@@ -181,7 +167,7 @@ httpd_output_finish(struct audio_output *ao)
inline void
HttpdOutput::AddClient(int fd)
{
- clients.emplace_front(this, fd, GetEventLoop(),
+ clients.emplace_front(*this, fd, GetEventLoop(),
encoder->plugin.tag == nullptr);
++clients_cnt;
@@ -191,6 +177,29 @@ HttpdOutput::AddClient(int fd)
}
void
+HttpdOutput::RunDeferred()
+{
+ /* this method runs in the IOThread; it broadcasts pages from
+ our own queue to all clients */
+
+ const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+
+ for (auto &client : clients)
+ client.PushPage(page);
+
+ page->Unref();
+ }
+
+ /* wake up the client that may be waiting for the queue to be
+ flushed */
+ cond.broadcast();
+}
+
+void
HttpdOutput::OnAccept(int fd, const sockaddr &address,
size_t address_length, gcc_unused int uid)
{
@@ -199,9 +208,10 @@ HttpdOutput::OnAccept(int fd, const sockaddr &address,
#ifdef HAVE_LIBWRAP
if (address.sa_family != AF_UNIX) {
- char *hostaddr = sockaddr_to_string(&address, address_length,
- IgnoreError());
- const char *progname = g_get_prgname();
+ const auto hostaddr = sockaddr_to_string(&address,
+ address_length);
+ // TODO: shall we obtain the program name from argv[0]?
+ const char *progname = "mpd";
struct request_info req;
request_init(&req, RQ_FILE, fd, RQ_DAEMON, progname, 0);
@@ -212,13 +222,10 @@ HttpdOutput::OnAccept(int fd, const sockaddr &address,
/* tcp wrappers says no */
FormatWarning(httpd_output_domain,
"libwrap refused connection (libwrap=%s) from %s",
- progname, hostaddr);
- g_free(hostaddr);
+ progname, hostaddr.c_str());
close_socket(fd);
return;
}
-
- g_free(hostaddr);
}
#else
(void)address;
@@ -271,7 +278,7 @@ HttpdOutput::ReadPage()
static bool
httpd_output_enable(struct audio_output *ao, Error &error)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
return httpd->Bind(error);
}
@@ -279,7 +286,7 @@ httpd_output_enable(struct audio_output *ao, Error &error)
static void
httpd_output_disable(struct audio_output *ao)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
httpd->Unbind();
}
@@ -325,9 +332,7 @@ static bool
httpd_output_open(struct audio_output *ao, AudioFormat &audio_format,
Error &error)
{
- HttpdOutput *httpd = Cast(ao);
-
- assert(httpd->clients.empty());
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
const ScopeLock protect(httpd->mutex);
return httpd->Open(audio_format, error);
@@ -342,7 +347,9 @@ HttpdOutput::Close()
delete timer;
- clients.clear();
+ BlockingCall(GetEventLoop(), [this](){
+ clients.clear();
+ });
if (header != nullptr)
header->Unref();
@@ -353,7 +360,7 @@ HttpdOutput::Close()
static void
httpd_output_close(struct audio_output *ao)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
const ScopeLock protect(httpd->mutex);
httpd->Close();
@@ -382,17 +389,15 @@ HttpdOutput::SendHeader(HttpdClient &client) const
client.PushPage(header);
}
-static unsigned
-httpd_output_delay(struct audio_output *ao)
+inline unsigned
+HttpdOutput::Delay() const
{
- HttpdOutput *httpd = Cast(ao);
-
- if (!httpd->LockHasClients() && httpd->base.pause) {
+ if (!LockHasClients() && base.pause) {
/* if there's no client and this output is paused,
then httpd_output_pause() will not do anything, it
will not fill the buffer and it will not update the
timer; therefore, we reset the timer here */
- httpd->timer->Reset();
+ timer->Reset();
/* some arbitrary delay that is long enough to avoid
consuming too much CPU, and short enough to notice
@@ -400,39 +405,47 @@ httpd_output_delay(struct audio_output *ao)
return 1000;
}
- return httpd->timer->IsStarted()
- ? httpd->timer->GetDelay()
+ return timer->IsStarted()
+ ? timer->GetDelay()
: 0;
}
+static unsigned
+httpd_output_delay(struct audio_output *ao)
+{
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
+
+ return httpd->Delay();
+}
+
void
HttpdOutput::BroadcastPage(Page *page)
{
assert(page != nullptr);
- const ScopeLock protect(mutex);
- for (auto &client : clients)
- client.PushPage(page);
+ mutex.lock();
+ pages.push(page);
+ page->Ref();
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
void
HttpdOutput::BroadcastFromEncoder()
{
+ /* synchronize with the IOThread */
mutex.lock();
- for (auto &client : clients) {
- if (client.GetQueueSize() > 256 * 1024) {
- FormatDebug(httpd_output_domain,
- "client is too slow, flushing its queue");
- client.CancelQueue();
- }
- }
- mutex.unlock();
+ while (!pages.empty())
+ cond.wait(mutex);
Page *page;
- while ((page = ReadPage()) != nullptr) {
- BroadcastPage(page);
- page->Unref();
- }
+ while ((page = ReadPage()) != nullptr)
+ pages.push(page);
+
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
inline bool
@@ -447,28 +460,34 @@ HttpdOutput::EncodeAndPlay(const void *chunk, size_t size, Error &error)
return true;
}
-static size_t
-httpd_output_play(struct audio_output *ao, const void *chunk, size_t size,
- Error &error)
+inline size_t
+HttpdOutput::Play(const void *chunk, size_t size, Error &error)
{
- HttpdOutput *httpd = Cast(ao);
-
- if (httpd->LockHasClients()) {
- if (!httpd->EncodeAndPlay(chunk, size, error))
+ if (LockHasClients()) {
+ if (!EncodeAndPlay(chunk, size, error))
return 0;
}
- if (!httpd->timer->IsStarted())
- httpd->timer->Start();
- httpd->timer->Add(size);
+ if (!timer->IsStarted())
+ timer->Start();
+ timer->Add(size);
return size;
}
+static size_t
+httpd_output_play(struct audio_output *ao, const void *chunk, size_t size,
+ Error &error)
+{
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
+
+ return httpd->Play(chunk, size, error);
+}
+
static bool
httpd_output_pause(struct audio_output *ao)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
if (httpd->LockHasClients()) {
static const char silence[1020] = { 0 };
@@ -531,19 +550,36 @@ HttpdOutput::SendTag(const Tag *tag)
static void
httpd_output_tag(struct audio_output *ao, const Tag *tag)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
httpd->SendTag(tag);
}
+inline void
+HttpdOutput::CancelAllClients()
+{
+ const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+ page->Unref();
+ }
+
+ for (auto &client : clients)
+ client.CancelQueue();
+
+ cond.broadcast();
+}
+
static void
httpd_output_cancel(struct audio_output *ao)
{
- HttpdOutput *httpd = Cast(ao);
+ HttpdOutput *httpd = HttpdOutput::Cast(ao);
- const ScopeLock protect(httpd->mutex);
- for (auto &client : httpd->clients)
- client.CancelQueue();
+ BlockingCall(io_thread_get(), [httpd](){
+ httpd->CancelAllClients();
+ });
}
const struct audio_output_plugin httpd_output_plugin = {