diff --git a/include/znc/Threads.h b/include/znc/Threads.h index e39f157c..5c614994 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -216,6 +216,17 @@ private: */ class CJob { public: + friend class CThreadPool; + + enum EJobState { + READY, + RUNNING, + DONE, + CANCELLED + }; + + CJob() : m_eState(READY) {} + /// Destructor, always called from the main thread. virtual ~CJob() {} @@ -224,17 +235,26 @@ public: /// This function is called from the main thread after runThread() /// finishes. It can be used to handle the results from runThread() - /// without needing synchronization primitives. + /// without needing synchronization primitives. virtual void runMain() = 0; + /// This can be used to check if the job was cancelled. For example, + /// runThread() can return early if this returns true. + bool wasCancelled() const; + private: // Undefined copy constructor and assignment operator CJob(const CJob&); CJob& operator=(const CJob&); + + // Synchronized via the thread pool's mutex! Do not access without that mutex! + EJobState m_eState; }; class CThreadPool { private: + friend class CJob; + CThreadPool(); ~CThreadPool(); @@ -244,6 +264,16 @@ public: /// Add a job to the thread pool and run it. The job will be deleted when done. void addJob(CJob *job); + /// Cancel a job that was previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on the job. + /// This function BLOCKS until the job finishes! + void cancelJob(CJob *job); + + /// Cancel some jobs that were previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on some of + /// the jobs. This function BLOCKS until all jobs finish! + void cancelJobs(const std::set &jobs); + int getReadFD() const { return m_iJobPipe[0]; } @@ -251,11 +281,14 @@ public: void handlePipeReadable() const; private: - void jobDone(const CJob* pJob) const; + void jobDone(CJob* pJob); // Check if the calling thread is still needed, must be called with m_mutex held bool threadNeeded() const; + CJob *getJobFromPipe() const; + void finishJob(CJob *) const; + void threadFunc(); static void *threadPoolFunc(void *arg) { CThreadPool &pool = *reinterpret_cast(arg); @@ -269,6 +302,9 @@ private: // condition variable for waiting idle threads CConditionVariable m_cond; + // condition variable for reporting finished cancellation + CConditionVariable m_cancellationCond; + // when this is true, all threads should exit bool m_done; diff --git a/src/Threads.cpp b/src/Threads.cpp index aeae3091..ad3b9312 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -15,10 +15,12 @@ */ #include -#include #ifdef HAVE_PTHREAD +#include +#include + /* Just an arbitrary limit for the number of idle threads */ static const size_t MAX_IDLE_THREADS = 3; @@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { } } -void CThreadPool::jobDone(const CJob* job) const { +void CThreadPool::jobDone(CJob* job) { + // This must be called with the mutex locked! + + enum CJob::EJobState oldState = job->m_eState; + job->m_eState = CJob::DONE; + + if (oldState == CJob::CANCELLED) { + // Signal the main thread that cancellation is done + m_cancellationCond.signal(); + return; + } + // This write() must succeed because POSIX guarantees that writes of // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). // (Yes, this really wants to write a pointer(!) to the pipe. @@ -51,6 +64,10 @@ void CThreadPool::jobDone(const CJob* job) const { } void CThreadPool::handlePipeReadable() const { + finishJob(getJobFromPipe()); +} + +CJob *CThreadPool::getJobFromPipe() const { CJob* a = NULL; ssize_t need = sizeof(a); ssize_t r = read(m_iJobPipe[0], &a, need); @@ -58,8 +75,12 @@ void CThreadPool::handlePipeReadable() const { DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno)); exit(1); } - a->runMain(); - delete a; + return a; +} + +void CThreadPool::finishJob(CJob *job) const { + job->runMain(); + delete job; } CThreadPool::~CThreadPool() { @@ -101,12 +122,13 @@ void CThreadPool::threadFunc() { // Now do the actual job m_num_idle--; + job->m_eState = CJob::RUNNING; guard.unlock(); job->runThread(); - jobDone(job); guard.lock(); + jobDone(job); m_num_idle++; } assert(m_num_threads > 0 && m_num_idle > 0); @@ -133,4 +155,90 @@ void CThreadPool::addJob(CJob *job) { CThread::startThread(threadPoolFunc, this); } +void CThreadPool::cancelJob(CJob *job) { + std::set jobs; + jobs.insert(job); + cancelJobs(jobs); +} + +void CThreadPool::cancelJobs(const std::set &jobs) { + CMutexLocker guard(m_mutex); + std::set wait, finished, deleteLater; + std::set::const_iterator it; + + // Start cancelling all jobs + for (it = jobs.begin(); it != jobs.end(); ++it) { + switch ((*it)->m_eState) { + case CJob::READY: { + (*it)->m_eState = CJob::CANCELLED; + + // Job wasn't started yet, must be in the queue + std::list::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it); + assert(it2 != m_jobs.end()); + m_jobs.erase(it2); + deleteLater.insert(*it); + continue; + } + + case CJob::RUNNING: + (*it)->m_eState = CJob::CANCELLED; + wait.insert(*it); + continue; + + case CJob::DONE: + finished.insert(*it); + continue; + + case CJob::CANCELLED: + default: + assert(0); + } + } + + // Now wait for cancellation to be done + + // Collect jobs that really were cancelled. Finished cancellation is + // signaled by changing their state to DONE. + while (!wait.empty()) { + it = wait.begin(); + while (it != wait.end()) { + if ((*it)->m_eState != CJob::CANCELLED) { + assert((*it)->m_eState == CJob::DONE); + // Re-set state for the destructor + (*it)->m_eState = CJob::CANCELLED;; + deleteLater.insert(*it); + wait.erase(it++); + } else + it++; + } + + if (wait.empty()) + break; + + // Then wait for more to be done + m_cancellationCond.wait(m_mutex); + } + + // We must call destructors with m_mutex unlocked so that they can call wasCancelled() + guard.unlock(); + + // Handle finished jobs. They must already be in the pipe. + while (!finished.empty()) { + CJob *job = getJobFromPipe(); + finishJob(job); + finished.erase(job); + } + + // Delete things that still need to be deleted + while (!deleteLater.empty()) { + delete *deleteLater.begin(); + deleteLater.erase(deleteLater.begin()); + } +} + +bool CJob::wasCancelled() const { + CMutexLocker guard(CThreadPool::Get().m_mutex); + return m_eState == CANCELLED; +} + #endif // HAVE_PTHREAD diff --git a/test/ThreadTest.cpp b/test/ThreadTest.cpp index 7b015913..8e7a65ad 100644 --- a/test/ThreadTest.cpp +++ b/test/ThreadTest.cpp @@ -27,6 +27,7 @@ public: ~CWaitingJob() { EXPECT_TRUE(m_bThreadReady); EXPECT_TRUE(m_bThreadDone); + EXPECT_FALSE(wasCancelled()); m_bDestroyed = true; } @@ -72,3 +73,113 @@ TEST(Thread, RunJob) { while (!destroyed) CThreadPool::Get().handlePipeReadable(); } + +class CCancelJob : public CJob { +public: + CCancelJob(bool& destroyed) + : m_bDestroyed(destroyed), m_Mutex(), m_CVThreadReady(), m_bThreadReady(false) { + } + + ~CCancelJob() { + EXPECT_TRUE(wasCancelled()); + m_bDestroyed = true; + } + + void wait() { + CMutexLocker locker(m_Mutex); + // Wait for the thread to run + while (!m_bThreadReady) + m_CVThreadReady.wait(m_Mutex); + } + + virtual void runThread() { + m_Mutex.lock(); + // We are running, tell the main thread + m_bThreadReady = true; + m_CVThreadReady.broadcast(); + // Have to unlock here so that wait() can get the mutex + m_Mutex.unlock(); + + while (!wasCancelled()) { + // We can't do much besides busy-looping here. If the + // job really gets cancelled while it is already + // running, the main thread is stuck in cancelJob(), so + // it cannot signal us in any way. And signaling us + // before calling cancelJob() effictively is the same + // thing as busy looping anyway. So busy looping it is. + // (Yes, CJob shouldn't be used for anything that + // requires synchronisation between threads!) + } + } + + virtual void runMain() { } + +private: + bool& m_bDestroyed; + CMutex m_Mutex; + CConditionVariable m_CVThreadReady; + bool m_bThreadReady; +}; + +TEST(Thread, CancelJobEarly) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Don't wait for the job to run. The idea here is that we are calling + // cancelJob() before pJob->runThread() runs, but this is a race. + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +TEST(Thread, CancelJobWhileRunning) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Wait for the job to run + pJob->wait(); + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +class CEmptyJob : public CJob { +public: + CEmptyJob(bool& destroyed) + : m_bDestroyed(destroyed) { + } + + ~CEmptyJob() { + EXPECT_FALSE(wasCancelled()); + m_bDestroyed = true; + } + + virtual void runThread() { } + virtual void runMain() { } + +private: + bool& m_bDestroyed; +}; + +TEST(Thread, CancelJobWhenDone) { + bool destroyed = false; + CEmptyJob *pJob = new CEmptyJob(destroyed); + + CThreadPool::Get().addJob(pJob); + + // Wait for the job to finish + fd_set fds; + FD_ZERO(&fds); + FD_SET(CThreadPool::Get().getReadFD(), &fds); + EXPECT_EQ(1, select(1 + CThreadPool::Get().getReadFD(), &fds, NULL, NULL, NULL)); + + // And only cancel it afterwards + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +}