aboutsummaryrefslogtreecommitdiffstats
path: root/src/db/upnp/WorkQueue.hxx
diff options
context:
space:
mode:
authorJean-Francois Dockes <jf@dockes.org>2013-11-01 19:26:01 +0100
committerMax Kellermann <max@duempel.org>2014-01-09 20:56:00 +0100
commit406452f019f097bf52d8db529eabe8dd2d0e977a (patch)
treed70ce0b620cc97ed9d352f89063fa7cecffd6341 /src/db/upnp/WorkQueue.hxx
parent12b139beafc191d02277e7ce97b4c59f7bb0c095 (diff)
downloadmpd-406452f019f097bf52d8db529eabe8dd2d0e977a.tar.gz
mpd-406452f019f097bf52d8db529eabe8dd2d0e977a.tar.xz
mpd-406452f019f097bf52d8db529eabe8dd2d0e977a.zip
UPnP database plugin
[mk: renamed source files, applied coding style, reduced bloat, using MPD's threading library, using MPD's error reporting and logging library and refactoring, fixed lots of bugs]
Diffstat (limited to 'src/db/upnp/WorkQueue.hxx')
-rw-r--r--src/db/upnp/WorkQueue.hxx327
1 files changed, 327 insertions, 0 deletions
diff --git a/src/db/upnp/WorkQueue.hxx b/src/db/upnp/WorkQueue.hxx
new file mode 100644
index 000000000..851f81837
--- /dev/null
+++ b/src/db/upnp/WorkQueue.hxx
@@ -0,0 +1,327 @@
+/*
+ * Copyright (C) 2003-2014 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef _WORKQUEUE_H_INCLUDED_
+#define _WORKQUEUE_H_INCLUDED_
+
+#include "thread/Mutex.hxx"
+#include "thread/Cond.hxx"
+
+#include <pthread.h>
+#include <time.h>
+
+#include <string>
+#include <queue>
+#include <unordered_map>
+
+//#include "debuglog.h"
+#define LOGINFO(X)
+#define LOGERR(X)
+
+/**
+ * A WorkQueue manages the synchronisation around a queue of work items,
+ * where a number of client threads queue tasks and a number of worker
+ * threads take and execute them. The goal is to introduce some level
+ * of parallelism between the successive steps of a previously single
+ * threaded pipeline. For example data extraction / data preparation / index
+ * update, but this could have other uses.
+ *
+ * There is no individual task status return. In case of fatal error,
+ * the client or worker sets an end condition on the queue. A second
+ * queue could conceivably be used for returning individual task
+ * status.
+ */
+template <class T>
+class WorkQueue {
+ /**
+ * Store per-worker-thread data. Just an initialized timespec,
+ * and used at the moment.
+ */
+ class WQTData {
+ public:
+ WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
+ struct timespec wstart;
+ };
+
+ // Configuration
+ std::string m_name;
+ size_t m_high;
+ size_t m_low;
+
+ // Status
+ // Worker threads having called exit
+ unsigned int m_workers_exited;
+ bool m_ok;
+
+ // Per-thread data. The data is not used currently, this could be
+ // a set<pthread_t>
+ std::unordered_map<pthread_t, WQTData> m_worker_threads;
+
+ // Synchronization
+ std::queue<T> m_queue;
+ Cond m_ccond;
+ Cond m_wcond;
+ Mutex m_mutex;
+ // Client/Worker threads currently waiting for a job
+ unsigned int m_clients_waiting;
+ unsigned int m_workers_waiting;
+
+ // Statistics
+ unsigned int m_tottasks;
+ unsigned int m_nowake;
+ unsigned int m_workersleeps;
+ unsigned int m_clientsleeps;
+
+public:
+ /** Create a WorkQueue
+ * @param name for message printing
+ * @param hi number of tasks on queue before clients blocks. Default 0
+ * meaning no limit. hi == -1 means that the queue is disabled.
+ * @param lo minimum count of tasks before worker starts. Default 1.
+ */
+ WorkQueue(const char *name, size_t hi = 0, size_t lo = 1)
+ :m_name(name), m_high(hi), m_low(lo),
+ m_workers_exited(0),
+ m_ok(true),
+ m_clients_waiting(0), m_workers_waiting(0),
+ m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
+ {
+ }
+
+ ~WorkQueue() {
+ setTerminateAndWait();
+ }
+
+ /** Start the worker threads.
+ *
+ * @param nworkers number of threads copies to start.
+ * @param start_routine thread function. It should loop
+ * taking (QueueWorker::take()) and executing tasks.
+ * @param arg initial parameter to thread function.
+ * @return true if ok.
+ */
+ bool start(int nworkers, void *(*workproc)(void *), void *arg)
+ {
+ const ScopeLock protect(m_mutex);
+
+ for (int i = 0; i < nworkers; i++) {
+ int err;
+ pthread_t thr;
+ if ((err = pthread_create(&thr, 0, workproc, arg))) {
+ LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
+ m_name.c_str(), err));
+ return false;
+ }
+ m_worker_threads.insert(std::make_pair(thr, WQTData()));
+ }
+ return true;
+ }
+
+ /** Add item to work queue, called from client.
+ *
+ * Sleeps if there are already too many.
+ */
+ bool put(T t)
+ {
+ const ScopeLock protect(m_mutex);
+
+ if (!ok()) {
+ LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
+ m_name.c_str()));
+ return false;
+ }
+
+ while (ok() && m_high > 0 && m_queue.size() >= m_high) {
+ m_clientsleeps++;
+ // Keep the order: we test ok() AFTER the sleep...
+ m_clients_waiting++;
+ m_ccond.wait(m_mutex);
+ if (!ok()) {
+ m_clients_waiting--;
+ return false;
+ }
+ m_clients_waiting--;
+ }
+
+ m_queue.push(t);
+ if (m_workers_waiting > 0) {
+ // Just wake one worker, there is only one new task.
+ m_wcond.signal();
+ } else {
+ m_nowake++;
+ }
+
+ return true;
+ }
+
+ /**
+ * Wait until the queue is inactive. Called from client.
+ *
+ * Waits until the task queue is empty and the workers are all
+ * back sleeping. Used by the client to wait for all current work
+ * to be completed, when it needs to perform work that couldn't be
+ * done in parallel with the worker's tasks, or before shutting
+ * down. Work can be resumed after calling this. Note that the
+ * only thread which can call it safely is the client just above
+ * (which can control the task flow), else there could be
+ * tasks in the intermediate queues.
+ * To rephrase: there is no warranty on return that the queue is actually
+ * idle EXCEPT if the caller knows that no jobs are still being created.
+ * It would be possible to transform this into a safe call if some kind
+ * of suspend condition was set on the queue by waitIdle(), to be reset by
+ * some kind of "resume" call. Not currently the case.
+ */
+ bool waitIdle()
+ {
+ const ScopeLock protect(m_mutex);
+
+ if (!ok()) {
+ LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
+ m_name.c_str()));
+ return false;
+ }
+
+ // We're done when the queue is empty AND all workers are back
+ // waiting for a task.
+ while (ok() && (m_queue.size() > 0 ||
+ m_workers_waiting != m_worker_threads.size())) {
+ m_clients_waiting++;
+ m_ccond.wait(m_mutex);
+ m_clients_waiting--;
+ }
+
+ return ok();
+ }
+
+
+ /** Tell the workers to exit, and wait for them.
+ *
+ * Does not bother about tasks possibly remaining on the queue, so
+ * should be called after waitIdle() for an orderly shutdown.
+ */
+ void setTerminateAndWait()
+ {
+ const ScopeLock protect(m_mutex);
+
+ if (m_worker_threads.empty())
+ // Already called ?
+ return;
+
+ // Wait for all worker threads to have called workerExit()
+ m_ok = false;
+ while (m_workers_exited < m_worker_threads.size()) {
+ m_wcond.broadcast();
+ m_clients_waiting++;
+ m_ccond.wait(m_mutex);
+ m_clients_waiting--;
+ }
+
+ // Perform the thread joins and compute overall status
+ // Workers return (void*)1 if ok
+ while (!m_worker_threads.empty()) {
+ void *status;
+ auto it = m_worker_threads.begin();
+ pthread_join(it->first, &status);
+ m_worker_threads.erase(it);
+ }
+
+ // Reset to start state.
+ m_workers_exited = m_clients_waiting = m_workers_waiting =
+ m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
+ m_ok = true;
+ }
+
+ /** Take task from queue. Called from worker.
+ *
+ * Sleeps if there are not enough. Signal if we go to sleep on empty
+ * queue: client may be waiting for our going idle.
+ */
+ bool take(T* tp, size_t *szp = 0)
+ {
+ const ScopeLock protect(m_mutex);
+
+ if (!ok()) {
+ return false;
+ }
+
+ while (ok() && m_queue.size() < m_low) {
+ m_workersleeps++;
+ m_workers_waiting++;
+ if (m_queue.empty())
+ m_ccond.broadcast();
+ m_wcond.wait(m_mutex);
+ if (!ok()) {
+ // !ok is a normal condition when shutting down
+ if (ok()) {
+ LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
+ m_name.c_str()));
+ }
+ m_workers_waiting--;
+ return false;
+ }
+ m_workers_waiting--;
+ }
+
+ m_tottasks++;
+ *tp = m_queue.front();
+ if (szp)
+ *szp = m_queue.size();
+ m_queue.pop();
+ if (m_clients_waiting > 0) {
+ // No reason to wake up more than one client thread
+ m_ccond.signal();
+ } else {
+ m_nowake++;
+ }
+ return true;
+ }
+
+ /** Advertise exit and abort queue. Called from worker
+ *
+ * This would happen after an unrecoverable error, or when
+ * the queue is terminated by the client. Workers never exit normally,
+ * except when the queue is shut down (at which point m_ok is set to
+ * false by the shutdown code anyway). The thread must return/exit
+ * immediately after calling this.
+ */
+ void workerExit()
+ {
+ const ScopeLock protect(m_mutex);
+
+ m_workers_exited++;
+ m_ok = false;
+ m_ccond.broadcast();
+ }
+
+ size_t qsize()
+ {
+ const ScopeLock protect(m_mutex);
+
+ size_t sz = m_queue.size();
+ return sz;
+ }
+
+private:
+ bool ok()
+ {
+ return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
+ }
+};
+
+#endif /* _WORKQUEUE_H_INCLUDED_ */