diff --git a/Makefile.in b/Makefile.in index 2ba205ee..c26b3d96 100644 --- a/Makefile.in +++ b/Makefile.in @@ -47,7 +47,7 @@ LIB_SRCS := $(addprefix src/,$(LIB_SRCS)) BIN_SRCS := src/main.cpp LIB_OBJS := $(patsubst %cpp,%o,$(LIB_SRCS)) BIN_OBJS := $(patsubst %cpp,%o,$(BIN_SRCS)) -TESTS := StringTest ConfigTest UtilsTest +TESTS := StringTest ConfigTest UtilsTest ThreadTest TESTS := $(addprefix test/,$(addsuffix .o,$(TESTS))) CLEAN := znc src/*.o test/*.o core core.* .version_extra .depend modules/.depend unittest DISTCLEAN := Makefile config.log config.status znc-buildmod \ diff --git a/include/znc/Modules.h b/include/znc/Modules.h index 02433e8b..df864864 100644 --- a/include/znc/Modules.h +++ b/include/znc/Modules.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -177,6 +178,29 @@ private: FPTimer_t m_pFBCallback; }; +#ifdef HAVE_PTHREAD +/// A CJob version which can be safely used in modules. The job will be +/// cancelled when the module is unloaded. +class CModuleJob : public CJob { +public: + CModuleJob(CModule *pModule, const CString& sName, const CString& sDesc) + : CJob(), m_pModule(pModule), m_sName(sName), m_sDescription(sDesc) { + } + virtual ~CModuleJob(); + + // Getters + CModule* GetModule() const { return m_pModule; } + const CString& GetName() const { return m_sName; } + const CString& GetDescription() const { return m_sDescription; } + // !Getters + +protected: + CModule* m_pModule; + const CString m_sName; + const CString m_sDescription; +}; +#endif + class CModInfo { public: typedef CModule* (*ModLoader)(ModHandle p, CUser* pUser, CIRCNetwork* pNetwork, const CString& sModName, const CString& sModPath); @@ -885,6 +909,16 @@ public: virtual void ListSockets(); // !Socket stuff +#ifdef HAVE_PTHREAD + // Job stuff + void AddJob(CModuleJob *pJob); + void CancelJob(CModuleJob *pJob); + bool CancelJob(const CString& sJobName); + void CancelJobs(const std::set& sJobs); + bool UnlinkJob(CModuleJob *pJob); + // !Job stuff +#endif + // Command stuff /// Register the "Help" command. void AddHelpCommand(); @@ -1052,6 +1086,9 @@ protected: CString m_sDescription; std::set m_sTimers; std::set m_sSockets; +#ifdef HAVE_PTHREAD + std::set m_sJobs; +#endif ModHandle m_pDLL; CSockManager* m_pManager; CUser* m_pUser; diff --git a/include/znc/Threads.h b/include/znc/Threads.h index ace1ca0a..a83fdc46 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -71,6 +71,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CMutex(const CMutex&); + CMutex& operator=(const CMutex&); + pthread_mutex_t m_mutex; }; @@ -105,6 +109,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CMutexLocker(const CMutexLocker&); + CMutexLocker& operator=(const CMutexLocker&); + CMutex &m_mutex; bool m_locked; }; @@ -161,6 +169,10 @@ public: } private: + // Undefined copy constructor and assignment operator + CConditionVariable(const CConditionVariable&); + CConditionVariable& operator=(const CConditionVariable&); + pthread_cond_t m_cond; }; @@ -185,25 +197,85 @@ public: exit(1); } } + +private: + // Undefined constructor + CThread(); }; +/** + * A job is a task which should run without blocking the main thread. You do + * this by inheriting from this class and implementing the pure virtual methods + * runThread(), which gets executed in a separate thread and does not block the + * main thread, and runMain() which gets automatically called from the main + * thread after runThread() finishes. + * + * After you create a new instance of your class, you can pass it to + * CThreadPool()::Get().addJob(job) to start it. The thread pool automatically + * deletes your class after it finished. + * + * For modules you should use CModuleJob instead. + */ class CJob { public: + friend class CThreadPool; + + enum EJobState { + READY, + RUNNING, + DONE, + CANCELLED + }; + + CJob() : m_eState(READY) {} + + /// Destructor, always called from the main thread. virtual ~CJob() {} + + /// This function is called in a separate thread and can do heavy, blocking work. virtual void runThread() = 0; + + /// This function is called from the main thread after runThread() + /// finishes. It can be used to handle the results from runThread() + /// without needing synchronization primitives. virtual void runMain() = 0; + + /// This can be used to check if the job was cancelled. For example, + /// runThread() can return early if this returns true. + bool wasCancelled() const; + +private: + // Undefined copy constructor and assignment operator + CJob(const CJob&); + CJob& operator=(const CJob&); + + // Synchronized via the thread pool's mutex! Do not access without that mutex! + EJobState m_eState; }; class CThreadPool { private: + friend class CJob; + CThreadPool(); ~CThreadPool(); public: static CThreadPool& Get(); + /// Add a job to the thread pool and run it. The job will be deleted when done. void addJob(CJob *job); + /// Cancel a job that was previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on the job. + /// This function BLOCKS until the job finishes! + void cancelJob(CJob *job); + + /// Cancel some jobs that were previously passed to addJob(). This *might* + /// mean that runThread() and/or runMain() will not be called on some of + /// the jobs. This function BLOCKS until all jobs finish! + void cancelJobs(const std::set &jobs); + int getReadFD() const { return m_iJobPipe[0]; } @@ -211,11 +283,14 @@ public: void handlePipeReadable() const; private: - void jobDone(const CJob* pJob) const; + void jobDone(CJob* pJob); // Check if the calling thread is still needed, must be called with m_mutex held bool threadNeeded() const; + CJob *getJobFromPipe() const; + void finishJob(CJob *) const; + void threadFunc(); static void *threadPoolFunc(void *arg) { CThreadPool &pool = *reinterpret_cast(arg); @@ -229,6 +304,9 @@ private: // condition variable for waiting idle threads CConditionVariable m_cond; + // condition variable for reporting finished cancellation + CConditionVariable m_cancellationCond; + // when this is true, all threads should exit bool m_done; diff --git a/modules/modperl/modperl.i b/modules/modperl/modperl.i index e7aac012..739799a8 100644 --- a/modules/modperl/modperl.i +++ b/modules/modperl/modperl.i @@ -26,6 +26,7 @@ #endif #include #include "../include/znc/Utils.h" +#include "../include/znc/Threads.h" #include "../include/znc/Config.h" #include "../include/znc/Socket.h" #include "../include/znc/Modules.h" @@ -115,9 +116,11 @@ class MCString : public std::map {}; #define u_short unsigned short #define u_int unsigned int +#include "../include/znc/zncconfig.h" #include "../include/znc/ZNCString.h" %include "../include/znc/defines.h" %include "../include/znc/Utils.h" +%include "../include/znc/Threads.h" %include "../include/znc/Config.h" %include "../include/znc/Csocket.h" %template(ZNCSocketManager) TSocketManager; diff --git a/modules/modpython/modpython.i b/modules/modpython/modpython.i index 98ea6cbd..8495d09a 100644 --- a/modules/modpython/modpython.i +++ b/modules/modpython/modpython.i @@ -17,6 +17,7 @@ %module znc_core %{ #include #include "../include/znc/Utils.h" +#include "../include/znc/Threads.h" #include "../include/znc/Config.h" #include "../include/znc/Socket.h" #include "../include/znc/Modules.h" @@ -123,9 +124,11 @@ class MCString : public std::map {}; #define u_short unsigned short #define u_int unsigned int +#include "../include/znc/zncconfig.h" #include "../include/znc/ZNCString.h" %include "../include/znc/defines.h" %include "../include/znc/Utils.h" +%include "../include/znc/Threads.h" %template(PAuthBase) CSmartPtr; %template(WebSession) CSmartPtr; %include "../include/znc/Config.h" diff --git a/modules/sample.cpp b/modules/sample.cpp index c666b4b3..043bc8e5 100644 --- a/modules/sample.cpp +++ b/modules/sample.cpp @@ -20,6 +20,36 @@ using std::vector; +class CSampleJob : public CModuleJob { +public: + CSampleJob(CModule *pModule) : CModuleJob(pModule, "sample", "Message the user after a delay") {} + + ~CSampleJob() { + if (wasCancelled()) { + m_pModule->PutModule("Sample job cancelled"); + } else { + m_pModule->PutModule("Sample job destroyed"); + } + } + + virtual void runThread() { + // Cannot safely use m_pModule in here, because this runs in its + // own thread and such an access would require synchronisation + // between this thread and the main thread! + + for (int i = 0; i < 10; i++) { + // Regularly check if we were cancelled + if (wasCancelled()) + return; + sleep(1); + } + } + + virtual void runMain() { + m_pModule->PutModule("Sample job done"); + } +}; + class CSampleTimer : public CTimer { public: @@ -42,6 +72,7 @@ public: //AddTimer(new CSampleTimer(this, 300, 0, "Sample", "Sample timer for sample things.")); //AddTimer(new CSampleTimer(this, 5, 20, "Another", "Another sample timer.")); //AddTimer(new CSampleTimer(this, 25000, 5, "Third", "A third sample timer.")); + AddJob(new CSampleJob(this)); return true; } diff --git a/src/Modules.cpp b/src/Modules.cpp index 5e8e39a7..b696831f 100644 --- a/src/Modules.cpp +++ b/src/Modules.cpp @@ -157,6 +157,10 @@ CModule::~CModule() { } SaveRegistry(); + +#ifdef HAVE_PTHREAD + CancelJobs(m_sJobs); +#endif } void CModule::SetUser(CUser* pUser) { m_pUser = pUser; } @@ -449,6 +453,52 @@ void CModule::ListSockets() { PutModule(Table); } +#ifdef HAVE_PTHREAD +CModuleJob::~CModuleJob() +{ + m_pModule->UnlinkJob(this); +} + +void CModule::AddJob(CModuleJob *pJob) +{ + CThreadPool::Get().addJob(pJob); + m_sJobs.insert(pJob); +} + +void CModule::CancelJob(CModuleJob *pJob) +{ + if (pJob == NULL) + return; + // Destructor calls UnlinkJob and removes the job from m_sJobs + CThreadPool::Get().cancelJob(pJob); +} + +bool CModule::CancelJob(const CString& sJobName) +{ + set::iterator it; + for (it = m_sJobs.begin(); it != m_sJobs.end(); ++it) { + if ((*it)->GetName().Equals(sJobName)) { + CancelJob(*it); + return true; + } + } + return false; +} + +void CModule::CancelJobs(const std::set& sJobs) +{ + set sPlainJobs(sJobs.begin(), sJobs.end()); + + // Destructor calls UnlinkJob and removes the jobs from m_sJobs + CThreadPool::Get().cancelJobs(sPlainJobs); +} + +bool CModule::UnlinkJob(CModuleJob *pJob) +{ + return 0 != m_sJobs.erase(pJob); +} +#endif + bool CModule::AddCommand(const CModCommand& Command) { if (Command.GetFunction() == NULL) diff --git a/src/Threads.cpp b/src/Threads.cpp index aeae3091..91f8ecce 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -15,10 +15,12 @@ */ #include -#include #ifdef HAVE_PTHREAD +#include +#include + /* Just an arbitrary limit for the number of idle threads */ static const size_t MAX_IDLE_THREADS = 3; @@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { } } -void CThreadPool::jobDone(const CJob* job) const { +void CThreadPool::jobDone(CJob* job) { + // This must be called with the mutex locked! + + enum CJob::EJobState oldState = job->m_eState; + job->m_eState = CJob::DONE; + + if (oldState == CJob::CANCELLED) { + // Signal the main thread that cancellation is done + m_cancellationCond.signal(); + return; + } + // This write() must succeed because POSIX guarantees that writes of // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). // (Yes, this really wants to write a pointer(!) to the pipe. @@ -51,6 +64,10 @@ void CThreadPool::jobDone(const CJob* job) const { } void CThreadPool::handlePipeReadable() const { + finishJob(getJobFromPipe()); +} + +CJob *CThreadPool::getJobFromPipe() const { CJob* a = NULL; ssize_t need = sizeof(a); ssize_t r = read(m_iJobPipe[0], &a, need); @@ -58,8 +75,12 @@ void CThreadPool::handlePipeReadable() const { DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno)); exit(1); } - a->runMain(); - delete a; + return a; +} + +void CThreadPool::finishJob(CJob *job) const { + job->runMain(); + delete job; } CThreadPool::~CThreadPool() { @@ -83,7 +104,7 @@ bool CThreadPool::threadNeeded() const { void CThreadPool::threadFunc() { CMutexLocker guard(m_mutex); - m_num_threads++; + // m_num_threads was already increased m_num_idle++; while (true) { @@ -101,12 +122,13 @@ void CThreadPool::threadFunc() { // Now do the actual job m_num_idle--; + job->m_eState = CJob::RUNNING; guard.unlock(); job->runThread(); - jobDone(job); guard.lock(); + jobDone(job); m_num_idle++; } assert(m_num_threads > 0 && m_num_idle > 0); @@ -130,7 +152,115 @@ void CThreadPool::addJob(CJob *job) { return; // Start a new thread for our pool + m_num_threads++; CThread::startThread(threadPoolFunc, this); } +void CThreadPool::cancelJob(CJob *job) { + std::set jobs; + jobs.insert(job); + cancelJobs(jobs); +} + +void CThreadPool::cancelJobs(const std::set &jobs) { + // Thanks to the mutex, jobs cannot change state anymore. There are + // three different states which can occur: + // + // READY: The job is still in our list of pending jobs and no threads + // got it yet. Just clean up. + // + // DONE: The job finished running and was already written to the pipe + // that is used for waking up finished jobs. We can just read from the + // pipe until we see this job. + // + // RUNNING: This is the complicated case. The job is currently being + // executed. We change its state to CANCELLED so that wasCancelled() + // returns true. Afterwards we wait on a CV for the job to have finished + // running. This CV is signaled by jobDone() which checks the job's + // status and sees that the job was cancelled. It signals to us that + // cancellation is done by changing the job's status to DONE. + + CMutexLocker guard(m_mutex); + std::set wait, finished, deleteLater; + std::set::const_iterator it; + + // Start cancelling all jobs + for (it = jobs.begin(); it != jobs.end(); ++it) { + switch ((*it)->m_eState) { + case CJob::READY: { + (*it)->m_eState = CJob::CANCELLED; + + // Job wasn't started yet, must be in the queue + std::list::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it); + assert(it2 != m_jobs.end()); + m_jobs.erase(it2); + deleteLater.insert(*it); + continue; + } + + case CJob::RUNNING: + (*it)->m_eState = CJob::CANCELLED; + wait.insert(*it); + continue; + + case CJob::DONE: + (*it)->m_eState = CJob::CANCELLED; + finished.insert(*it); + continue; + + case CJob::CANCELLED: + default: + assert(0); + } + } + + // Now wait for cancellation to be done + + // Collect jobs that really were cancelled. Finished cancellation is + // signaled by changing their state to DONE. + while (!wait.empty()) { + it = wait.begin(); + while (it != wait.end()) { + if ((*it)->m_eState != CJob::CANCELLED) { + assert((*it)->m_eState == CJob::DONE); + // Re-set state for the destructor + (*it)->m_eState = CJob::CANCELLED;; + deleteLater.insert(*it); + wait.erase(it++); + } else + it++; + } + + if (wait.empty()) + break; + + // Then wait for more to be done + m_cancellationCond.wait(m_mutex); + } + + // We must call destructors with m_mutex unlocked so that they can call wasCancelled() + guard.unlock(); + + // Handle finished jobs. They must already be in the pipe. + while (!finished.empty()) { + CJob *job = getJobFromPipe(); + if (finished.erase(job) > 0) { + assert(job->m_eState == CJob::CANCELLED); + delete job; + } else + finishJob(job); + } + + // Delete things that still need to be deleted + while (!deleteLater.empty()) { + delete *deleteLater.begin(); + deleteLater.erase(deleteLater.begin()); + } +} + +bool CJob::wasCancelled() const { + CMutexLocker guard(CThreadPool::Get().m_mutex); + return m_eState == CANCELLED; +} + #endif // HAVE_PTHREAD diff --git a/test/StringTest.cpp b/test/StringTest.cpp index b27fc182..02be98fb 100644 --- a/test/StringTest.cpp +++ b/test/StringTest.cpp @@ -18,7 +18,7 @@ #include // GTest uses this function to output objects -void PrintTo(const CString& s, std::ostream* o) { +static void PrintTo(const CString& s, std::ostream* o) { *o << '"' << s.Escape_n(CString::EASCII, CString::EDEBUG) << '"'; } diff --git a/test/ThreadTest.cpp b/test/ThreadTest.cpp new file mode 100644 index 00000000..8346b21b --- /dev/null +++ b/test/ThreadTest.cpp @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2004-2014 ZNC, see the NOTICE file for details. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +class CWaitingJob : public CJob { +public: + CWaitingJob(bool& destroyed) + : m_bDestroyed(destroyed), m_Mutex(), m_CV(), m_bThreadReady(false), m_bThreadDone(false) { + }; + + + ~CWaitingJob() { + EXPECT_TRUE(m_bThreadReady); + EXPECT_TRUE(m_bThreadDone); + EXPECT_FALSE(wasCancelled()); + m_bDestroyed = true; + } + + void signal() { + CMutexLocker locker(m_Mutex); + // Wait for the thread to run + while (!m_bThreadReady) + m_CV.wait(m_Mutex); + + // and signal it to exit + m_bThreadDone = true; + m_CV.broadcast(); + } + + virtual void runThread() { + CMutexLocker locker(m_Mutex); + // We are running + m_bThreadReady = true; + m_CV.broadcast(); + + // wait for our exit signal + while (!m_bThreadDone) + m_CV.wait(m_Mutex); + } + + virtual void runMain() {} + +private: + bool& m_bDestroyed; + CMutex m_Mutex; + CConditionVariable m_CV; + bool m_bThreadReady; + bool m_bThreadDone; +}; + +TEST(Thread, RunJob) { + bool destroyed = false; + CWaitingJob *pJob = new CWaitingJob(destroyed); + + CThreadPool::Get().addJob(pJob); + pJob->signal(); + + while (!destroyed) + CThreadPool::Get().handlePipeReadable(); +} + +class CCancelJob : public CJob { +public: + CCancelJob(bool& destroyed) + : m_bDestroyed(destroyed), m_Mutex(), m_CVThreadReady(), m_bThreadReady(false) { + } + + ~CCancelJob() { + EXPECT_TRUE(wasCancelled()); + m_bDestroyed = true; + } + + void wait() { + CMutexLocker locker(m_Mutex); + // Wait for the thread to run + while (!m_bThreadReady) + m_CVThreadReady.wait(m_Mutex); + } + + virtual void runThread() { + m_Mutex.lock(); + // We are running, tell the main thread + m_bThreadReady = true; + m_CVThreadReady.broadcast(); + // Have to unlock here so that wait() can get the mutex + m_Mutex.unlock(); + + while (!wasCancelled()) { + // We can't do much besides busy-looping here. If the + // job really gets cancelled while it is already + // running, the main thread is stuck in cancelJob(), so + // it cannot signal us in any way. And signaling us + // before calling cancelJob() effictively is the same + // thing as busy looping anyway. So busy looping it is. + // (Yes, CJob shouldn't be used for anything that + // requires synchronisation between threads!) + } + } + + virtual void runMain() { } + +private: + bool& m_bDestroyed; + CMutex m_Mutex; + CConditionVariable m_CVThreadReady; + bool m_bThreadReady; +}; + +TEST(Thread, CancelJobEarly) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Don't wait for the job to run. The idea here is that we are calling + // cancelJob() before pJob->runThread() runs, but this is a race. + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +TEST(Thread, CancelJobWhileRunning) { + bool destroyed = false; + CCancelJob *pJob = new CCancelJob(destroyed); + + CThreadPool::Get().addJob(pJob); + // Wait for the job to run + pJob->wait(); + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +} + +class CEmptyJob : public CJob { +public: + CEmptyJob(bool& destroyed) + : m_bDestroyed(destroyed) { + } + + ~CEmptyJob() { + EXPECT_TRUE(wasCancelled()); + m_bDestroyed = true; + } + + virtual void runThread() { } + virtual void runMain() { } + +private: + bool& m_bDestroyed; +}; + +TEST(Thread, CancelJobWhenDone) { + bool destroyed = false; + CEmptyJob *pJob = new CEmptyJob(destroyed); + + CThreadPool::Get().addJob(pJob); + + // Wait for the job to finish + fd_set fds; + FD_ZERO(&fds); + FD_SET(CThreadPool::Get().getReadFD(), &fds); + EXPECT_EQ(1, select(1 + CThreadPool::Get().getReadFD(), &fds, NULL, NULL, NULL)); + + // And only cancel it afterwards + CThreadPool::Get().cancelJob(pJob); + + // cancelJob() should only return after successful cancellation + EXPECT_TRUE(destroyed); +}