mirror of
https://github.com/znc/znc.git
synced 2026-03-28 17:42:41 +01:00
CThreadPool: Add cancellation support
This adds CThreadPool::cancelJob() and cancelJobs() which can cancel a set of jobs synchronously. These functions only return when the job was successfully cancelled. It tries to cancel the jobs as quickly as possible, skipping any callbacks on CJob that were not yet called. A job that is already running can use CJob::wasCancelled() to check if it should quit. Signed-off-by: Uli Schlachter <psychon@znc.in>
This commit is contained in:
118
src/Threads.cpp
118
src/Threads.cpp
@@ -15,10 +15,12 @@
|
||||
*/
|
||||
|
||||
#include <znc/Threads.h>
|
||||
#include <znc/ZNCDebug.h>
|
||||
|
||||
#ifdef HAVE_PTHREAD
|
||||
|
||||
#include <znc/ZNCDebug.h>
|
||||
#include <algorithm>
|
||||
|
||||
/* Just an arbitrary limit for the number of idle threads */
|
||||
static const size_t MAX_IDLE_THREADS = 3;
|
||||
|
||||
@@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) {
|
||||
}
|
||||
}
|
||||
|
||||
void CThreadPool::jobDone(const CJob* job) const {
|
||||
void CThreadPool::jobDone(CJob* job) {
|
||||
// This must be called with the mutex locked!
|
||||
|
||||
enum CJob::EJobState oldState = job->m_eState;
|
||||
job->m_eState = CJob::DONE;
|
||||
|
||||
if (oldState == CJob::CANCELLED) {
|
||||
// Signal the main thread that cancellation is done
|
||||
m_cancellationCond.signal();
|
||||
return;
|
||||
}
|
||||
|
||||
// This write() must succeed because POSIX guarantees that writes of
|
||||
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
|
||||
// (Yes, this really wants to write a pointer(!) to the pipe.
|
||||
@@ -51,6 +64,10 @@ void CThreadPool::jobDone(const CJob* job) const {
|
||||
}
|
||||
|
||||
void CThreadPool::handlePipeReadable() const {
|
||||
finishJob(getJobFromPipe());
|
||||
}
|
||||
|
||||
CJob *CThreadPool::getJobFromPipe() const {
|
||||
CJob* a = NULL;
|
||||
ssize_t need = sizeof(a);
|
||||
ssize_t r = read(m_iJobPipe[0], &a, need);
|
||||
@@ -58,8 +75,12 @@ void CThreadPool::handlePipeReadable() const {
|
||||
DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
a->runMain();
|
||||
delete a;
|
||||
return a;
|
||||
}
|
||||
|
||||
void CThreadPool::finishJob(CJob *job) const {
|
||||
job->runMain();
|
||||
delete job;
|
||||
}
|
||||
|
||||
CThreadPool::~CThreadPool() {
|
||||
@@ -101,12 +122,13 @@ void CThreadPool::threadFunc() {
|
||||
|
||||
// Now do the actual job
|
||||
m_num_idle--;
|
||||
job->m_eState = CJob::RUNNING;
|
||||
guard.unlock();
|
||||
|
||||
job->runThread();
|
||||
jobDone(job);
|
||||
|
||||
guard.lock();
|
||||
jobDone(job);
|
||||
m_num_idle++;
|
||||
}
|
||||
assert(m_num_threads > 0 && m_num_idle > 0);
|
||||
@@ -133,4 +155,90 @@ void CThreadPool::addJob(CJob *job) {
|
||||
CThread::startThread(threadPoolFunc, this);
|
||||
}
|
||||
|
||||
void CThreadPool::cancelJob(CJob *job) {
|
||||
std::set<CJob *> jobs;
|
||||
jobs.insert(job);
|
||||
cancelJobs(jobs);
|
||||
}
|
||||
|
||||
void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
|
||||
CMutexLocker guard(m_mutex);
|
||||
std::set<CJob *> wait, finished, deleteLater;
|
||||
std::set<CJob *>::const_iterator it;
|
||||
|
||||
// Start cancelling all jobs
|
||||
for (it = jobs.begin(); it != jobs.end(); ++it) {
|
||||
switch ((*it)->m_eState) {
|
||||
case CJob::READY: {
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
|
||||
// Job wasn't started yet, must be in the queue
|
||||
std::list<CJob *>::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it);
|
||||
assert(it2 != m_jobs.end());
|
||||
m_jobs.erase(it2);
|
||||
deleteLater.insert(*it);
|
||||
continue;
|
||||
}
|
||||
|
||||
case CJob::RUNNING:
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
wait.insert(*it);
|
||||
continue;
|
||||
|
||||
case CJob::DONE:
|
||||
finished.insert(*it);
|
||||
continue;
|
||||
|
||||
case CJob::CANCELLED:
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Now wait for cancellation to be done
|
||||
|
||||
// Collect jobs that really were cancelled. Finished cancellation is
|
||||
// signaled by changing their state to DONE.
|
||||
while (!wait.empty()) {
|
||||
it = wait.begin();
|
||||
while (it != wait.end()) {
|
||||
if ((*it)->m_eState != CJob::CANCELLED) {
|
||||
assert((*it)->m_eState == CJob::DONE);
|
||||
// Re-set state for the destructor
|
||||
(*it)->m_eState = CJob::CANCELLED;;
|
||||
deleteLater.insert(*it);
|
||||
wait.erase(it++);
|
||||
} else
|
||||
it++;
|
||||
}
|
||||
|
||||
if (wait.empty())
|
||||
break;
|
||||
|
||||
// Then wait for more to be done
|
||||
m_cancellationCond.wait(m_mutex);
|
||||
}
|
||||
|
||||
// We must call destructors with m_mutex unlocked so that they can call wasCancelled()
|
||||
guard.unlock();
|
||||
|
||||
// Handle finished jobs. They must already be in the pipe.
|
||||
while (!finished.empty()) {
|
||||
CJob *job = getJobFromPipe();
|
||||
finishJob(job);
|
||||
finished.erase(job);
|
||||
}
|
||||
|
||||
// Delete things that still need to be deleted
|
||||
while (!deleteLater.empty()) {
|
||||
delete *deleteLater.begin();
|
||||
deleteLater.erase(deleteLater.begin());
|
||||
}
|
||||
}
|
||||
|
||||
bool CJob::wasCancelled() const {
|
||||
CMutexLocker guard(CThreadPool::Get().m_mutex);
|
||||
return m_eState == CANCELLED;
|
||||
}
|
||||
|
||||
#endif // HAVE_PTHREAD
|
||||
|
||||
Reference in New Issue
Block a user