aboutsummaryrefslogtreecommitdiffstats
path: root/src/db/plugins/upnp/WorkQueue.hxx
diff options
context:
space:
mode:
Diffstat (limited to 'src/db/plugins/upnp/WorkQueue.hxx')
-rw-r--r--src/db/plugins/upnp/WorkQueue.hxx206
1 files changed, 0 insertions, 206 deletions
diff --git a/src/db/plugins/upnp/WorkQueue.hxx b/src/db/plugins/upnp/WorkQueue.hxx
deleted file mode 100644
index fe8ce53f9..000000000
--- a/src/db/plugins/upnp/WorkQueue.hxx
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Copyright (C) 2003-2014 The Music Player Daemon Project
- * http://www.musicpd.org
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifndef _WORKQUEUE_H_INCLUDED_
-#define _WORKQUEUE_H_INCLUDED_
-
-#include "thread/Mutex.hxx"
-#include "thread/Cond.hxx"
-
-#include <assert.h>
-#include <pthread.h>
-
-#include <string>
-#include <queue>
-
-#define LOGINFO(X)
-#define LOGERR(X)
-
-/**
- * A WorkQueue manages the synchronisation around a queue of work items,
- * where a number of client threads queue tasks and a number of worker
- * threads take and execute them. The goal is to introduce some level
- * of parallelism between the successive steps of a previously single
- * threaded pipeline. For example data extraction / data preparation / index
- * update, but this could have other uses.
- *
- * There is no individual task status return. In case of fatal error,
- * the client or worker sets an end condition on the queue. A second
- * queue could conceivably be used for returning individual task
- * status.
- */
-template <class T>
-class WorkQueue {
- // Configuration
- const std::string name;
-
- // Status
- // Worker threads having called exit
- unsigned n_workers_exited;
- bool ok;
-
- unsigned n_threads;
- pthread_t *threads;
-
- // Synchronization
- std::queue<T> queue;
- Cond client_cond;
- Cond worker_cond;
- Mutex mutex;
-
-public:
- /** Create a WorkQueue
- * @param name for message printing
- * @param hi number of tasks on queue before clients blocks. Default 0
- * meaning no limit. hi == -1 means that the queue is disabled.
- * @param lo minimum count of tasks before worker starts. Default 1.
- */
- WorkQueue(const char *_name)
- :name(_name),
- n_workers_exited(0),
- ok(false),
- n_threads(0), threads(nullptr)
- {
- }
-
- ~WorkQueue() {
- setTerminateAndWait();
- }
-
- /** Start the worker threads.
- *
- * @param nworkers number of threads copies to start.
- * @param start_routine thread function. It should loop
- * taking (QueueWorker::take()) and executing tasks.
- * @param arg initial parameter to thread function.
- * @return true if ok.
- */
- bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
- {
- const ScopeLock protect(mutex);
-
- assert(nworkers > 0);
- assert(!ok);
- assert(n_threads == 0);
- assert(threads == nullptr);
-
- n_threads = nworkers;
- threads = new pthread_t[n_threads];
-
- for (unsigned i = 0; i < nworkers; i++) {
- int err;
- if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
- LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
- name.c_str(), err));
- return false;
- }
- }
-
- ok = true;
- return true;
- }
-
- /** Add item to work queue, called from client.
- *
- * Sleeps if there are already too many.
- */
- template<typename U>
- bool put(U &&u)
- {
- const ScopeLock protect(mutex);
-
- queue.emplace(std::forward<U>(u));
-
- // Just wake one worker, there is only one new task.
- worker_cond.signal();
-
- return true;
- }
-
-
- /** Tell the workers to exit, and wait for them.
- */
- void setTerminateAndWait()
- {
- const ScopeLock protect(mutex);
-
- // Wait for all worker threads to have called workerExit()
- ok = false;
- while (n_workers_exited < n_threads) {
- worker_cond.broadcast();
- client_cond.wait(mutex);
- }
-
- // Perform the thread joins and compute overall status
- // Workers return (void*)1 if ok
- for (unsigned i = 0; i < n_threads; ++i) {
- void *status;
- pthread_join(threads[i], &status);
- }
-
- delete[] threads;
- threads = nullptr;
- n_threads = 0;
-
- // Reset to start state.
- n_workers_exited = 0;
- }
-
- /** Take task from queue. Called from worker.
- *
- * Sleeps if there are not enough. Signal if we go to sleep on empty
- * queue: client may be waiting for our going idle.
- */
- bool take(T &tp)
- {
- const ScopeLock protect(mutex);
-
- if (!ok)
- return false;
-
- while (queue.empty()) {
- worker_cond.wait(mutex);
- if (!ok)
- return false;
- }
-
- tp = std::move(queue.front());
- queue.pop();
- return true;
- }
-
- /** Advertise exit and abort queue. Called from worker
- *
- * This would happen after an unrecoverable error, or when
- * the queue is terminated by the client. Workers never exit normally,
- * except when the queue is shut down (at which point ok is set to
- * false by the shutdown code anyway). The thread must return/exit
- * immediately after calling this.
- */
- void workerExit()
- {
- const ScopeLock protect(mutex);
-
- n_workers_exited++;
- ok = false;
- client_cond.broadcast();
- }
-};
-
-#endif /* _WORKQUEUE_H_INCLUDED_ */