aboutsummaryrefslogblamecommitdiffstats
path: root/src/db/upnp/WorkQueue.hxx
blob: 851f81837e47cf2a67ee6267fd7426be17dfaea1 (plain) (tree)






































































































































































































































































































































                                                                                                
/*
 * Copyright (C) 2003-2014 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_

#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"

#include <pthread.h>
#include <time.h>

#include <string>
#include <queue>
#include <unordered_map>

//#include "debuglog.h"
#define LOGINFO(X)
#define LOGERR(X)

/**
 * A WorkQueue manages the synchronisation around a queue of work items,
 * where a number of client threads queue tasks and a number of worker
 * threads take and execute them. The goal is to introduce some level
 * of parallelism between the successive steps of a previously single
 * threaded pipeline. For example data extraction / data preparation / index
 * update, but this could have other uses.
 *
 * There is no individual task status return. In case of fatal error,
 * the client or worker sets an end condition on the queue. A second
 * queue could conceivably be used for returning individual task
 * status.
 */
template <class T>
class WorkQueue {
	/**
	 * Store per-worker-thread data. Just an initialized timespec,
	 * and used at the moment.
	 */
	class WQTData {
	public:
		WQTData() {wstart.tv_sec = 0; wstart.tv_nsec = 0;}
		struct timespec wstart;
	};

	// Configuration
	std::string m_name;
	size_t m_high;
	size_t m_low;

	// Status
	// Worker threads having called exit
	unsigned int m_workers_exited;
	bool m_ok;

	// Per-thread data. The data is not used currently, this could be
	// a set<pthread_t>
	std::unordered_map<pthread_t, WQTData> m_worker_threads;

	// Synchronization
	std::queue<T> m_queue;
	Cond m_ccond;
	Cond m_wcond;
	Mutex m_mutex;
	// Client/Worker threads currently waiting for a job
	unsigned int m_clients_waiting;
	unsigned int m_workers_waiting;

	// Statistics
	unsigned int m_tottasks;
	unsigned int m_nowake;
	unsigned int m_workersleeps;
	unsigned int m_clientsleeps;

public:
	/** Create a WorkQueue
	 * @param name for message printing
	 * @param hi number of tasks on queue before clients blocks. Default 0
	 *    meaning no limit. hi == -1 means that the queue is disabled.
	 * @param lo minimum count of tasks before worker starts. Default 1.
	 */
	WorkQueue(const char *name, size_t hi = 0, size_t lo = 1)
		:m_name(name), m_high(hi), m_low(lo),
		 m_workers_exited(0),
		 m_ok(true),
		 m_clients_waiting(0), m_workers_waiting(0),
		 m_tottasks(0), m_nowake(0), m_workersleeps(0), m_clientsleeps(0)
	{
	}

	~WorkQueue() {
		setTerminateAndWait();
	}

	/** Start the worker threads.
	 *
	 * @param nworkers number of threads copies to start.
	 * @param start_routine thread function. It should loop
	 *      taking (QueueWorker::take()) and executing tasks.
	 * @param arg initial parameter to thread function.
	 * @return true if ok.
	 */
	bool start(int nworkers, void *(*workproc)(void *), void *arg)
	{
		const ScopeLock protect(m_mutex);

		for  (int i = 0; i < nworkers; i++) {
			int err;
			pthread_t thr;
			if ((err = pthread_create(&thr, 0, workproc, arg))) {
				LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
					m_name.c_str(), err));
				return false;
			}
			m_worker_threads.insert(std::make_pair(thr, WQTData()));
		}
		return true;
	}

	/** Add item to work queue, called from client.
	 *
	 * Sleeps if there are already too many.
	 */
	bool put(T t)
	{
		const ScopeLock protect(m_mutex);

		if (!ok()) {
			LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n",
				m_name.c_str()));
			return false;
		}

		while (ok() && m_high > 0 && m_queue.size() >= m_high) {
			m_clientsleeps++;
			// Keep the order: we test ok() AFTER the sleep...
			m_clients_waiting++;
			m_ccond.wait(m_mutex);
			if (!ok()) {
				m_clients_waiting--;
				return false;
			}
			m_clients_waiting--;
		}

		m_queue.push(t);
		if (m_workers_waiting > 0) {
			// Just wake one worker, there is only one new task.
			m_wcond.signal();
		} else {
			m_nowake++;
		}

		return true;
	}

	/**
	 * Wait until the queue is inactive. Called from client.
	 *
	 * Waits until the task queue is empty and the workers are all
	 * back sleeping. Used by the client to wait for all current work
	 * to be completed, when it needs to perform work that couldn't be
	 * done in parallel with the worker's tasks, or before shutting
	 * down. Work can be resumed after calling this. Note that the
	 * only thread which can call it safely is the client just above
	 * (which can control the task flow), else there could be
	 * tasks in the intermediate queues.
	 * To rephrase: there is no warranty on return that the queue is actually
	 * idle EXCEPT if the caller knows that no jobs are still being created.
	 * It would be possible to transform this into a safe call if some kind
	 * of suspend condition was set on the queue by waitIdle(), to be reset by
	 * some kind of "resume" call. Not currently the case.
	 */
	bool waitIdle()
	{
		const ScopeLock protect(m_mutex);

		if (!ok()) {
			LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n",
				m_name.c_str()));
			return false;
		}

		// We're done when the queue is empty AND all workers are back
		// waiting for a task.
		while (ok() && (m_queue.size() > 0 ||
				m_workers_waiting != m_worker_threads.size())) {
			m_clients_waiting++;
			m_ccond.wait(m_mutex);
			m_clients_waiting--;
		}

		return ok();
	}


	/** Tell the workers to exit, and wait for them.
	 *
	 * Does not bother about tasks possibly remaining on the queue, so
	 * should be called after waitIdle() for an orderly shutdown.
	 */
	void setTerminateAndWait()
	{
		const ScopeLock protect(m_mutex);

		if (m_worker_threads.empty())
			// Already called ?
			return;

		// Wait for all worker threads to have called workerExit()
		m_ok = false;
		while (m_workers_exited < m_worker_threads.size()) {
			m_wcond.broadcast();
			m_clients_waiting++;
			m_ccond.wait(m_mutex);
			m_clients_waiting--;
		}

		// Perform the thread joins and compute overall status
		// Workers return (void*)1 if ok
		while (!m_worker_threads.empty()) {
			void *status;
			auto it = m_worker_threads.begin();
			pthread_join(it->first, &status);
			m_worker_threads.erase(it);
		}

		// Reset to start state.
		m_workers_exited = m_clients_waiting = m_workers_waiting =
			m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
		m_ok = true;
	}

	/** Take task from queue. Called from worker.
	 *
	 * Sleeps if there are not enough. Signal if we go to sleep on empty
	 * queue: client may be waiting for our going idle.
	 */
	bool take(T* tp, size_t *szp = 0)
	{
		const ScopeLock protect(m_mutex);

		if (!ok()) {
			return false;
		}

		while (ok() && m_queue.size() < m_low) {
			m_workersleeps++;
			m_workers_waiting++;
			if (m_queue.empty())
				m_ccond.broadcast();
			m_wcond.wait(m_mutex);
			if (!ok()) {
				// !ok is a normal condition when shutting down
				if (ok()) {
					LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n",
						m_name.c_str()));
				}
				m_workers_waiting--;
				return false;
			}
			m_workers_waiting--;
		}

		m_tottasks++;
		*tp = m_queue.front();
		if (szp)
			*szp = m_queue.size();
		m_queue.pop();
		if (m_clients_waiting > 0) {
			// No reason to wake up more than one client thread
			m_ccond.signal();
		} else {
			m_nowake++;
		}
		return true;
	}

	/** Advertise exit and abort queue. Called from worker
	 *
	 * This would happen after an unrecoverable error, or when
	 * the queue is terminated by the client. Workers never exit normally,
	 * except when the queue is shut down (at which point m_ok is set to
	 * false by the shutdown code anyway). The thread must return/exit
	 * immediately after calling this.
	 */
	void workerExit()
	{
		const ScopeLock protect(m_mutex);

		m_workers_exited++;
		m_ok = false;
		m_ccond.broadcast();
	}

	size_t qsize()
	{
		const ScopeLock protect(m_mutex);

		size_t sz = m_queue.size();
		return sz;
	}

private:
	bool ok()
	{
		return m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
	}
};

#endif /* _WORKQUEUE_H_INCLUDED_ */