aboutsummaryrefslogtreecommitdiffstats
path: root/src/output/HttpdOutputPlugin.cxx
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2013-12-31 16:31:36 +0100
committerMax Kellermann <max@duempel.org>2014-01-04 18:22:55 +0100
commitc9da3363a04bfdf8a0f0b2d974291e422a0345d2 (patch)
treefd79aa539d93da5c12d35f05a104a4c8e82258b9 /src/output/HttpdOutputPlugin.cxx
parent9bd4ed3e60df771241114ed460b8b5943ff57982 (diff)
downloadmpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.tar.gz
mpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.tar.xz
mpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.zip
output/httpd: move all broadcast operations to the IOThread
Add a Page queue to class HttpdOutput, and use DeferredMonitor to flush this queue inside the IOThread. This fixes a thread-safety issue: much of EventLoop is not thread-safe, and the httpd plugin ignored that problem.
Diffstat (limited to 'src/output/HttpdOutputPlugin.cxx')
-rw-r--r--src/output/HttpdOutputPlugin.cxx66
1 files changed, 56 insertions, 10 deletions
diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx
index 2790dd98d..fa6934ff0 100644
--- a/src/output/HttpdOutputPlugin.cxx
+++ b/src/output/HttpdOutputPlugin.cxx
@@ -29,6 +29,7 @@
#include "IcyMetaDataServer.hxx"
#include "system/fd_util.h"
#include "IOThread.hxx"
+#include "event/Call.hxx"
#include "util/Error.hxx"
#include "util/Domain.hxx"
#include "Log.hxx"
@@ -49,7 +50,7 @@ const Domain httpd_output_domain("httpd_output");
inline
HttpdOutput::HttpdOutput(EventLoop &_loop)
- :ServerSocket(_loop),
+ :ServerSocket(_loop), DeferredMonitor(_loop),
encoder(nullptr), unflushed_input(0),
metadata(nullptr)
{
@@ -162,7 +163,7 @@ httpd_output_finish(struct audio_output *ao)
inline void
HttpdOutput::AddClient(int fd)
{
- clients.emplace_front(*this, fd, GetEventLoop(),
+ clients.emplace_front(*this, fd, ServerSocket::GetEventLoop(),
encoder->plugin.tag == nullptr);
++clients_cnt;
@@ -172,6 +173,29 @@ HttpdOutput::AddClient(int fd)
}
void
+HttpdOutput::RunDeferred()
+{
+ /* this method runs in the IOThread; it broadcasts pages from
+ our own queue to all clients */
+
+ const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+
+ for (auto &client : clients)
+ client.PushPage(page);
+
+ page->Unref();
+ }
+
+ /* wake up the client that may be waiting for the queue to be
+ flushed */
+ cond.broadcast();
+}
+
+void
HttpdOutput::OnAccept(int fd, const sockaddr &address,
size_t address_length, gcc_unused int uid)
{
@@ -393,19 +417,29 @@ HttpdOutput::BroadcastPage(Page *page)
{
assert(page != nullptr);
- const ScopeLock protect(mutex);
- for (auto &client : clients)
- client.PushPage(page);
+ mutex.lock();
+ pages.push(page);
+ page->Ref();
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
void
HttpdOutput::BroadcastFromEncoder()
{
+ /* synchronize with the IOThread */
+ mutex.lock();
+ while (!pages.empty())
+ cond.wait(mutex);
+
Page *page;
- while ((page = ReadPage()) != nullptr) {
- BroadcastPage(page);
- page->Unref();
- }
+ while ((page = ReadPage()) != nullptr)
+ pages.push(page);
+
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
inline bool
@@ -519,15 +553,27 @@ inline void
HttpdOutput::CancelAllClients()
{
const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+ page->Unref();
+ }
+
for (auto &client : clients)
client.CancelQueue();
+
+ cond.broadcast();
}
static void
httpd_output_cancel(struct audio_output *ao)
{
HttpdOutput *httpd = HttpdOutput::Cast(ao);
- httpd->CancelAllClients();
+
+ BlockingCall(io_thread_get(), [httpd](){
+ httpd->CancelAllClients();
+ });
}
const struct audio_output_plugin httpd_output_plugin = {