diff options
Diffstat (limited to 'src/event/Loop.cxx')
-rw-r--r-- | src/event/Loop.cxx | 203 |
1 files changed, 117 insertions, 86 deletions
diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx index 5aa24aea2..4ded68ff4 100644 --- a/src/event/Loop.cxx +++ b/src/event/Loop.cxx @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2013 The Music Player Daemon Project + * Copyright (C) 2003-2014 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -20,20 +20,21 @@ #include "config.h" #include "Loop.hxx" -#ifdef USE_EPOLL - #include "system/Clock.hxx" #include "TimeoutMonitor.hxx" #include "SocketMonitor.hxx" #include "IdleMonitor.hxx" +#include "DeferredMonitor.hxx" #include <algorithm> -EventLoop::EventLoop(Default) +EventLoop::EventLoop() :SocketMonitor(*this), now_ms(::MonotonicClockMS()), - quit(false), - n_events(0), + quit(false), busy(true), +#ifndef NDEBUG + virgin(true), +#endif thread(ThreadId::Null()) { SocketMonitor::Open(wake_fd.Get()); @@ -45,45 +46,51 @@ EventLoop::~EventLoop() assert(idle.empty()); assert(timers.empty()); - /* avoid closing the WakeFD twice */ - SocketMonitor::Steal(); + /* this is necessary to get a well-defined destruction + order */ + SocketMonitor::Cancel(); } void EventLoop::Break() { - if (IsInside()) - quit = true; - else - AddCall([this]() { Break(); }); + quit = true; + wake_fd.Write(); } -void -EventLoop::Abandon(SocketMonitor &m) +bool +EventLoop::Abandon(int _fd, SocketMonitor &m) { - for (unsigned i = 0, n = n_events; i < n; ++i) - if (events[i].data.ptr == &m) - events[i].events = 0; + assert(IsInsideOrVirgin()); + + poll_result.Clear(&m); + return poll_group.Abandon(_fd); } bool EventLoop::RemoveFD(int _fd, SocketMonitor &m) { - Abandon(m); - return epoll.Remove(_fd); + assert(IsInsideOrNull()); + + poll_result.Clear(&m); + return poll_group.Remove(_fd); } void EventLoop::AddIdle(IdleMonitor &i) { + assert(IsInsideOrVirgin()); assert(std::find(idle.begin(), idle.end(), &i) == idle.end()); idle.push_back(&i); + again = true; } void EventLoop::RemoveIdle(IdleMonitor &i) { + assert(IsInsideOrVirgin()); + auto it = std::find(idle.begin(), idle.end(), &i); assert(it != idle.end()); @@ -93,12 +100,19 @@ EventLoop::RemoveIdle(IdleMonitor &i) void EventLoop::AddTimer(TimeoutMonitor &t, unsigned ms) { + /* can't use IsInsideOrVirgin() here because libavahi-client + modifies the timeout during avahi_client_free() */ + assert(IsInsideOrNull()); + timers.insert(TimerRecord(t, now_ms + ms)); + again = true; } void EventLoop::CancelTimer(TimeoutMonitor &t) { + assert(IsInsideOrNull()); + for (auto i = timers.begin(), end = timers.end(); i != end; ++i) { if (&i->timer == &t) { timers.erase(i); @@ -107,19 +121,24 @@ EventLoop::CancelTimer(TimeoutMonitor &t) } } -#endif - void EventLoop::Run() { assert(thread.IsNull()); + assert(virgin); + +#ifndef NDEBUG + virgin = false; +#endif + thread = ThreadId::GetCurrent(); -#ifdef USE_EPOLL assert(!quit); + assert(busy); do { now_ms = ::MonotonicClockMS(); + again = false; /* invoke timers */ @@ -146,7 +165,6 @@ EventLoop::Run() /* invoke idle */ - const bool idle_empty = idle.empty(); while (!idle.empty()) { IdleMonitor &m = *idle.front(); idle.pop_front(); @@ -156,7 +174,14 @@ EventLoop::Run() return; } - if (!idle_empty) + /* try to handle DeferredMonitors without WakeFD + overhead */ + mutex.lock(); + HandleDeferred(); + busy = false; + mutex.unlock(); + + if (again) /* re-evaluate timers because one of the IdleMonitors may have added a new timeout */ @@ -164,101 +189,107 @@ EventLoop::Run() /* wait for new event */ - const int n = epoll.Wait(events, MAX_EVENTS, timeout_ms); - n_events = std::max(n, 0); + poll_group.ReadEvents(poll_result, timeout_ms); now_ms = ::MonotonicClockMS(); - assert(!quit); + mutex.lock(); + busy = true; + mutex.unlock(); /* invoke sockets */ - - for (int i = 0; i < n; ++i) { - const auto &e = events[i]; - - if (e.events != 0) { - SocketMonitor &m = *(SocketMonitor *)e.data.ptr; - m.Dispatch(e.events); - + for (int i = 0; i < poll_result.GetSize(); ++i) { + auto events = poll_result.GetEvents(i); + if (events != 0) { if (quit) break; + + auto m = (SocketMonitor *)poll_result.GetObject(i); + m->Dispatch(events); } } - n_events = 0; + poll_result.Reset(); + } while (!quit); -#else - g_main_loop_run(loop); -#endif +#ifndef NDEBUG + assert(busy); assert(thread.IsInside()); + thread = ThreadId::Null(); +#endif } -#ifdef USE_EPOLL - void -EventLoop::AddCall(std::function<void()> &&f) +EventLoop::AddDeferred(DeferredMonitor &d) { mutex.lock(); - calls.push_back(f); + if (d.pending) { + mutex.unlock(); + return; + } + + assert(std::find(deferred.begin(), + deferred.end(), &d) == deferred.end()); + + /* we don't need to wake up the EventLoop if another + DeferredMonitor has already done it */ + const bool must_wake = !busy && deferred.empty(); + + d.pending = true; + deferred.push_back(&d); + again = true; mutex.unlock(); - wake_fd.Write(); + if (must_wake) + wake_fd.Write(); } -bool -EventLoop::OnSocketReady(gcc_unused unsigned flags) +void +EventLoop::RemoveDeferred(DeferredMonitor &d) { - assert(!quit); + const ScopeLock protect(mutex); - wake_fd.Read(); + if (!d.pending) { + assert(std::find(deferred.begin(), + deferred.end(), &d) == deferred.end()); + return; + } - mutex.lock(); + d.pending = false; - while (!calls.empty() && !quit) { - auto f = std::move(calls.front()); - calls.pop_front(); + auto i = std::find(deferred.begin(), deferred.end(), &d); + assert(i != deferred.end()); + + deferred.erase(i); +} + +void +EventLoop::HandleDeferred() +{ + while (!deferred.empty() && !quit) { + DeferredMonitor &m = *deferred.front(); + assert(m.pending); + + deferred.pop_front(); + m.pending = false; mutex.unlock(); - f(); + m.RunDeferred(); mutex.lock(); } - - mutex.unlock(); - - return true; } -#else - -guint -EventLoop::AddIdle(GSourceFunc function, gpointer data) +bool +EventLoop::OnSocketReady(gcc_unused unsigned flags) { - GSource *source = g_idle_source_new(); - g_source_set_callback(source, function, data, nullptr); - guint id = g_source_attach(source, GetContext()); - g_source_unref(source); - return id; -} + assert(IsInside()); -GSource * -EventLoop::AddTimeout(guint interval_ms, - GSourceFunc function, gpointer data) -{ - GSource *source = g_timeout_source_new(interval_ms); - g_source_set_callback(source, function, data, nullptr); - g_source_attach(source, GetContext()); - return source; -} + wake_fd.Read(); -GSource * -EventLoop::AddTimeoutSeconds(guint interval_s, - GSourceFunc function, gpointer data) -{ - GSource *source = g_timeout_source_new_seconds(interval_s); - g_source_set_callback(source, function, data, nullptr); - g_source_attach(source, GetContext()); - return source; -} + mutex.lock(); + HandleDeferred(); + mutex.unlock(); -#endif + return true; +} |