aboutsummaryrefslogtreecommitdiffstats
path: root/src/event/Loop.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/Loop.cxx')
-rw-r--r--src/event/Loop.cxx203
1 files changed, 117 insertions, 86 deletions
diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx
index 5aa24aea2..4ded68ff4 100644
--- a/src/event/Loop.cxx
+++ b/src/event/Loop.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2003-2013 The Music Player Daemon Project
+ * Copyright (C) 2003-2014 The Music Player Daemon Project
* http://www.musicpd.org
*
* This program is free software; you can redistribute it and/or modify
@@ -20,20 +20,21 @@
#include "config.h"
#include "Loop.hxx"
-#ifdef USE_EPOLL
-
#include "system/Clock.hxx"
#include "TimeoutMonitor.hxx"
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
+#include "DeferredMonitor.hxx"
#include <algorithm>
-EventLoop::EventLoop(Default)
+EventLoop::EventLoop()
:SocketMonitor(*this),
now_ms(::MonotonicClockMS()),
- quit(false),
- n_events(0),
+ quit(false), busy(true),
+#ifndef NDEBUG
+ virgin(true),
+#endif
thread(ThreadId::Null())
{
SocketMonitor::Open(wake_fd.Get());
@@ -45,45 +46,51 @@ EventLoop::~EventLoop()
assert(idle.empty());
assert(timers.empty());
- /* avoid closing the WakeFD twice */
- SocketMonitor::Steal();
+ /* this is necessary to get a well-defined destruction
+ order */
+ SocketMonitor::Cancel();
}
void
EventLoop::Break()
{
- if (IsInside())
- quit = true;
- else
- AddCall([this]() { Break(); });
+ quit = true;
+ wake_fd.Write();
}
-void
-EventLoop::Abandon(SocketMonitor &m)
+bool
+EventLoop::Abandon(int _fd, SocketMonitor &m)
{
- for (unsigned i = 0, n = n_events; i < n; ++i)
- if (events[i].data.ptr == &m)
- events[i].events = 0;
+ assert(IsInsideOrVirgin());
+
+ poll_result.Clear(&m);
+ return poll_group.Abandon(_fd);
}
bool
EventLoop::RemoveFD(int _fd, SocketMonitor &m)
{
- Abandon(m);
- return epoll.Remove(_fd);
+ assert(IsInsideOrNull());
+
+ poll_result.Clear(&m);
+ return poll_group.Remove(_fd);
}
void
EventLoop::AddIdle(IdleMonitor &i)
{
+ assert(IsInsideOrVirgin());
assert(std::find(idle.begin(), idle.end(), &i) == idle.end());
idle.push_back(&i);
+ again = true;
}
void
EventLoop::RemoveIdle(IdleMonitor &i)
{
+ assert(IsInsideOrVirgin());
+
auto it = std::find(idle.begin(), idle.end(), &i);
assert(it != idle.end());
@@ -93,12 +100,19 @@ EventLoop::RemoveIdle(IdleMonitor &i)
void
EventLoop::AddTimer(TimeoutMonitor &t, unsigned ms)
{
+ /* can't use IsInsideOrVirgin() here because libavahi-client
+ modifies the timeout during avahi_client_free() */
+ assert(IsInsideOrNull());
+
timers.insert(TimerRecord(t, now_ms + ms));
+ again = true;
}
void
EventLoop::CancelTimer(TimeoutMonitor &t)
{
+ assert(IsInsideOrNull());
+
for (auto i = timers.begin(), end = timers.end(); i != end; ++i) {
if (&i->timer == &t) {
timers.erase(i);
@@ -107,19 +121,24 @@ EventLoop::CancelTimer(TimeoutMonitor &t)
}
}
-#endif
-
void
EventLoop::Run()
{
assert(thread.IsNull());
+ assert(virgin);
+
+#ifndef NDEBUG
+ virgin = false;
+#endif
+
thread = ThreadId::GetCurrent();
-#ifdef USE_EPOLL
assert(!quit);
+ assert(busy);
do {
now_ms = ::MonotonicClockMS();
+ again = false;
/* invoke timers */
@@ -146,7 +165,6 @@ EventLoop::Run()
/* invoke idle */
- const bool idle_empty = idle.empty();
while (!idle.empty()) {
IdleMonitor &m = *idle.front();
idle.pop_front();
@@ -156,7 +174,14 @@ EventLoop::Run()
return;
}
- if (!idle_empty)
+ /* try to handle DeferredMonitors without WakeFD
+ overhead */
+ mutex.lock();
+ HandleDeferred();
+ busy = false;
+ mutex.unlock();
+
+ if (again)
/* re-evaluate timers because one of the
IdleMonitors may have added a new
timeout */
@@ -164,101 +189,107 @@ EventLoop::Run()
/* wait for new event */
- const int n = epoll.Wait(events, MAX_EVENTS, timeout_ms);
- n_events = std::max(n, 0);
+ poll_group.ReadEvents(poll_result, timeout_ms);
now_ms = ::MonotonicClockMS();
- assert(!quit);
+ mutex.lock();
+ busy = true;
+ mutex.unlock();
/* invoke sockets */
-
- for (int i = 0; i < n; ++i) {
- const auto &e = events[i];
-
- if (e.events != 0) {
- SocketMonitor &m = *(SocketMonitor *)e.data.ptr;
- m.Dispatch(e.events);
-
+ for (int i = 0; i < poll_result.GetSize(); ++i) {
+ auto events = poll_result.GetEvents(i);
+ if (events != 0) {
if (quit)
break;
+
+ auto m = (SocketMonitor *)poll_result.GetObject(i);
+ m->Dispatch(events);
}
}
- n_events = 0;
+ poll_result.Reset();
+
} while (!quit);
-#else
- g_main_loop_run(loop);
-#endif
+#ifndef NDEBUG
+ assert(busy);
assert(thread.IsInside());
+ thread = ThreadId::Null();
+#endif
}
-#ifdef USE_EPOLL
-
void
-EventLoop::AddCall(std::function<void()> &&f)
+EventLoop::AddDeferred(DeferredMonitor &d)
{
mutex.lock();
- calls.push_back(f);
+ if (d.pending) {
+ mutex.unlock();
+ return;
+ }
+
+ assert(std::find(deferred.begin(),
+ deferred.end(), &d) == deferred.end());
+
+ /* we don't need to wake up the EventLoop if another
+ DeferredMonitor has already done it */
+ const bool must_wake = !busy && deferred.empty();
+
+ d.pending = true;
+ deferred.push_back(&d);
+ again = true;
mutex.unlock();
- wake_fd.Write();
+ if (must_wake)
+ wake_fd.Write();
}
-bool
-EventLoop::OnSocketReady(gcc_unused unsigned flags)
+void
+EventLoop::RemoveDeferred(DeferredMonitor &d)
{
- assert(!quit);
+ const ScopeLock protect(mutex);
- wake_fd.Read();
+ if (!d.pending) {
+ assert(std::find(deferred.begin(),
+ deferred.end(), &d) == deferred.end());
+ return;
+ }
- mutex.lock();
+ d.pending = false;
- while (!calls.empty() && !quit) {
- auto f = std::move(calls.front());
- calls.pop_front();
+ auto i = std::find(deferred.begin(), deferred.end(), &d);
+ assert(i != deferred.end());
+
+ deferred.erase(i);
+}
+
+void
+EventLoop::HandleDeferred()
+{
+ while (!deferred.empty() && !quit) {
+ DeferredMonitor &m = *deferred.front();
+ assert(m.pending);
+
+ deferred.pop_front();
+ m.pending = false;
mutex.unlock();
- f();
+ m.RunDeferred();
mutex.lock();
}
-
- mutex.unlock();
-
- return true;
}
-#else
-
-guint
-EventLoop::AddIdle(GSourceFunc function, gpointer data)
+bool
+EventLoop::OnSocketReady(gcc_unused unsigned flags)
{
- GSource *source = g_idle_source_new();
- g_source_set_callback(source, function, data, nullptr);
- guint id = g_source_attach(source, GetContext());
- g_source_unref(source);
- return id;
-}
+ assert(IsInside());
-GSource *
-EventLoop::AddTimeout(guint interval_ms,
- GSourceFunc function, gpointer data)
-{
- GSource *source = g_timeout_source_new(interval_ms);
- g_source_set_callback(source, function, data, nullptr);
- g_source_attach(source, GetContext());
- return source;
-}
+ wake_fd.Read();
-GSource *
-EventLoop::AddTimeoutSeconds(guint interval_s,
- GSourceFunc function, gpointer data)
-{
- GSource *source = g_timeout_source_new_seconds(interval_s);
- g_source_set_callback(source, function, data, nullptr);
- g_source_attach(source, GetContext());
- return source;
-}
+ mutex.lock();
+ HandleDeferred();
+ mutex.unlock();
-#endif
+ return true;
+}