/* * Copyright (C) 2003-2014 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef _WORKQUEUE_H_INCLUDED_ #define _WORKQUEUE_H_INCLUDED_ #include "thread/Mutex.hxx" #include "thread/Cond.hxx" #include #include #include #include #include //#include "debuglog.h" #define LOGINFO(X) #define LOGERR(X) /** * A WorkQueue manages the synchronisation around a queue of work items, * where a number of client threads queue tasks and a number of worker * threads take and execute them. The goal is to introduce some level * of parallelism between the successive steps of a previously single * threaded pipeline. For example data extraction / data preparation / index * update, but this could have other uses. * * There is no individual task status return. In case of fatal error, * the client or worker sets an end condition on the queue. A second * queue could conceivably be used for returning individual task * status. */ template class WorkQueue { /** * Store per-worker-thread data. Just an initialized timespec, * and used at the moment. */ class WQTData { public: WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;} struct timespec wstart; }; // Configuration std::string m_name; size_t m_high; size_t m_low; // Status // Worker threads having called exit unsigned int m_workers_exited; bool m_ok; // Per-thread data. The data is not used currently, this could be // a set std::unordered_map m_worker_threads; // Synchronization std::queue m_queue; Cond m_ccond; Cond m_wcond; Mutex m_mutex; // Client/Worker threads currently waiting for a job unsigned int m_clients_waiting; unsigned int m_workers_waiting; // Statistics unsigned int m_tottasks; unsigned int m_nowake; unsigned int m_workersleeps; unsigned int m_clientsleeps; public: /** Create a WorkQueue * @param name for message printing * @param hi number of tasks on queue before clients blocks. Default 0 * meaning no limit. hi == -1 means that the queue is disabled. * @param lo minimum count of tasks before worker starts. Default 1. */ WorkQueue(const char *name, size_t hi = 0, size_t lo = 1) :m_name(name), m_high(hi), m_low(lo), m_workers_exited(0), m_ok(true), m_clients_waiting(0), m_workers_waiting(0), m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0) { } ~WorkQueue() { setTerminateAndWait(); } /** Start the worker threads. * * @param nworkers number of threads copies to start. * @param start_routine thread function. It should loop * taking (QueueWorker::take()) and executing tasks. * @param arg initial parameter to thread function. * @return true if ok. */ bool start(int nworkers, void *(*workproc)(void *), void *arg) { const ScopeLock protect(m_mutex); for (int i = 0; i < nworkers; i++) { int err; pthread_t thr; if ((err = pthread_create(&thr, 0, workproc, arg))) { LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", m_name.c_str(), err)); return false; } m_worker_threads.insert(std::make_pair(thr, WQTData())); } return true; } /** Add item to work queue, called from client. * * Sleeps if there are already too many. */ bool put(T t) { const ScopeLock protect(m_mutex); if (!ok()) { LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", m_name.c_str())); return false; } while (ok() && m_high > 0 && m_queue.size() >= m_high) { m_clientsleeps++; // Keep the order: we test ok() AFTER the sleep... m_clients_waiting++; m_ccond.wait(m_mutex); if (!ok()) { m_clients_waiting--; return false; } m_clients_waiting--; } m_queue.push(t); if (m_workers_waiting > 0) { // Just wake one worker, there is only one new task. m_wcond.signal(); } else { m_nowake++; } return true; } /** * Wait until the queue is inactive. Called from client. * * Waits until the task queue is empty and the workers are all * back sleeping. Used by the client to wait for all current work * to be completed, when it needs to perform work that couldn't be * done in parallel with the worker's tasks, or before shutting * down. Work can be resumed after calling this. Note that the * only thread which can call it safely is the client just above * (which can control the task flow), else there could be * tasks in the intermediate queues. * To rephrase: there is no warranty on return that the queue is actually * idle EXCEPT if the caller knows that no jobs are still being created. * It would be possible to transform this into a safe call if some kind * of suspend condition was set on the queue by waitIdle(), to be reset by * some kind of "resume" call. Not currently the case. */ bool waitIdle() { const ScopeLock protect(m_mutex); if (!ok()) { LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", m_name.c_str())); return false; } // We're done when the queue is empty AND all workers are back // waiting for a task. while (ok() && (m_queue.size() > 0 || m_workers_waiting != m_worker_threads.size())) { m_clients_waiting++; m_ccond.wait(m_mutex); m_clients_waiting--; } return ok(); } /** Tell the workers to exit, and wait for them. * * Does not bother about tasks possibly remaining on the queue, so * should be called after waitIdle() for an orderly shutdown. */ void setTerminateAndWait() { const ScopeLock protect(m_mutex); if (m_worker_threads.empty()) // Already called ? return; // Wait for all worker threads to have called workerExit() m_ok = false; while (m_workers_exited < m_worker_threads.size()) { m_wcond.broadcast(); m_clients_waiting++; m_ccond.wait(m_mutex); m_clients_waiting--; } // Perform the thread joins and compute overall status // Workers return (void*)1 if ok while (!m_worker_threads.empty()) { void *status; auto it = m_worker_threads.begin(); pthread_join(it->first, &status); m_worker_threads.erase(it); } // Reset to start state. m_workers_exited = m_clients_waiting = m_workers_waiting = m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0; m_ok = true; } /** Take task from queue. Called from worker. * * Sleeps if there are not enough. Signal if we go to sleep on empty * queue: client may be waiting for our going idle. */ bool take(T* tp, size_t *szp = 0) { const ScopeLock protect(m_mutex); if (!ok()) { return false; } while (ok() && m_queue.size() < m_low) { m_workersleeps++; m_workers_waiting++; if (m_queue.empty()) m_ccond.broadcast(); m_wcond.wait(m_mutex); if (!ok()) { // !ok is a normal condition when shutting down if (ok()) { LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", m_name.c_str())); } m_workers_waiting--; return false; } m_workers_waiting--; } m_tottasks++; *tp = m_queue.front(); if (szp) *szp = m_queue.size(); m_queue.pop(); if (m_clients_waiting > 0) { // No reason to wake up more than one client thread m_ccond.signal(); } else { m_nowake++; } return true; } /** Advertise exit and abort queue. Called from worker * * This would happen after an unrecoverable error, or when * the queue is terminated by the client. Workers never exit normally, * except when the queue is shut down (at which point m_ok is set to * false by the shutdown code anyway). The thread must return/exit * immediately after calling this. */ void workerExit() { const ScopeLock protect(m_mutex); m_workers_exited++; m_ok = false; m_ccond.broadcast(); } size_t qsize() { const ScopeLock protect(m_mutex); size_t sz = m_queue.size(); return sz; } private: bool ok() { return m_ok && m_workers_exited == 0 && !m_worker_threads.empty(); } }; #endif /* _WORKQUEUE_H_INCLUDED_ */