diff options
Diffstat (limited to '')
-rw-r--r-- | src/output/HttpdOutputPlugin.cxx | 220 |
1 files changed, 128 insertions, 92 deletions
diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx index 369c06937..388d967df 100644 --- a/src/output/HttpdOutputPlugin.cxx +++ b/src/output/HttpdOutputPlugin.cxx @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2013 The Music Player Daemon Project + * Copyright (C) 2003-2014 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -28,13 +28,12 @@ #include "Page.hxx" #include "IcyMetaDataServer.hxx" #include "system/fd_util.h" -#include "Main.hxx" +#include "IOThread.hxx" +#include "event/Call.hxx" #include "util/Error.hxx" #include "util/Domain.hxx" #include "Log.hxx" -#include <glib.h> - #include <assert.h> #include <sys/types.h> @@ -51,7 +50,7 @@ const Domain httpd_output_domain("httpd_output"); inline HttpdOutput::HttpdOutput(EventLoop &_loop) - :ServerSocket(_loop), + :ServerSocket(_loop), DeferredMonitor(_loop), encoder(nullptr), unflushed_input(0), metadata(nullptr) { @@ -72,8 +71,11 @@ HttpdOutput::Bind(Error &error) { open = false; - const ScopeLock protect(mutex); - return ServerSocket::Open(error); + bool result = false; + BlockingCall(GetEventLoop(), [this, &error, &result](){ + result = ServerSocket::Open(error); + }); + return result; } inline void @@ -81,8 +83,9 @@ HttpdOutput::Unbind() { assert(!open); - const ScopeLock protect(mutex); - ServerSocket::Close(); + BlockingCall(GetEventLoop(), [this](){ + ServerSocket::Close(); + }); } inline bool @@ -130,47 +133,30 @@ HttpdOutput::Configure(const config_param ¶m, Error &error) return true; } +inline bool +HttpdOutput::Init(const config_param ¶m, Error &error) +{ + return ao_base_init(&base, &httpd_output_plugin, param, error); +} + static struct audio_output * httpd_output_init(const config_param ¶m, Error &error) { - HttpdOutput *httpd = new HttpdOutput(*main_loop); + HttpdOutput *httpd = new HttpdOutput(io_thread_get()); - if (!ao_base_init(&httpd->base, &httpd_output_plugin, param, - error)) { + audio_output *result = httpd->InitAndConfigure(param, error); + if (result == nullptr) delete httpd; - return nullptr; - } - if (!httpd->Configure(param, error)) { - ao_base_finish(&httpd->base); - delete httpd; - return nullptr; - } - - return &httpd->base; -} - -#if GCC_CHECK_VERSION(4,6) || defined(__clang__) -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Winvalid-offsetof" -#endif - -static inline constexpr HttpdOutput * -Cast(audio_output *ao) -{ - return (HttpdOutput *)((char *)ao - offsetof(HttpdOutput, base)); + return result; } -#if GCC_CHECK_VERSION(4,6) || defined(__clang__) -#pragma GCC diagnostic pop -#endif - static void httpd_output_finish(struct audio_output *ao) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); - ao_base_finish(&httpd->base); + httpd->Finish(); delete httpd; } @@ -181,7 +167,7 @@ httpd_output_finish(struct audio_output *ao) inline void HttpdOutput::AddClient(int fd) { - clients.emplace_front(this, fd, GetEventLoop(), + clients.emplace_front(*this, fd, GetEventLoop(), encoder->plugin.tag == nullptr); ++clients_cnt; @@ -191,6 +177,29 @@ HttpdOutput::AddClient(int fd) } void +HttpdOutput::RunDeferred() +{ + /* this method runs in the IOThread; it broadcasts pages from + our own queue to all clients */ + + const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + + for (auto &client : clients) + client.PushPage(page); + + page->Unref(); + } + + /* wake up the client that may be waiting for the queue to be + flushed */ + cond.broadcast(); +} + +void HttpdOutput::OnAccept(int fd, const sockaddr &address, size_t address_length, gcc_unused int uid) { @@ -199,9 +208,10 @@ HttpdOutput::OnAccept(int fd, const sockaddr &address, #ifdef HAVE_LIBWRAP if (address.sa_family != AF_UNIX) { - char *hostaddr = sockaddr_to_string(&address, address_length, - IgnoreError()); - const char *progname = g_get_prgname(); + const auto hostaddr = sockaddr_to_string(&address, + address_length); + // TODO: shall we obtain the program name from argv[0]? + const char *progname = "mpd"; struct request_info req; request_init(&req, RQ_FILE, fd, RQ_DAEMON, progname, 0); @@ -212,13 +222,10 @@ HttpdOutput::OnAccept(int fd, const sockaddr &address, /* tcp wrappers says no */ FormatWarning(httpd_output_domain, "libwrap refused connection (libwrap=%s) from %s", - progname, hostaddr); - g_free(hostaddr); + progname, hostaddr.c_str()); close_socket(fd); return; } - - g_free(hostaddr); } #else (void)address; @@ -271,7 +278,7 @@ HttpdOutput::ReadPage() static bool httpd_output_enable(struct audio_output *ao, Error &error) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); return httpd->Bind(error); } @@ -279,7 +286,7 @@ httpd_output_enable(struct audio_output *ao, Error &error) static void httpd_output_disable(struct audio_output *ao) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); httpd->Unbind(); } @@ -325,9 +332,7 @@ static bool httpd_output_open(struct audio_output *ao, AudioFormat &audio_format, Error &error) { - HttpdOutput *httpd = Cast(ao); - - assert(httpd->clients.empty()); + HttpdOutput *httpd = HttpdOutput::Cast(ao); const ScopeLock protect(httpd->mutex); return httpd->Open(audio_format, error); @@ -342,7 +347,9 @@ HttpdOutput::Close() delete timer; - clients.clear(); + BlockingCall(GetEventLoop(), [this](){ + clients.clear(); + }); if (header != nullptr) header->Unref(); @@ -353,7 +360,7 @@ HttpdOutput::Close() static void httpd_output_close(struct audio_output *ao) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); const ScopeLock protect(httpd->mutex); httpd->Close(); @@ -382,17 +389,15 @@ HttpdOutput::SendHeader(HttpdClient &client) const client.PushPage(header); } -static unsigned -httpd_output_delay(struct audio_output *ao) +inline unsigned +HttpdOutput::Delay() const { - HttpdOutput *httpd = Cast(ao); - - if (!httpd->LockHasClients() && httpd->base.pause) { + if (!LockHasClients() && base.pause) { /* if there's no client and this output is paused, then httpd_output_pause() will not do anything, it will not fill the buffer and it will not update the timer; therefore, we reset the timer here */ - httpd->timer->Reset(); + timer->Reset(); /* some arbitrary delay that is long enough to avoid consuming too much CPU, and short enough to notice @@ -400,39 +405,47 @@ httpd_output_delay(struct audio_output *ao) return 1000; } - return httpd->timer->IsStarted() - ? httpd->timer->GetDelay() + return timer->IsStarted() + ? timer->GetDelay() : 0; } +static unsigned +httpd_output_delay(struct audio_output *ao) +{ + HttpdOutput *httpd = HttpdOutput::Cast(ao); + + return httpd->Delay(); +} + void HttpdOutput::BroadcastPage(Page *page) { assert(page != nullptr); - const ScopeLock protect(mutex); - for (auto &client : clients) - client.PushPage(page); + mutex.lock(); + pages.push(page); + page->Ref(); + mutex.unlock(); + + DeferredMonitor::Schedule(); } void HttpdOutput::BroadcastFromEncoder() { + /* synchronize with the IOThread */ mutex.lock(); - for (auto &client : clients) { - if (client.GetQueueSize() > 256 * 1024) { - FormatDebug(httpd_output_domain, - "client is too slow, flushing its queue"); - client.CancelQueue(); - } - } - mutex.unlock(); + while (!pages.empty()) + cond.wait(mutex); Page *page; - while ((page = ReadPage()) != nullptr) { - BroadcastPage(page); - page->Unref(); - } + while ((page = ReadPage()) != nullptr) + pages.push(page); + + mutex.unlock(); + + DeferredMonitor::Schedule(); } inline bool @@ -447,28 +460,34 @@ HttpdOutput::EncodeAndPlay(const void *chunk, size_t size, Error &error) return true; } -static size_t -httpd_output_play(struct audio_output *ao, const void *chunk, size_t size, - Error &error) +inline size_t +HttpdOutput::Play(const void *chunk, size_t size, Error &error) { - HttpdOutput *httpd = Cast(ao); - - if (httpd->LockHasClients()) { - if (!httpd->EncodeAndPlay(chunk, size, error)) + if (LockHasClients()) { + if (!EncodeAndPlay(chunk, size, error)) return 0; } - if (!httpd->timer->IsStarted()) - httpd->timer->Start(); - httpd->timer->Add(size); + if (!timer->IsStarted()) + timer->Start(); + timer->Add(size); return size; } +static size_t +httpd_output_play(struct audio_output *ao, const void *chunk, size_t size, + Error &error) +{ + HttpdOutput *httpd = HttpdOutput::Cast(ao); + + return httpd->Play(chunk, size, error); +} + static bool httpd_output_pause(struct audio_output *ao) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); if (httpd->LockHasClients()) { static const char silence[1020] = { 0 }; @@ -531,19 +550,36 @@ HttpdOutput::SendTag(const Tag *tag) static void httpd_output_tag(struct audio_output *ao, const Tag *tag) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); httpd->SendTag(tag); } +inline void +HttpdOutput::CancelAllClients() +{ + const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + page->Unref(); + } + + for (auto &client : clients) + client.CancelQueue(); + + cond.broadcast(); +} + static void httpd_output_cancel(struct audio_output *ao) { - HttpdOutput *httpd = Cast(ao); + HttpdOutput *httpd = HttpdOutput::Cast(ao); - const ScopeLock protect(httpd->mutex); - for (auto &client : httpd->clients) - client.CancelQueue(); + BlockingCall(io_thread_get(), [httpd](){ + httpd->CancelAllClients(); + }); } const struct audio_output_plugin httpd_output_plugin = { |