Merge pull request #616 from psychon/module-jobs2

Add support for CJob cancellation and necessary module support
This commit is contained in:
Alexey Sokolov
2014-08-11 21:09:57 +01:00
10 changed files with 526 additions and 9 deletions

View File

@@ -47,7 +47,7 @@ LIB_SRCS := $(addprefix src/,$(LIB_SRCS))
BIN_SRCS := src/main.cpp
LIB_OBJS := $(patsubst %cpp,%o,$(LIB_SRCS))
BIN_OBJS := $(patsubst %cpp,%o,$(BIN_SRCS))
TESTS := StringTest ConfigTest UtilsTest
TESTS := StringTest ConfigTest UtilsTest ThreadTest
TESTS := $(addprefix test/,$(addsuffix .o,$(TESTS)))
CLEAN := znc src/*.o test/*.o core core.* .version_extra .depend modules/.depend unittest
DISTCLEAN := Makefile config.log config.status znc-buildmod \

View File

@@ -19,6 +19,7 @@
#include <znc/zncconfig.h>
#include <znc/WebModules.h>
#include <znc/Threads.h>
#include <znc/main.h>
#include <set>
#include <queue>
@@ -177,6 +178,29 @@ private:
FPTimer_t m_pFBCallback;
};
#ifdef HAVE_PTHREAD
/// A CJob version which can be safely used in modules. The job will be
/// cancelled when the module is unloaded.
class CModuleJob : public CJob {
public:
CModuleJob(CModule *pModule, const CString& sName, const CString& sDesc)
: CJob(), m_pModule(pModule), m_sName(sName), m_sDescription(sDesc) {
}
virtual ~CModuleJob();
// Getters
CModule* GetModule() const { return m_pModule; }
const CString& GetName() const { return m_sName; }
const CString& GetDescription() const { return m_sDescription; }
// !Getters
protected:
CModule* m_pModule;
const CString m_sName;
const CString m_sDescription;
};
#endif
class CModInfo {
public:
typedef CModule* (*ModLoader)(ModHandle p, CUser* pUser, CIRCNetwork* pNetwork, const CString& sModName, const CString& sModPath);
@@ -885,6 +909,16 @@ public:
virtual void ListSockets();
// !Socket stuff
#ifdef HAVE_PTHREAD
// Job stuff
void AddJob(CModuleJob *pJob);
void CancelJob(CModuleJob *pJob);
bool CancelJob(const CString& sJobName);
void CancelJobs(const std::set<CModuleJob*>& sJobs);
bool UnlinkJob(CModuleJob *pJob);
// !Job stuff
#endif
// Command stuff
/// Register the "Help" command.
void AddHelpCommand();
@@ -1052,6 +1086,9 @@ protected:
CString m_sDescription;
std::set<CTimer*> m_sTimers;
std::set<CSocket*> m_sSockets;
#ifdef HAVE_PTHREAD
std::set<CModuleJob*> m_sJobs;
#endif
ModHandle m_pDLL;
CSockManager* m_pManager;
CUser* m_pUser;

View File

@@ -71,6 +71,10 @@ public:
}
private:
// Undefined copy constructor and assignment operator
CMutex(const CMutex&);
CMutex& operator=(const CMutex&);
pthread_mutex_t m_mutex;
};
@@ -105,6 +109,10 @@ public:
}
private:
// Undefined copy constructor and assignment operator
CMutexLocker(const CMutexLocker&);
CMutexLocker& operator=(const CMutexLocker&);
CMutex &m_mutex;
bool m_locked;
};
@@ -161,6 +169,10 @@ public:
}
private:
// Undefined copy constructor and assignment operator
CConditionVariable(const CConditionVariable&);
CConditionVariable& operator=(const CConditionVariable&);
pthread_cond_t m_cond;
};
@@ -185,25 +197,85 @@ public:
exit(1);
}
}
private:
// Undefined constructor
CThread();
};
/**
* A job is a task which should run without blocking the main thread. You do
* this by inheriting from this class and implementing the pure virtual methods
* runThread(), which gets executed in a separate thread and does not block the
* main thread, and runMain() which gets automatically called from the main
* thread after runThread() finishes.
*
* After you create a new instance of your class, you can pass it to
* CThreadPool()::Get().addJob(job) to start it. The thread pool automatically
* deletes your class after it finished.
*
* For modules you should use CModuleJob instead.
*/
class CJob {
public:
friend class CThreadPool;
enum EJobState {
READY,
RUNNING,
DONE,
CANCELLED
};
CJob() : m_eState(READY) {}
/// Destructor, always called from the main thread.
virtual ~CJob() {}
/// This function is called in a separate thread and can do heavy, blocking work.
virtual void runThread() = 0;
/// This function is called from the main thread after runThread()
/// finishes. It can be used to handle the results from runThread()
/// without needing synchronization primitives.
virtual void runMain() = 0;
/// This can be used to check if the job was cancelled. For example,
/// runThread() can return early if this returns true.
bool wasCancelled() const;
private:
// Undefined copy constructor and assignment operator
CJob(const CJob&);
CJob& operator=(const CJob&);
// Synchronized via the thread pool's mutex! Do not access without that mutex!
EJobState m_eState;
};
class CThreadPool {
private:
friend class CJob;
CThreadPool();
~CThreadPool();
public:
static CThreadPool& Get();
/// Add a job to the thread pool and run it. The job will be deleted when done.
void addJob(CJob *job);
/// Cancel a job that was previously passed to addJob(). This *might*
/// mean that runThread() and/or runMain() will not be called on the job.
/// This function BLOCKS until the job finishes!
void cancelJob(CJob *job);
/// Cancel some jobs that were previously passed to addJob(). This *might*
/// mean that runThread() and/or runMain() will not be called on some of
/// the jobs. This function BLOCKS until all jobs finish!
void cancelJobs(const std::set<CJob *> &jobs);
int getReadFD() const {
return m_iJobPipe[0];
}
@@ -211,11 +283,14 @@ public:
void handlePipeReadable() const;
private:
void jobDone(const CJob* pJob) const;
void jobDone(CJob* pJob);
// Check if the calling thread is still needed, must be called with m_mutex held
bool threadNeeded() const;
CJob *getJobFromPipe() const;
void finishJob(CJob *) const;
void threadFunc();
static void *threadPoolFunc(void *arg) {
CThreadPool &pool = *reinterpret_cast<CThreadPool *>(arg);
@@ -229,6 +304,9 @@ private:
// condition variable for waiting idle threads
CConditionVariable m_cond;
// condition variable for reporting finished cancellation
CConditionVariable m_cancellationCond;
// when this is true, all threads should exit
bool m_done;

View File

@@ -26,6 +26,7 @@
#endif
#include <utility>
#include "../include/znc/Utils.h"
#include "../include/znc/Threads.h"
#include "../include/znc/Config.h"
#include "../include/znc/Socket.h"
#include "../include/znc/Modules.h"
@@ -115,9 +116,11 @@ class MCString : public std::map<CString, CString> {};
#define u_short unsigned short
#define u_int unsigned int
#include "../include/znc/zncconfig.h"
#include "../include/znc/ZNCString.h"
%include "../include/znc/defines.h"
%include "../include/znc/Utils.h"
%include "../include/znc/Threads.h"
%include "../include/znc/Config.h"
%include "../include/znc/Csocket.h"
%template(ZNCSocketManager) TSocketManager<CZNCSock>;

View File

@@ -17,6 +17,7 @@
%module znc_core %{
#include <utility>
#include "../include/znc/Utils.h"
#include "../include/znc/Threads.h"
#include "../include/znc/Config.h"
#include "../include/znc/Socket.h"
#include "../include/znc/Modules.h"
@@ -123,9 +124,11 @@ class MCString : public std::map<CString, CString> {};
#define u_short unsigned short
#define u_int unsigned int
#include "../include/znc/zncconfig.h"
#include "../include/znc/ZNCString.h"
%include "../include/znc/defines.h"
%include "../include/znc/Utils.h"
%include "../include/znc/Threads.h"
%template(PAuthBase) CSmartPtr<CAuthBase>;
%template(WebSession) CSmartPtr<CWebSession>;
%include "../include/znc/Config.h"

View File

@@ -20,6 +20,36 @@
using std::vector;
class CSampleJob : public CModuleJob {
public:
CSampleJob(CModule *pModule) : CModuleJob(pModule, "sample", "Message the user after a delay") {}
~CSampleJob() {
if (wasCancelled()) {
m_pModule->PutModule("Sample job cancelled");
} else {
m_pModule->PutModule("Sample job destroyed");
}
}
virtual void runThread() {
// Cannot safely use m_pModule in here, because this runs in its
// own thread and such an access would require synchronisation
// between this thread and the main thread!
for (int i = 0; i < 10; i++) {
// Regularly check if we were cancelled
if (wasCancelled())
return;
sleep(1);
}
}
virtual void runMain() {
m_pModule->PutModule("Sample job done");
}
};
class CSampleTimer : public CTimer {
public:
@@ -42,6 +72,7 @@ public:
//AddTimer(new CSampleTimer(this, 300, 0, "Sample", "Sample timer for sample things."));
//AddTimer(new CSampleTimer(this, 5, 20, "Another", "Another sample timer."));
//AddTimer(new CSampleTimer(this, 25000, 5, "Third", "A third sample timer."));
AddJob(new CSampleJob(this));
return true;
}

View File

@@ -157,6 +157,10 @@ CModule::~CModule() {
}
SaveRegistry();
#ifdef HAVE_PTHREAD
CancelJobs(m_sJobs);
#endif
}
void CModule::SetUser(CUser* pUser) { m_pUser = pUser; }
@@ -449,6 +453,52 @@ void CModule::ListSockets() {
PutModule(Table);
}
#ifdef HAVE_PTHREAD
CModuleJob::~CModuleJob()
{
m_pModule->UnlinkJob(this);
}
void CModule::AddJob(CModuleJob *pJob)
{
CThreadPool::Get().addJob(pJob);
m_sJobs.insert(pJob);
}
void CModule::CancelJob(CModuleJob *pJob)
{
if (pJob == NULL)
return;
// Destructor calls UnlinkJob and removes the job from m_sJobs
CThreadPool::Get().cancelJob(pJob);
}
bool CModule::CancelJob(const CString& sJobName)
{
set<CModuleJob*>::iterator it;
for (it = m_sJobs.begin(); it != m_sJobs.end(); ++it) {
if ((*it)->GetName().Equals(sJobName)) {
CancelJob(*it);
return true;
}
}
return false;
}
void CModule::CancelJobs(const std::set<CModuleJob*>& sJobs)
{
set<CJob*> sPlainJobs(sJobs.begin(), sJobs.end());
// Destructor calls UnlinkJob and removes the jobs from m_sJobs
CThreadPool::Get().cancelJobs(sPlainJobs);
}
bool CModule::UnlinkJob(CModuleJob *pJob)
{
return 0 != m_sJobs.erase(pJob);
}
#endif
bool CModule::AddCommand(const CModCommand& Command)
{
if (Command.GetFunction() == NULL)

View File

@@ -15,10 +15,12 @@
*/
#include <znc/Threads.h>
#include <znc/ZNCDebug.h>
#ifdef HAVE_PTHREAD
#include <znc/ZNCDebug.h>
#include <algorithm>
/* Just an arbitrary limit for the number of idle threads */
static const size_t MAX_IDLE_THREADS = 3;
@@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) {
}
}
void CThreadPool::jobDone(const CJob* job) const {
void CThreadPool::jobDone(CJob* job) {
// This must be called with the mutex locked!
enum CJob::EJobState oldState = job->m_eState;
job->m_eState = CJob::DONE;
if (oldState == CJob::CANCELLED) {
// Signal the main thread that cancellation is done
m_cancellationCond.signal();
return;
}
// This write() must succeed because POSIX guarantees that writes of
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
// (Yes, this really wants to write a pointer(!) to the pipe.
@@ -51,6 +64,10 @@ void CThreadPool::jobDone(const CJob* job) const {
}
void CThreadPool::handlePipeReadable() const {
finishJob(getJobFromPipe());
}
CJob *CThreadPool::getJobFromPipe() const {
CJob* a = NULL;
ssize_t need = sizeof(a);
ssize_t r = read(m_iJobPipe[0], &a, need);
@@ -58,8 +75,12 @@ void CThreadPool::handlePipeReadable() const {
DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
exit(1);
}
a->runMain();
delete a;
return a;
}
void CThreadPool::finishJob(CJob *job) const {
job->runMain();
delete job;
}
CThreadPool::~CThreadPool() {
@@ -83,7 +104,7 @@ bool CThreadPool::threadNeeded() const {
void CThreadPool::threadFunc() {
CMutexLocker guard(m_mutex);
m_num_threads++;
// m_num_threads was already increased
m_num_idle++;
while (true) {
@@ -101,12 +122,13 @@ void CThreadPool::threadFunc() {
// Now do the actual job
m_num_idle--;
job->m_eState = CJob::RUNNING;
guard.unlock();
job->runThread();
jobDone(job);
guard.lock();
jobDone(job);
m_num_idle++;
}
assert(m_num_threads > 0 && m_num_idle > 0);
@@ -130,7 +152,115 @@ void CThreadPool::addJob(CJob *job) {
return;
// Start a new thread for our pool
m_num_threads++;
CThread::startThread(threadPoolFunc, this);
}
void CThreadPool::cancelJob(CJob *job) {
std::set<CJob *> jobs;
jobs.insert(job);
cancelJobs(jobs);
}
void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
// Thanks to the mutex, jobs cannot change state anymore. There are
// three different states which can occur:
//
// READY: The job is still in our list of pending jobs and no threads
// got it yet. Just clean up.
//
// DONE: The job finished running and was already written to the pipe
// that is used for waking up finished jobs. We can just read from the
// pipe until we see this job.
//
// RUNNING: This is the complicated case. The job is currently being
// executed. We change its state to CANCELLED so that wasCancelled()
// returns true. Afterwards we wait on a CV for the job to have finished
// running. This CV is signaled by jobDone() which checks the job's
// status and sees that the job was cancelled. It signals to us that
// cancellation is done by changing the job's status to DONE.
CMutexLocker guard(m_mutex);
std::set<CJob *> wait, finished, deleteLater;
std::set<CJob *>::const_iterator it;
// Start cancelling all jobs
for (it = jobs.begin(); it != jobs.end(); ++it) {
switch ((*it)->m_eState) {
case CJob::READY: {
(*it)->m_eState = CJob::CANCELLED;
// Job wasn't started yet, must be in the queue
std::list<CJob *>::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it);
assert(it2 != m_jobs.end());
m_jobs.erase(it2);
deleteLater.insert(*it);
continue;
}
case CJob::RUNNING:
(*it)->m_eState = CJob::CANCELLED;
wait.insert(*it);
continue;
case CJob::DONE:
(*it)->m_eState = CJob::CANCELLED;
finished.insert(*it);
continue;
case CJob::CANCELLED:
default:
assert(0);
}
}
// Now wait for cancellation to be done
// Collect jobs that really were cancelled. Finished cancellation is
// signaled by changing their state to DONE.
while (!wait.empty()) {
it = wait.begin();
while (it != wait.end()) {
if ((*it)->m_eState != CJob::CANCELLED) {
assert((*it)->m_eState == CJob::DONE);
// Re-set state for the destructor
(*it)->m_eState = CJob::CANCELLED;;
deleteLater.insert(*it);
wait.erase(it++);
} else
it++;
}
if (wait.empty())
break;
// Then wait for more to be done
m_cancellationCond.wait(m_mutex);
}
// We must call destructors with m_mutex unlocked so that they can call wasCancelled()
guard.unlock();
// Handle finished jobs. They must already be in the pipe.
while (!finished.empty()) {
CJob *job = getJobFromPipe();
if (finished.erase(job) > 0) {
assert(job->m_eState == CJob::CANCELLED);
delete job;
} else
finishJob(job);
}
// Delete things that still need to be deleted
while (!deleteLater.empty()) {
delete *deleteLater.begin();
deleteLater.erase(deleteLater.begin());
}
}
bool CJob::wasCancelled() const {
CMutexLocker guard(CThreadPool::Get().m_mutex);
return m_eState == CANCELLED;
}
#endif // HAVE_PTHREAD

View File

@@ -18,7 +18,7 @@
#include <znc/ZNCString.h>
// GTest uses this function to output objects
void PrintTo(const CString& s, std::ostream* o) {
static void PrintTo(const CString& s, std::ostream* o) {
*o << '"' << s.Escape_n(CString::EASCII, CString::EDEBUG) << '"';
}

185
test/ThreadTest.cpp Normal file
View File

@@ -0,0 +1,185 @@
/*
* Copyright (C) 2004-2014 ZNC, see the NOTICE file for details.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <znc/Threads.h>
class CWaitingJob : public CJob {
public:
CWaitingJob(bool& destroyed)
: m_bDestroyed(destroyed), m_Mutex(), m_CV(), m_bThreadReady(false), m_bThreadDone(false) {
};
~CWaitingJob() {
EXPECT_TRUE(m_bThreadReady);
EXPECT_TRUE(m_bThreadDone);
EXPECT_FALSE(wasCancelled());
m_bDestroyed = true;
}
void signal() {
CMutexLocker locker(m_Mutex);
// Wait for the thread to run
while (!m_bThreadReady)
m_CV.wait(m_Mutex);
// and signal it to exit
m_bThreadDone = true;
m_CV.broadcast();
}
virtual void runThread() {
CMutexLocker locker(m_Mutex);
// We are running
m_bThreadReady = true;
m_CV.broadcast();
// wait for our exit signal
while (!m_bThreadDone)
m_CV.wait(m_Mutex);
}
virtual void runMain() {}
private:
bool& m_bDestroyed;
CMutex m_Mutex;
CConditionVariable m_CV;
bool m_bThreadReady;
bool m_bThreadDone;
};
TEST(Thread, RunJob) {
bool destroyed = false;
CWaitingJob *pJob = new CWaitingJob(destroyed);
CThreadPool::Get().addJob(pJob);
pJob->signal();
while (!destroyed)
CThreadPool::Get().handlePipeReadable();
}
class CCancelJob : public CJob {
public:
CCancelJob(bool& destroyed)
: m_bDestroyed(destroyed), m_Mutex(), m_CVThreadReady(), m_bThreadReady(false) {
}
~CCancelJob() {
EXPECT_TRUE(wasCancelled());
m_bDestroyed = true;
}
void wait() {
CMutexLocker locker(m_Mutex);
// Wait for the thread to run
while (!m_bThreadReady)
m_CVThreadReady.wait(m_Mutex);
}
virtual void runThread() {
m_Mutex.lock();
// We are running, tell the main thread
m_bThreadReady = true;
m_CVThreadReady.broadcast();
// Have to unlock here so that wait() can get the mutex
m_Mutex.unlock();
while (!wasCancelled()) {
// We can't do much besides busy-looping here. If the
// job really gets cancelled while it is already
// running, the main thread is stuck in cancelJob(), so
// it cannot signal us in any way. And signaling us
// before calling cancelJob() effictively is the same
// thing as busy looping anyway. So busy looping it is.
// (Yes, CJob shouldn't be used for anything that
// requires synchronisation between threads!)
}
}
virtual void runMain() { }
private:
bool& m_bDestroyed;
CMutex m_Mutex;
CConditionVariable m_CVThreadReady;
bool m_bThreadReady;
};
TEST(Thread, CancelJobEarly) {
bool destroyed = false;
CCancelJob *pJob = new CCancelJob(destroyed);
CThreadPool::Get().addJob(pJob);
// Don't wait for the job to run. The idea here is that we are calling
// cancelJob() before pJob->runThread() runs, but this is a race.
CThreadPool::Get().cancelJob(pJob);
// cancelJob() should only return after successful cancellation
EXPECT_TRUE(destroyed);
}
TEST(Thread, CancelJobWhileRunning) {
bool destroyed = false;
CCancelJob *pJob = new CCancelJob(destroyed);
CThreadPool::Get().addJob(pJob);
// Wait for the job to run
pJob->wait();
CThreadPool::Get().cancelJob(pJob);
// cancelJob() should only return after successful cancellation
EXPECT_TRUE(destroyed);
}
class CEmptyJob : public CJob {
public:
CEmptyJob(bool& destroyed)
: m_bDestroyed(destroyed) {
}
~CEmptyJob() {
EXPECT_TRUE(wasCancelled());
m_bDestroyed = true;
}
virtual void runThread() { }
virtual void runMain() { }
private:
bool& m_bDestroyed;
};
TEST(Thread, CancelJobWhenDone) {
bool destroyed = false;
CEmptyJob *pJob = new CEmptyJob(destroyed);
CThreadPool::Get().addJob(pJob);
// Wait for the job to finish
fd_set fds;
FD_ZERO(&fds);
FD_SET(CThreadPool::Get().getReadFD(), &fds);
EXPECT_EQ(1, select(1 + CThreadPool::Get().getReadFD(), &fds, NULL, NULL, NULL));
// And only cancel it afterwards
CThreadPool::Get().cancelJob(pJob);
// cancelJob() should only return after successful cancellation
EXPECT_TRUE(destroyed);
}