From d128b418449e53e42d45db8d45321b324c67ff6e Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 14:19:38 +0200 Subject: [PATCH 01/11] Threads: Add some undefined constructors etc This is needed to "get rid" of the C++ default implementation. Yes, I know that this can be done way nicer with C++11... Signed-off-by: Uli Schlachter --- include/znc/Threads.h | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/include/znc/Threads.h b/include/znc/Threads.h index ace1ca0a..cdae9ffe 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -71,6 +71,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CMutex(const CMutex&); + CMutex& operator=(const CMutex&); + pthread_mutex_t m_mutex; }; @@ -105,6 +109,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CMutexLocker(const CMutexLocker&); + CMutexLocker& operator=(const CMutexLocker&); + CMutex &m_mutex; bool m_locked; }; @@ -161,6 +169,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CConditionVariable(const CConditionVariable&); + CConditionVariable& operator=(const CConditionVariable&); + pthread_cond_t m_cond; }; @@ -185,6 +197,10 @@ public: exit(1); } } + +private: + // Undefined constructor + CThread(); }; class CJob { @@ -192,6 +208,11 @@ public: virtual ~CJob() {} virtual void runThread() = 0; virtual void runMain() = 0; + +private: + // Undefined copy constructor and assignment operator + CJob(const CJob&); + CJob& operator=(const CJob&); }; class CThreadPool { From d4fefd188827f8ad0fda519b853c4c16758b4d20 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 14:25:18 +0200 Subject: [PATCH 02/11] StringTest: Make a local function static Signed-off-by: Uli Schlachter --- test/StringTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/StringTest.cpp b/test/StringTest.cpp index b27fc182..02be98fb 100644 --- a/test/StringTest.cpp +++ b/test/StringTest.cpp @@ -18,7 +18,7 @@ #include // GTest uses this function to output objects -void PrintTo(const CString& s, std::ostream* o) { +static void PrintTo(const CString& s, std::ostream* o) { *o << '"' << s.Escape_n(CString::EASCII, CString::EDEBUG) << '"'; } From 0e057702db06fb03d7cafddd7c410fe5a13246ea Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 14:46:01 +0200 Subject: [PATCH 03/11] Add a ThreadTest to the testsuite Signed-off-by: Uli Schlachter --- Makefile.in | 2 +- test/ThreadTest.cpp | 74 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 test/ThreadTest.cpp diff --git a/Makefile.in b/Makefile.in index cfa5141b..8f676eb2 100644 --- a/Makefile.in +++ b/Makefile.in @@ -47,7 +47,7 @@ LIB_SRCS := $(addprefix src/,$(LIB_SRCS)) BIN_SRCS := src/main.cpp LIB_OBJS := $(patsubst %cpp,%o,$(LIB_SRCS)) BIN_OBJS := $(patsubst %cpp,%o,$(BIN_SRCS)) -TESTS := StringTest ConfigTest UtilsTest +TESTS := StringTest ConfigTest UtilsTest ThreadTest TESTS := $(addprefix test/,$(addsuffix .o,$(TESTS))) CLEAN := znc src/*.o test/*.o core core.* .version_extra .depend modules/.depend unittest DISTCLEAN := Makefile config.log config.status znc-buildmod \ diff --git a/test/ThreadTest.cpp b/test/ThreadTest.cpp new file mode 100644 index 00000000..7b015913 --- /dev/null +++ b/test/ThreadTest.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2004-2014 ZNC, see the NOTICE file for details. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +class CWaitingJob : public CJob { +public: + CWaitingJob(bool& destroyed) + : m_bDestroyed(destroyed), m_Mutex(), m_CV(), m_bThreadReady(false), m_bThreadDone(false) { + }; + + + ~CWaitingJob() { + EXPECT_TRUE(m_bThreadReady); + EXPECT_TRUE(m_bThreadDone); + m_bDestroyed = true; + } + + void signal() { + CMutexLocker locker(m_Mutex); + // Wait for the thread to run + while (!m_bThreadReady) + m_CV.wait(m_Mutex); + + // and signal it to exit + m_bThreadDone = true; + m_CV.broadcast(); + } + + virtual void runThread() { + CMutexLocker locker(m_Mutex); + // We are running + m_bThreadReady = true; + m_CV.broadcast(); + + // wait for our exit signal + while (!m_bThreadDone) + m_CV.wait(m_Mutex); + } + + virtual void runMain() {} + +private: + bool& m_bDestroyed; + CMutex m_Mutex; + CConditionVariable m_CV; + bool m_bThreadReady; + bool m_bThreadDone; +}; + +TEST(Thread, RunJob) { + bool destroyed = false; + CWaitingJob *pJob = new CWaitingJob(destroyed); + + CThreadPool::Get().addJob(pJob); + pJob->signal(); + + while (!destroyed) + CThreadPool::Get().handlePipeReadable(); +} From 970b8020735ceccb8cc07fab182e7e2e41db2225 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 14:46:46 +0200 Subject: [PATCH 04/11] modpython, modperl: Parse all headers Signed-off-by: Uli Schlachter --- modules/modperl/modperl.i | 3 +++ modules/modpython/modpython.i | 3 +++ 2 files changed, 6 insertions(+) diff --git a/modules/modperl/modperl.i b/modules/modperl/modperl.i index e7aac012..739799a8 100644 --- a/modules/modperl/modperl.i +++ b/modules/modperl/modperl.i @@ -26,6 +26,7 @@ #endif #include #include "../include/znc/Utils.h" +#include "../include/znc/Threads.h" #include "../include/znc/Config.h" #include "../include/znc/Socket.h" #include "../include/znc/Modules.h" @@ -115,9 +116,11 @@ class MCString : public std::map {}; #define u_short unsigned short #define u_int unsigned int +#include "../include/znc/zncconfig.h" #include "../include/znc/ZNCString.h" %include "../include/znc/defines.h" %include "../include/znc/Utils.h" +%include "../include/znc/Threads.h" %include "../include/znc/Config.h" %include "../include/znc/Csocket.h" %template(ZNCSocketManager) TSocketManager; diff --git a/modules/modpython/modpython.i b/modules/modpython/modpython.i index 98ea6cbd..8495d09a 100644 --- a/modules/modpython/modpython.i +++ b/modules/modpython/modpython.i @@ -17,6 +17,7 @@ %module znc_core %{ #include #include "../include/znc/Utils.h" +#include "../include/znc/Threads.h" #include "../include/znc/Config.h" #include "../include/znc/Socket.h" #include "../include/znc/Modules.h" @@ -123,9 +124,11 @@ class MCString : public std::map {}; #define u_short unsigned short #define u_int unsigned int +#include "../include/znc/zncconfig.h" #include "../include/znc/ZNCString.h" %include "../include/znc/defines.h" %include "../include/znc/Utils.h" +%include "../include/znc/Threads.h" %template(PAuthBase) CSmartPtr; %template(WebSession) CSmartPtr; %include "../include/znc/Config.h" From 6118ad53457f5b95920d5197cb3204a7b8a35198 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 14:49:59 +0200 Subject: [PATCH 05/11] Threads.h: Add some doxygen comments Signed-off-by: Uli Schlachter --- include/znc/Threads.h | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/include/znc/Threads.h b/include/znc/Threads.h index cdae9ffe..e39f157c 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -203,10 +203,28 @@ private: CThread(); }; +/** + * A job is a task which should run without blocking the main thread. You do + * this by inheriting from this class and implementing the pure virtual methods + * runThread(), which gets executed in a separate thread and does not block the + * main thread, and runMain() which gets automatically called from the main + * thread after runThread() finishes. + * + * After you create a new instance of your class, you can pass it to + * CThreadPool()::Get().addJob(job) to start it. The thread pool automatically + * deletes your class after it finished. + */ class CJob { public: + /// Destructor, always called from the main thread. virtual ~CJob() {} + + /// This function is called in a separate thread and can do heavy, blocking work. virtual void runThread() = 0; + + /// This function is called from the main thread after runThread() + /// finishes. It can be used to handle the results from runThread() + /// without needing synchronization primitives. virtual void runMain() = 0; private: @@ -223,6 +241,7 @@ private: public: static CThreadPool& Get(); + /// Add a job to the thread pool and run it. The job will be deleted when done. void addJob(CJob *job); int getReadFD() const { From 1d67e87d905f730ffaf48764aced9d1ece8035f3 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 15:15:57 +0200 Subject: [PATCH 06/11] CThreadPool: Add cancellation support This adds CThreadPool::cancelJob() and cancelJobs() which can cancel a set of jobs synchronously. These functions only return when the job was successfully cancelled. It tries to cancel the jobs as quickly as possible, skipping any callbacks on CJob that were not yet called. A job that is already running can use CJob::wasCancelled() to check if it should quit. Signed-off-by: Uli Schlachter --- include/znc/Threads.h | 40 +++++++++++++- src/Threads.cpp | 118 ++++++++++++++++++++++++++++++++++++++++-- test/ThreadTest.cpp | 111 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 7 deletions(-) diff --git a/include/znc/Threads.h b/include/znc/Threads.h index e39f157c..5c614994 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -216,6 +216,17 @@ private: */ class CJob { public: + friend class CThreadPool; + + enum EJobState { + READY, + RUNNING, + DONE, + CANCELLED + }; + + CJob() : m_eState(READY) {} + /// Destructor, always called from the main thread. virtual ~CJob() {} @@ -224,17 +235,26 @@ public: /// This function is called from the main thread after runThread() /// finishes. It can be used to handle the results from runThread() - /// without needing synchronization primitives. + /// without needing synchronization primitives. virtual void runMain() = 0; + /// This can be used to check if the job was cancelled. For example, + /// runThread() can return early if this returns true. + bool wasCancelled() const; + private: // Undefined copy constructor and assignment operator CJob(const CJob&); CJob& operator=(const CJob&); + + // Synchronized via the thread pool's mutex! Do not access without that mutex! + EJobState m_eState; }; class CThreadPool { private: + friend class CJob; + CThreadPool(); ~CThreadPool(); @@ -244,6 +264,16 @@ public: /// Add a job to the thread pool and run it. The job will be deleted when done. void addJob(CJob *job); + /// Cancel a job that was previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on the job. + /// This function BLOCKS until the job finishes! + void cancelJob(CJob *job); + + /// Cancel some jobs that were previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on some of + /// the jobs. This function BLOCKS until all jobs finish! + void cancelJobs(const std::set &jobs); + int getReadFD() const { return m_iJobPipe[0]; } @@ -251,11 +281,14 @@ public: void handlePipeReadable() const; private: - void jobDone(const CJob* pJob) const; + void jobDone(CJob* pJob); // Check if the calling thread is still needed, must be called with m_mutex held bool threadNeeded() const; + CJob *getJobFromPipe() const; + void finishJob(CJob *) const; + void threadFunc(); static void *threadPoolFunc(void *arg) { CThreadPool &pool = *reinterpret_cast(arg); @@ -269,6 +302,9 @@ private: // condition variable for waiting idle threads CConditionVariable m_cond; + // condition variable for reporting finished cancellation + CConditionVariable m_cancellationCond; + // when this is true, all threads should exit bool m_done; diff --git a/src/Threads.cpp b/src/Threads.cpp index aeae3091..ad3b9312 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -15,10 +15,12 @@ */ #include -#include #ifdef HAVE_PTHREAD +#include +#include + /* Just an arbitrary limit for the number of idle threads */ static const size_t MAX_IDLE_THREADS = 3; @@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { } } -void CThreadPool::jobDone(const CJob* job) const { +void CThreadPool::jobDone(CJob* job) { + // This must be called with the mutex locked! + + enum CJob::EJobState oldState = job->m_eState; + job->m_eState = CJob::DONE; + + if (oldState == CJob::CANCELLED) { + // Signal the main thread that cancellation is done + m_cancellationCond.signal(); + return; + } + // This write() must succeed because POSIX guarantees that writes of // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). // (Yes, this really wants to write a pointer(!) to the pipe. @@ -51,6 +64,10 @@ void CThreadPool::jobDone(const CJob* job) const { } void CThreadPool::handlePipeReadable() const { + finishJob(getJobFromPipe()); +} + +CJob *CThreadPool::getJobFromPipe() const { CJob* a = NULL; ssize_t need = sizeof(a); ssize_t r = read(m_iJobPipe[0], &a, need); @@ -58,8 +75,12 @@ void CThreadPool::handlePipeReadable() const { DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno)); exit(1); } - a->runMain(); - delete a; + return a; +} + +void CThreadPool::finishJob(CJob *job) const { + job->runMain(); + delete job; } CThreadPool::~CThreadPool() { @@ -101,12 +122,13 @@ void CThreadPool::threadFunc() { // Now do the actual job m_num_idle--; + job->m_eState = CJob::RUNNING; guard.unlock(); job->runThread(); - jobDone(job); guard.lock(); + jobDone(job); m_num_idle++; } assert(m_num_threads > 0 && m_num_idle > 0); @@ -133,4 +155,90 @@ void CThreadPool::addJob(CJob *job) { CThread::startThread(threadPoolFunc, this); } +void CThreadPool::cancelJob(CJob *job) { + std::set jobs; + jobs.insert(job); + cancelJobs(jobs); +} + +void CThreadPool::cancelJobs(const std::set &jobs) { + CMutexLocker guard(m_mutex); + std::set wait, finished, deleteLater; + std::set::const_iterator it; + + // Start cancelling all jobs + for (it = jobs.begin(); it != jobs.end(); ++it) { + switch ((*it)->m_eState) { + case CJob::READY: { + (*it)->m_eState = CJob::CANCELLED; + + // Job wasn't started yet, must be in the queue + std::list::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it); + assert(it2 != m_jobs.end()); + m_jobs.erase(it2); + deleteLater.insert(*it); + continue; + } + + case CJob::RUNNING: + (*it)->m_eState = CJob::CANCELLED; + wait.insert(*it); + continue; + + case CJob::DONE: + finished.insert(*it); + continue; + + case CJob::CANCELLED: + default: + assert(0); + } + } + + // Now wait for cancellation to be done + + // Collect jobs that really were cancelled. Finished cancellation is + // signaled by changing their state to DONE. + while (!wait.empty()) { + it = wait.begin(); + while (it != wait.end()) { + if ((*it)->m_eState != CJob::CANCELLED) { + assert((*it)->m_eState == CJob::DONE); + // Re-set state for the destructor + (*it)->m_eState = CJob::CANCELLED;; + deleteLater.insert(*it); + wait.erase(it++); + } else + it++; + } + + if (wait.empty()) + break; + + // Then wait for more to be done + m_cancellationCond.wait(m_mutex); + } + + // We must call destructors with m_mutex unlocked so that they can call wasCancelled() + guard.unlock(); + + // Handle finished jobs. They must already be in the pipe. + while (!finished.empty()) { + CJob *job = getJobFromPipe(); + finishJob(job); + finished.erase(job); + } + + // Delete things that still need to be deleted + while (!deleteLater.empty()) { + delete *deleteLater.begin(); + deleteLater.erase(deleteLater.begin()); + } +} + +bool CJob::wasCancelled() const { + CMutexLocker guard(CThreadPool::Get().m_mutex); + return m_eState == CANCELLED; +} + #endif // HAVE_PTHREAD diff --git a/test/ThreadTest.cpp b/test/ThreadTest.cpp index 7b015913..8e7a65ad 100644 --- a/test/ThreadTest.cpp +++ b/test/ThreadTest.cpp @@ -27,6 +27,7 @@ public: ~CWaitingJob() { EXPECT_TRUE(m_bThreadReady); EXPECT_TRUE(m_bThreadDone); + EXPECT_FALSE(wasCancelled()); m_bDestroyed = true; } @@ -72,3 +73,113 @@ TEST(Thread, RunJob) { while (!destroyed) CThreadPool::Get().handlePipeReadable(); } + +class CCancelJob : public CJob { +public: + CCancelJob(bool& destroyed) + : m_bDestroyed(destroyed), m_Mutex(), m_CVThreadReady(), m_bThreadReady(false) { + } + + ~CCancelJob() { + EXPECT_TRUE(wasCancelled()); + m_bDestroyed = true; + } + + void wait() { + CMutexLocker locker(m_Mutex); + // Wait for the thread to run + while (!m_bThreadReady) + m_CVThreadReady.wait(m_Mutex); + } + + virtual void runThread() { + m_Mutex.lock(); + // We are running, tell the main thread + m_bThreadReady = true; + m_CVThreadReady.broadcast(); + // Have to unlock here so that wait() can get the mutex + m_Mutex.unlock(); + + while (!wasCancelled()) { + // We can't do much besides busy-looping here. If the + // job really gets cancelled while it is already + // running, the main thread is stuck in cancelJob(), so + // it cannot signal us in any way. And signaling us + // before calling cancelJob() effictively is the same + // thing as busy looping anyway. So busy looping it is. + // (Yes, CJob shouldn't be used for anything that + // requires synchronisation between threads!) + } + } + + virtual void runMain() { } + +private: + bool& m_bDestroyed; + CMutex m_Mutex; + CConditionVariable m_CVThreadReady; + bool m_bThreadReady; +}; + +TEST(Thread, CancelJobEarly) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Don't wait for the job to run. The idea here is that we are calling + // cancelJob() before pJob->runThread() runs, but this is a race. + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +TEST(Thread, CancelJobWhileRunning) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Wait for the job to run + pJob->wait(); + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +class CEmptyJob : public CJob { +public: + CEmptyJob(bool& destroyed) + : m_bDestroyed(destroyed) { + } + + ~CEmptyJob() { + EXPECT_FALSE(wasCancelled()); + m_bDestroyed = true; + } + + virtual void runThread() { } + virtual void runMain() { } + +private: + bool& m_bDestroyed; +}; + +TEST(Thread, CancelJobWhenDone) { + bool destroyed = false; + CEmptyJob *pJob = new CEmptyJob(destroyed); + + CThreadPool::Get().addJob(pJob); + + // Wait for the job to finish + fd_set fds; + FD_ZERO(&fds); + FD_SET(CThreadPool::Get().getReadFD(), &fds); + EXPECT_EQ(1, select(1 + CThreadPool::Get().getReadFD(), &fds, NULL, NULL, NULL)); + + // And only cancel it afterwards + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} From 0fddbba230a3795fb6e5e214485f1c630e797b74 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 15:20:12 +0200 Subject: [PATCH 07/11] CThreadPool: Fix a race when starting threads There was a race where threads were themselves responsible for increasing the counter for the number of running threads. This left an open window where the thread was already started, but our counter was not yet increased. The effect of this race would be that we start more threads than we should. Fix this by increasing the counter before actually starting new worker threads. Signed-off-by: Uli Schlachter --- src/Threads.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Threads.cpp b/src/Threads.cpp index ad3b9312..492056cb 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -104,7 +104,7 @@ bool CThreadPool::threadNeeded() const { void CThreadPool::threadFunc() { CMutexLocker guard(m_mutex); - m_num_threads++; + // m_num_threads was already increased m_num_idle++; while (true) { @@ -152,6 +152,7 @@ void CThreadPool::addJob(CJob *job) { return; // Start a new thread for our pool + m_num_threads++; CThread::startThread(threadPoolFunc, this); } From 308df2151192f297516d5f8cf0fdacd4adcf4d7a Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 15:32:41 +0200 Subject: [PATCH 08/11] Modules: Add a module version of CJob Signed-off-by: Uli Schlachter --- include/znc/Modules.h | 37 ++++++++++++++++++++++++++++++++ include/znc/Threads.h | 2 ++ src/Modules.cpp | 50 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/include/znc/Modules.h b/include/znc/Modules.h index 02433e8b..df864864 100644 --- a/include/znc/Modules.h +++ b/include/znc/Modules.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -177,6 +178,29 @@ private: FPTimer_t m_pFBCallback; }; +#ifdef HAVE_PTHREAD +/// A CJob version which can be safely used in modules. The job will be +/// cancelled when the module is unloaded. +class CModuleJob : public CJob { +public: + CModuleJob(CModule *pModule, const CString& sName, const CString& sDesc) + : CJob(), m_pModule(pModule), m_sName(sName), m_sDescription(sDesc) { + } + virtual ~CModuleJob(); + + // Getters + CModule* GetModule() const { return m_pModule; } + const CString& GetName() const { return m_sName; } + const CString& GetDescription() const { return m_sDescription; } + // !Getters + +protected: + CModule* m_pModule; + const CString m_sName; + const CString m_sDescription; +}; +#endif + class CModInfo { public: typedef CModule* (*ModLoader)(ModHandle p, CUser* pUser, CIRCNetwork* pNetwork, const CString& sModName, const CString& sModPath); @@ -885,6 +909,16 @@ public: virtual void ListSockets(); // !Socket stuff +#ifdef HAVE_PTHREAD + // Job stuff + void AddJob(CModuleJob *pJob); + void CancelJob(CModuleJob *pJob); + bool CancelJob(const CString& sJobName); + void CancelJobs(const std::set& sJobs); + bool UnlinkJob(CModuleJob *pJob); + // !Job stuff +#endif + // Command stuff /// Register the "Help" command. void AddHelpCommand(); @@ -1052,6 +1086,9 @@ protected: CString m_sDescription; std::set m_sTimers; std::set m_sSockets; +#ifdef HAVE_PTHREAD + std::set m_sJobs; +#endif ModHandle m_pDLL; CSockManager* m_pManager; CUser* m_pUser; diff --git a/include/znc/Threads.h b/include/znc/Threads.h index 5c614994..a83fdc46 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -213,6 +213,8 @@ private: * After you create a new instance of your class, you can pass it to * CThreadPool()::Get().addJob(job) to start it. The thread pool automatically * deletes your class after it finished. + * + * For modules you should use CModuleJob instead. */ class CJob { public: diff --git a/src/Modules.cpp b/src/Modules.cpp index 5e8e39a7..b696831f 100644 --- a/src/Modules.cpp +++ b/src/Modules.cpp @@ -157,6 +157,10 @@ CModule::~CModule() { } SaveRegistry(); + +#ifdef HAVE_PTHREAD + CancelJobs(m_sJobs); +#endif } void CModule::SetUser(CUser* pUser) { m_pUser = pUser; } @@ -449,6 +453,52 @@ void CModule::ListSockets() { PutModule(Table); } +#ifdef HAVE_PTHREAD +CModuleJob::~CModuleJob() +{ + m_pModule->UnlinkJob(this); +} + +void CModule::AddJob(CModuleJob *pJob) +{ + CThreadPool::Get().addJob(pJob); + m_sJobs.insert(pJob); +} + +void CModule::CancelJob(CModuleJob *pJob) +{ + if (pJob == NULL) + return; + // Destructor calls UnlinkJob and removes the job from m_sJobs + CThreadPool::Get().cancelJob(pJob); +} + +bool CModule::CancelJob(const CString& sJobName) +{ + set::iterator it; + for (it = m_sJobs.begin(); it != m_sJobs.end(); ++it) { + if ((*it)->GetName().Equals(sJobName)) { + CancelJob(*it); + return true; + } + } + return false; +} + +void CModule::CancelJobs(const std::set& sJobs) +{ + set sPlainJobs(sJobs.begin(), sJobs.end()); + + // Destructor calls UnlinkJob and removes the jobs from m_sJobs + CThreadPool::Get().cancelJobs(sPlainJobs); +} + +bool CModule::UnlinkJob(CModuleJob *pJob) +{ + return 0 != m_sJobs.erase(pJob); +} +#endif + bool CModule::AddCommand(const CModCommand& Command) { if (Command.GetFunction() == NULL) From 5a6224d2b4989f1cede45ba977c5b0a8c47f44fd Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Wed, 6 Aug 2014 23:31:26 +0200 Subject: [PATCH 09/11] sample: Add an example for CModuleJob Signed-off-by: Uli Schlachter --- modules/sample.cpp | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/modules/sample.cpp b/modules/sample.cpp index c666b4b3..043bc8e5 100644 --- a/modules/sample.cpp +++ b/modules/sample.cpp @@ -20,6 +20,36 @@ using std::vector; +class CSampleJob : public CModuleJob { +public: + CSampleJob(CModule *pModule) : CModuleJob(pModule, "sample", "Message the user after a delay") {} + + ~CSampleJob() { + if (wasCancelled()) { + m_pModule->PutModule("Sample job cancelled"); + } else { + m_pModule->PutModule("Sample job destroyed"); + } + } + + virtual void runThread() { + // Cannot safely use m_pModule in here, because this runs in its + // own thread and such an access would require synchronisation + // between this thread and the main thread! + + for (int i = 0; i < 10; i++) { + // Regularly check if we were cancelled + if (wasCancelled()) + return; + sleep(1); + } + } + + virtual void runMain() { + m_pModule->PutModule("Sample job done"); + } +}; + class CSampleTimer : public CTimer { public: @@ -42,6 +72,7 @@ public: //AddTimer(new CSampleTimer(this, 300, 0, "Sample", "Sample timer for sample things.")); //AddTimer(new CSampleTimer(this, 5, 20, "Another", "Another sample timer.")); //AddTimer(new CSampleTimer(this, 25000, 5, "Third", "A third sample timer.")); + AddJob(new CSampleJob(this)); return true; } From 74fdd97a5204800e85ea77bb6b46efa865ff2d0a Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Thu, 7 Aug 2014 22:19:15 +0200 Subject: [PATCH 10/11] CJob: Even cancel finished jobs When a job was cancelled after its runThread() method finished, but before the main thread noticed this and reacted, we would just run runMain() before and pretend the job finished normally. However, with CModuleJob this means that runMain() might get called for a module which is currently being destructed. This has bad effects with virtual functions and thus causes problems. It's better to just really cancel the job instead. Signed-off-by: Uli Schlachter --- src/Threads.cpp | 8 ++++++-- test/ThreadTest.cpp | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Threads.cpp b/src/Threads.cpp index 492056cb..a49eab9e 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -187,6 +187,7 @@ void CThreadPool::cancelJobs(const std::set &jobs) { continue; case CJob::DONE: + (*it)->m_eState = CJob::CANCELLED; finished.insert(*it); continue; @@ -226,8 +227,11 @@ void CThreadPool::cancelJobs(const std::set &jobs) { // Handle finished jobs. They must already be in the pipe. while (!finished.empty()) { CJob *job = getJobFromPipe(); - finishJob(job); - finished.erase(job); + if (finished.erase(job) > 0) { + assert(job->m_eState == CJob::CANCELLED); + delete job; + } else + finishJob(job); } // Delete things that still need to be deleted diff --git a/test/ThreadTest.cpp b/test/ThreadTest.cpp index 8e7a65ad..8346b21b 100644 --- a/test/ThreadTest.cpp +++ b/test/ThreadTest.cpp @@ -154,7 +154,7 @@ public: } ~CEmptyJob() { - EXPECT_FALSE(wasCancelled()); + EXPECT_TRUE(wasCancelled()); m_bDestroyed = true; } From e0e86e8b022c594c344a06c5d0c46d01533cfff8 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Thu, 7 Aug 2014 22:34:51 +0200 Subject: [PATCH 11/11] CThreadPool::cancelJobs(): Add a comment explaining the logic a bit Signed-off-by: Uli Schlachter --- src/Threads.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/Threads.cpp b/src/Threads.cpp index a49eab9e..91f8ecce 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -163,6 +163,23 @@ void CThreadPool::cancelJob(CJob *job) { } void CThreadPool::cancelJobs(const std::set &jobs) { + // Thanks to the mutex, jobs cannot change state anymore. There are + // three different states which can occur: + // + // READY: The job is still in our list of pending jobs and no threads + // got it yet. Just clean up. + // + // DONE: The job finished running and was already written to the pipe + // that is used for waking up finished jobs. We can just read from the + // pipe until we see this job. + // + // RUNNING: This is the complicated case. The job is currently being + // executed. We change its state to CANCELLED so that wasCancelled() + // returns true. Afterwards we wait on a CV for the job to have finished + // running. This CV is signaled by jobDone() which checks the job's + // status and sees that the job was cancelled. It signals to us that + // cancellation is done by changing the job's status to DONE. + CMutexLocker guard(m_mutex); std::set wait, finished, deleteLater; std::set::const_iterator it;