From c1f4f1fdb64d97b5c3461723a8482ca64efea30e Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Wed, 7 Aug 2013 22:16:59 +0200 Subject: EventLoop: new implementation using epoll Implement an event loop without GLib. --- src/event/Loop.cxx | 196 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) (limited to 'src/event/Loop.cxx') diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx index 5154c3562..6e771d9dc 100644 --- a/src/event/Loop.cxx +++ b/src/event/Loop.cxx @@ -19,6 +19,89 @@ #include "config.h" #include "Loop.hxx" +#include "system/clock.h" + +#ifdef USE_EPOLL + +#include "TimeoutMonitor.hxx" +#include "SocketMonitor.hxx" +#include "IdleMonitor.hxx" + +#include + +EventLoop::EventLoop(Default) + :SocketMonitor(*this), + now_ms(::monotonic_clock_ms()), + quit(false), + n_events(0) +{ + SocketMonitor::Open(wake_fd.Get()); + SocketMonitor::Schedule(SocketMonitor::READ); +} + +EventLoop::~EventLoop() +{ + assert(idle.empty()); + assert(timers.empty()); + + /* avoid closing the WakeFD twice */ + SocketMonitor::Steal(); +} + +void +EventLoop::Break() +{ + if (IsInside()) + quit = true; + else + AddCall([this]() { Break(); }); +} + +bool +EventLoop::RemoveFD(int _fd, SocketMonitor &m) +{ + for (unsigned i = 0, n = n_events; i < n; ++i) + if (events[i].data.ptr == &m) + events[i].events = 0; + + return epoll.Remove(_fd); +} + +void +EventLoop::AddIdle(IdleMonitor &i) +{ + assert(std::find(idle.begin(), idle.end(), &i) == idle.end()); + + idle.push_back(&i); +} + +void +EventLoop::RemoveIdle(IdleMonitor &i) +{ + auto it = std::find(idle.begin(), idle.end(), &i); + assert(it != idle.end()); + + idle.erase(it); +} + +void +EventLoop::AddTimer(TimeoutMonitor &t, unsigned ms) +{ + timers.insert(TimerRecord(t, now_ms + ms)); +} + +void +EventLoop::CancelTimer(TimeoutMonitor &t) +{ + for (auto i = timers.begin(), end = timers.end(); i != end; ++i) { + if (&i->timer == &t) { + timers.erase(i); + return; + } + } +} + +#endif void EventLoop::Run() @@ -26,11 +109,122 @@ EventLoop::Run() assert(thread.IsNull()); thread = ThreadId::GetCurrent(); +#ifdef USE_EPOLL + assert(!quit); + + do { + now_ms = ::monotonic_clock_ms(); + + /* invoke timers */ + + int timeout_ms; + while (true) { + auto i = timers.begin(); + if (i == timers.end()) { + timeout_ms = -1; + break; + } + + timeout_ms = i->due_ms - now_ms; + if (timeout_ms > 0) + break; + + TimeoutMonitor &m = i->timer; + timers.erase(i); + + m.Run(); + + if (quit) + return; + } + + /* invoke idle */ + + const bool idle_empty = idle.empty(); + while (!idle.empty()) { + IdleMonitor &m = *idle.front(); + idle.pop_front(); + m.Run(); + + if (quit) + return; + } + + if (!idle_empty) + /* re-evaluate timers because one of the + IdleMonitors may have added a new + timeout */ + continue; + + /* wait for new event */ + + const int n = epoll.Wait(events, MAX_EVENTS, timeout_ms); + n_events = std::max(n, 0); + + now_ms = ::monotonic_clock_ms(); + + assert(!quit); + + /* invoke sockets */ + + for (int i = 0; i < n; ++i) { + const auto &e = events[i]; + + if (e.events != 0) { + SocketMonitor &m = *(SocketMonitor *)e.data.ptr; + m.Dispatch(e.events); + + if (quit) + break; + } + } + + n_events = 0; + } while (!quit); +#else g_main_loop_run(loop); +#endif assert(thread.IsInside()); } +#ifdef USE_EPOLL + +void +EventLoop::AddCall(std::function &&f) +{ + mutex.lock(); + calls.push_back(f); + mutex.unlock(); + + wake_fd.Write(); +} + +bool +EventLoop::OnSocketReady(gcc_unused unsigned flags) +{ + assert(!quit); + + wake_fd.Read(); + + mutex.lock(); + + while (!calls.empty() && !quit) { + auto f = std::move(calls.front()); + calls.pop_front(); + + mutex.unlock(); + f(); + mutex.lock(); + } + + mutex.unlock(); + + return true; +} + +#else + guint EventLoop::AddIdle(GSourceFunc function, gpointer data) { @@ -60,3 +254,5 @@ EventLoop::AddTimeoutSeconds(guint interval_s, g_source_attach(source, GetContext()); return source; } + +#endif -- cgit v1.2.3