From 53c579b296eb63afad6861a7bfbb488537831193 Mon Sep 17 00:00:00 2001 From: Uli Schlachter Date: Fri, 16 Nov 2012 17:54:32 +0100 Subject: [PATCH] CJob: Add a way to do stuff on the main thread This just moves the pipe from the socket code to the thread pool. However, now all CJobs can use this and there is a single place for them to get deleted. Signed-off-by: Uli Schlachter --- include/znc/Socket.h | 20 ++++++-------- include/znc/Threads.h | 19 ++++++++++--- src/Socket.cpp | 64 ++++++++++++++----------------------------- src/Threads.cpp | 34 ++++++++++++++++++++++- 4 files changed, 77 insertions(+), 60 deletions(-) diff --git a/include/znc/Socket.h b/include/znc/Socket.h index 1a413b1f..6acff68c 100644 --- a/include/znc/Socket.h +++ b/include/znc/Socket.h @@ -102,11 +102,9 @@ public: private: void FinishConnect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock); -#ifdef HAVE_THREADED_DNS - int m_iTDNSpipe[2]; - class CTDNSMonitorFD; friend class CTDNSMonitorFD; +#ifdef HAVE_THREADED_DNS struct TDNSTask { CString sHostname; u_short iPort; @@ -123,19 +121,19 @@ private: }; class CDNSJob : public CJob { public: - CString sHostname; - TDNSTask* task; - int fd; - bool bBind; + CString sHostname; + TDNSTask* task; + CSockManager* pManager; + bool bBind; - int iRes; - addrinfo* aiResult; + int iRes; + addrinfo* aiResult; - void run(); + void runThread(); + void runMain(); }; void StartTDNSThread(TDNSTask* task, bool bBind); void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult); - void RetrieveTDNSResult(); static void* TDNSThread(void* argument); #endif protected: diff --git a/include/znc/Threads.h b/include/znc/Threads.h index 518d1aad..c2c6e7d9 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -182,14 +182,13 @@ public: class CJob { public: virtual ~CJob() {} - virtual void run() = 0; + virtual void runThread() = 0; + virtual void runMain() = 0; }; class CThreadPool { private: - CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { - } - + CThreadPool(); ~CThreadPool(); public: @@ -197,7 +196,15 @@ public: void addJob(CJob *job); + int getReadFD() const { + return m_iJobPipe[0]; + } + + void handlePipeReadable() const; + private: + void jobDone(const CJob* pJob) const; + // Check if the calling thread is still needed, must be called with m_mutex held bool threadNeeded() const; @@ -223,6 +230,10 @@ private: // number of idle threads waiting on the condition variable size_t m_num_idle; + // pipe for waking up the main thread + int m_iJobPipe[2]; + + // list of pending jobs std::list m_jobs; }; diff --git a/src/Socket.cpp b/src/Socket.cpp index ecd9165b..cd18facd 100644 --- a/src/Socket.cpp +++ b/src/Socket.cpp @@ -35,24 +35,24 @@ int CZNCSock::ConvertAddress(const struct sockaddr_storage * pAddr, socklen_t iA return ret; } -#ifdef HAVE_THREADED_DNS +#ifdef HAVE_PTHREAD class CSockManager::CTDNSMonitorFD : public CSMonitorFD { - CSockManager* m_Manager; public: - CTDNSMonitorFD(CSockManager* mgr) { - m_Manager = mgr; - Add(mgr->m_iTDNSpipe[0], ECT_Read); + CTDNSMonitorFD() { + Add(CThreadPool::Get().getReadFD(), ECT_Read); } virtual bool FDsThatTriggered(const std::map& miiReadyFds) { - if (miiReadyFds.find(m_Manager->m_iTDNSpipe[0])->second) { - m_Manager->RetrieveTDNSResult(); + if (miiReadyFds.find(CThreadPool::Get().getReadFD())->second) { + CThreadPool::Get().handlePipeReadable(); } return true; } }; +#endif -void CSockManager::CDNSJob::run() { +#ifdef HAVE_THREADED_DNS +void CSockManager::CDNSJob::runThread() { int iCount = 0; while (true) { addrinfo hints; @@ -73,16 +73,17 @@ void CSockManager::CDNSJob::run() { } sleep(5); // wait 5 seconds before next try } +} - // This write() must succeed because POSIX guarantees that writes of - // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). - // (Yes, this really wants to write a pointer(!) to the pipe. - CDNSJob *job = this; - size_t w = write(fd, &job, sizeof(job)); - if (w != sizeof(job)) { - DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno)); - exit(1); +void CSockManager::CDNSJob::runMain() { + if (0 != this->iRes) { + DEBUG("Error in threaded DNS: " << gai_strerror(this->iRes)); + if (this->aiResult) { + DEBUG("And aiResult is not NULL..."); + } + this->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()? } + pManager->SetTDNSThreadFinished(this->task, this->bBind, this->aiResult); } void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { @@ -90,10 +91,10 @@ void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { CDNSJob* arg = new CDNSJob; arg->sHostname = sHostname; arg->task = task; - arg->fd = m_iTDNSpipe[1]; arg->bBind = bBind; arg->iRes = 0; arg->aiResult = NULL; + arg->pManager = this; CThreadPool::Get().addJob(arg); } @@ -177,36 +178,11 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a delete task; } - -void CSockManager::RetrieveTDNSResult() { - CDNSJob* a = NULL; - ssize_t need = sizeof(a); - ssize_t r = read(m_iTDNSpipe[0], &a, need); - if (r != need) { - DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno)); - exit(1); - } - TDNSTask* task = a->task; - if (0 != a->iRes) { - DEBUG("Error in threaded DNS: " << gai_strerror(a->iRes)); - if (a->aiResult) { - DEBUG("And aiResult is not NULL..."); - } - a->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()? - } - SetTDNSThreadFinished(task, a->bBind, a->aiResult); - delete a; -} #endif /* HAVE_THREADED_DNS */ CSockManager::CSockManager() { -#ifdef HAVE_THREADED_DNS - if (pipe(m_iTDNSpipe)) { - DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno)); - exit(1); - } - - MonitorFD(new CTDNSMonitorFD(this)); +#ifdef HAVE_PTHREAD + MonitorFD(new CTDNSMonitorFD()); #endif } diff --git a/src/Threads.cpp b/src/Threads.cpp index cb1c38b5..880e19c4 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -7,6 +7,7 @@ */ #include +#include #ifdef HAVE_PTHREAD @@ -23,6 +24,36 @@ CThreadPool& CThreadPool::Get() { return pool; } +CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { + if (pipe(m_iJobPipe)) { + DEBUG("Ouch, can't open pipe for thread pool: " << strerror(errno)); + exit(1); + } +} + +void CThreadPool::jobDone(const CJob* job) const { + // This write() must succeed because POSIX guarantees that writes of + // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). + // (Yes, this really wants to write a pointer(!) to the pipe. + size_t w = write(m_iJobPipe[1], &job, sizeof(job)); + if (w != sizeof(job)) { + DEBUG("Something bad happened during write() to a pipe for thread pool, wrote " << w << " bytes: " << strerror(errno)); + exit(1); + } +} + +void CThreadPool::handlePipeReadable() const { + CJob* a = NULL; + ssize_t need = sizeof(a); + ssize_t r = read(m_iJobPipe[0], &a, need); + if (r != need) { + DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno)); + exit(1); + } + a->runMain(); + delete a; +} + CThreadPool::~CThreadPool() { /* Anyone has an idea how this can be done less ugly? */ CMutexLocker guard(m_mutex); @@ -64,7 +95,8 @@ void CThreadPool::threadFunc() { m_num_idle--; guard.unlock(); - job->run(); + job->runThread(); + jobDone(job); guard.lock(); m_num_idle++;