diff options
author | Max Kellermann <max@duempel.org> | 2013-12-31 16:31:36 +0100 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2014-01-04 18:22:55 +0100 |
commit | c9da3363a04bfdf8a0f0b2d974291e422a0345d2 (patch) | |
tree | fd79aa539d93da5c12d35f05a104a4c8e82258b9 /src/output/HttpdOutputPlugin.cxx | |
parent | 9bd4ed3e60df771241114ed460b8b5943ff57982 (diff) | |
download | mpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.tar.gz mpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.tar.xz mpd-c9da3363a04bfdf8a0f0b2d974291e422a0345d2.zip |
output/httpd: move all broadcast operations to the IOThread
Add a Page queue to class HttpdOutput, and use DeferredMonitor to
flush this queue inside the IOThread. This fixes a thread-safety
issue: much of EventLoop is not thread-safe, and the httpd plugin
ignored that problem.
Diffstat (limited to 'src/output/HttpdOutputPlugin.cxx')
-rw-r--r-- | src/output/HttpdOutputPlugin.cxx | 66 |
1 files changed, 56 insertions, 10 deletions
diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx index 2790dd98d..fa6934ff0 100644 --- a/src/output/HttpdOutputPlugin.cxx +++ b/src/output/HttpdOutputPlugin.cxx @@ -29,6 +29,7 @@ #include "IcyMetaDataServer.hxx" #include "system/fd_util.h" #include "IOThread.hxx" +#include "event/Call.hxx" #include "util/Error.hxx" #include "util/Domain.hxx" #include "Log.hxx" @@ -49,7 +50,7 @@ const Domain httpd_output_domain("httpd_output"); inline HttpdOutput::HttpdOutput(EventLoop &_loop) - :ServerSocket(_loop), + :ServerSocket(_loop), DeferredMonitor(_loop), encoder(nullptr), unflushed_input(0), metadata(nullptr) { @@ -162,7 +163,7 @@ httpd_output_finish(struct audio_output *ao) inline void HttpdOutput::AddClient(int fd) { - clients.emplace_front(*this, fd, GetEventLoop(), + clients.emplace_front(*this, fd, ServerSocket::GetEventLoop(), encoder->plugin.tag == nullptr); ++clients_cnt; @@ -172,6 +173,29 @@ HttpdOutput::AddClient(int fd) } void +HttpdOutput::RunDeferred() +{ + /* this method runs in the IOThread; it broadcasts pages from + our own queue to all clients */ + + const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + + for (auto &client : clients) + client.PushPage(page); + + page->Unref(); + } + + /* wake up the client that may be waiting for the queue to be + flushed */ + cond.broadcast(); +} + +void HttpdOutput::OnAccept(int fd, const sockaddr &address, size_t address_length, gcc_unused int uid) { @@ -393,19 +417,29 @@ HttpdOutput::BroadcastPage(Page *page) { assert(page != nullptr); - const ScopeLock protect(mutex); - for (auto &client : clients) - client.PushPage(page); + mutex.lock(); + pages.push(page); + page->Ref(); + mutex.unlock(); + + DeferredMonitor::Schedule(); } void HttpdOutput::BroadcastFromEncoder() { + /* synchronize with the IOThread */ + mutex.lock(); + while (!pages.empty()) + cond.wait(mutex); + Page *page; - while ((page = ReadPage()) != nullptr) { - BroadcastPage(page); - page->Unref(); - } + while ((page = ReadPage()) != nullptr) + pages.push(page); + + mutex.unlock(); + + DeferredMonitor::Schedule(); } inline bool @@ -519,15 +553,27 @@ inline void HttpdOutput::CancelAllClients() { const ScopeLock protect(mutex); + + while (!pages.empty()) { + Page *page = pages.front(); + pages.pop(); + page->Unref(); + } + for (auto &client : clients) client.CancelQueue(); + + cond.broadcast(); } static void httpd_output_cancel(struct audio_output *ao) { HttpdOutput *httpd = HttpdOutput::Cast(ao); - httpd->CancelAllClients(); + + BlockingCall(io_thread_get(), [httpd](){ + httpd->CancelAllClients(); + }); } const struct audio_output_plugin httpd_output_plugin = { |