From 6cb725391d0388dcb94f6bb7216f0ce3c1fac06a Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 14 Jan 2014 10:47:52 +0100 Subject: db/upnp/WorkQueue: rename attributes --- src/db/upnp/WorkQueue.hxx | 156 +++++++++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 78 deletions(-) (limited to 'src/db/upnp') diff --git a/src/db/upnp/WorkQueue.hxx b/src/db/upnp/WorkQueue.hxx index d1ab7ec43..4bf8c39c2 100644 --- a/src/db/upnp/WorkQueue.hxx +++ b/src/db/upnp/WorkQueue.hxx @@ -60,27 +60,27 @@ class WorkQueue { }; // Configuration - std::string m_name; - size_t m_high; - size_t m_low; + const std::string name; + const size_t high; + const size_t low; // Status // Worker threads having called exit - unsigned int m_workers_exited; - bool m_ok; + unsigned n_workers_exited; + bool ok; // Per-thread data. The data is not used currently, this could be // a set - std::unordered_map m_worker_threads; + std::unordered_map threads; // Synchronization - std::queue m_queue; - Cond m_ccond; - Cond m_wcond; - Mutex m_mutex; + std::queue queue; + Cond client_cond; + Cond worker_cond; + Mutex mutex; // Client/Worker threads currently waiting for a job - unsigned int m_clients_waiting; - unsigned int m_workers_waiting; + unsigned n_clients_waiting; + unsigned n_workers_waiting; public: /** Create a WorkQueue @@ -89,11 +89,11 @@ public: * meaning no limit. hi == -1 means that the queue is disabled. * @param lo minimum count of tasks before worker starts. Default 1. */ - WorkQueue(const char *name, size_t hi = 0, size_t lo = 1) - :m_name(name), m_high(hi), m_low(lo), - m_workers_exited(0), - m_ok(true), - m_clients_waiting(0), m_workers_waiting(0) + WorkQueue(const char *_name, size_t hi = 0, size_t lo = 1) + :name(_name), high(hi), low(lo), + n_workers_exited(0), + ok(true), + n_clients_waiting(0), n_workers_waiting(0) { } @@ -111,17 +111,17 @@ public: */ bool start(int nworkers, void *(*workproc)(void *), void *arg) { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); for (int i = 0; i < nworkers; i++) { int err; pthread_t thr; if ((err = pthread_create(&thr, 0, workproc, arg))) { LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n", - m_name.c_str(), err)); + name.c_str(), err)); return false; } - m_worker_threads.insert(std::make_pair(thr, WQTData())); + threads.insert(std::make_pair(thr, WQTData())); } return true; } @@ -132,29 +132,29 @@ public: */ bool put(T t) { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); - if (!ok()) { + if (!IsOK()) { LOGERR(("WorkQueue::put:%s: !ok or mutex_lock failed\n", - m_name.c_str())); + name.c_str())); return false; } - while (ok() && m_high > 0 && m_queue.size() >= m_high) { - // Keep the order: we test ok() AFTER the sleep... - m_clients_waiting++; - m_ccond.wait(m_mutex); - if (!ok()) { - m_clients_waiting--; + while (IsOK() && high > 0 && queue.size() >= high) { + // Keep the order: we test IsOK() AFTER the sleep... + n_clients_waiting++; + client_cond.wait(mutex); + if (!IsOK()) { + n_clients_waiting--; return false; } - m_clients_waiting--; + n_clients_waiting--; } - m_queue.push(t); - if (m_workers_waiting > 0) { + queue.push(t); + if (n_workers_waiting > 0) { // Just wake one worker, there is only one new task. - m_wcond.signal(); + worker_cond.signal(); } return true; @@ -179,24 +179,24 @@ public: */ bool waitIdle() { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); - if (!ok()) { + if (!IsOK()) { LOGERR(("WorkQueue::waitIdle:%s: not ok or can't lock\n", - m_name.c_str())); + name.c_str())); return false; } // We're done when the queue is empty AND all workers are back // waiting for a task. - while (ok() && (m_queue.size() > 0 || - m_workers_waiting != m_worker_threads.size())) { - m_clients_waiting++; - m_ccond.wait(m_mutex); - m_clients_waiting--; + while (IsOK() && (queue.size() > 0 || + n_workers_waiting != threads.size())) { + n_clients_waiting++; + client_cond.wait(mutex); + n_clients_waiting--; } - return ok(); + return IsOK(); } @@ -207,33 +207,33 @@ public: */ void setTerminateAndWait() { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); - if (m_worker_threads.empty()) + if (threads.empty()) // Already called ? return; // Wait for all worker threads to have called workerExit() - m_ok = false; - while (m_workers_exited < m_worker_threads.size()) { - m_wcond.broadcast(); - m_clients_waiting++; - m_ccond.wait(m_mutex); - m_clients_waiting--; + ok = false; + while (n_workers_exited < threads.size()) { + worker_cond.broadcast(); + n_clients_waiting++; + client_cond.wait(mutex); + n_clients_waiting--; } // Perform the thread joins and compute overall status // Workers return (void*)1 if ok - while (!m_worker_threads.empty()) { + while (!threads.empty()) { void *status; - auto it = m_worker_threads.begin(); + auto it = threads.begin(); pthread_join(it->first, &status); - m_worker_threads.erase(it); + threads.erase(it); } // Reset to start state. - m_workers_exited = m_clients_waiting = m_workers_waiting = 0; - m_ok = true; + n_workers_exited = n_clients_waiting = n_workers_waiting = 0; + ok = true; } /** Take task from queue. Called from worker. @@ -243,34 +243,34 @@ public: */ bool take(T &tp) { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); - if (!ok()) { + if (!IsOK()) { return false; } - while (ok() && m_queue.size() < m_low) { - m_workers_waiting++; - if (m_queue.empty()) - m_ccond.broadcast(); - m_wcond.wait(m_mutex); - if (!ok()) { + while (IsOK() && queue.size() < low) { + n_workers_waiting++; + if (queue.empty()) + client_cond.broadcast(); + worker_cond.wait(mutex); + if (!IsOK()) { // !ok is a normal condition when shutting down - if (ok()) { + if (IsOK()) { LOGERR(("WorkQueue::take:%s: cond_wait failed or !ok\n", - m_name.c_str())); + name.c_str())); } - m_workers_waiting--; + n_workers_waiting--; return false; } - m_workers_waiting--; + n_workers_waiting--; } - tp = m_queue.front(); - m_queue.pop(); - if (m_clients_waiting > 0) { + tp = queue.front(); + queue.pop(); + if (n_clients_waiting > 0) { // No reason to wake up more than one client thread - m_ccond.signal(); + client_cond.signal(); } return true; } @@ -279,23 +279,23 @@ public: * * This would happen after an unrecoverable error, or when * the queue is terminated by the client. Workers never exit normally, - * except when the queue is shut down (at which point m_ok is set to + * except when the queue is shut down (at which point ok is set to * false by the shutdown code anyway). The thread must return/exit * immediately after calling this. */ void workerExit() { - const ScopeLock protect(m_mutex); + const ScopeLock protect(mutex); - m_workers_exited++; - m_ok = false; - m_ccond.broadcast(); + n_workers_exited++; + ok = false; + client_cond.broadcast(); } private: - bool ok() + bool IsOK() { - return m_ok && m_workers_exited == 0 && !m_worker_threads.empty(); + return ok && n_workers_exited == 0 && !threads.empty(); } }; -- cgit v1.2.3