diff --git a/include/znc/Socket.h b/include/znc/Socket.h index 1a413b1f..6acff68c 100644 --- a/include/znc/Socket.h +++ b/include/znc/Socket.h @@ -102,11 +102,9 @@ public: private: void FinishConnect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock); -#ifdef HAVE_THREADED_DNS - int m_iTDNSpipe[2]; - class CTDNSMonitorFD; friend class CTDNSMonitorFD; +#ifdef HAVE_THREADED_DNS struct TDNSTask { CString sHostname; u_short iPort; @@ -123,19 +121,19 @@ private: }; class CDNSJob : public CJob { public: - CString sHostname; - TDNSTask* task; - int fd; - bool bBind; + CString sHostname; + TDNSTask* task; + CSockManager* pManager; + bool bBind; - int iRes; - addrinfo* aiResult; + int iRes; + addrinfo* aiResult; - void run(); + void runThread(); + void runMain(); }; void StartTDNSThread(TDNSTask* task, bool bBind); void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult); - void RetrieveTDNSResult(); static void* TDNSThread(void* argument); #endif protected: diff --git a/include/znc/Threads.h b/include/znc/Threads.h index 518d1aad..c2c6e7d9 100644 --- a/include/znc/Threads.h +++ b/include/znc/Threads.h @@ -182,14 +182,13 @@ public: class CJob { public: virtual ~CJob() {} - virtual void run() = 0; + virtual void runThread() = 0; + virtual void runMain() = 0; }; class CThreadPool { private: - CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { - } - + CThreadPool(); ~CThreadPool(); public: @@ -197,7 +196,15 @@ public: void addJob(CJob *job); + int getReadFD() const { + return m_iJobPipe[0]; + } + + void handlePipeReadable() const; + private: + void jobDone(const CJob* pJob) const; + // Check if the calling thread is still needed, must be called with m_mutex held bool threadNeeded() const; @@ -223,6 +230,10 @@ private: // number of idle threads waiting on the condition variable size_t m_num_idle; + // pipe for waking up the main thread + int m_iJobPipe[2]; + + // list of pending jobs std::list m_jobs; }; diff --git a/src/Socket.cpp b/src/Socket.cpp index ecd9165b..cd18facd 100644 --- a/src/Socket.cpp +++ b/src/Socket.cpp @@ -35,24 +35,24 @@ int CZNCSock::ConvertAddress(const struct sockaddr_storage * pAddr, socklen_t iA return ret; } -#ifdef HAVE_THREADED_DNS +#ifdef HAVE_PTHREAD class CSockManager::CTDNSMonitorFD : public CSMonitorFD { - CSockManager* m_Manager; public: - CTDNSMonitorFD(CSockManager* mgr) { - m_Manager = mgr; - Add(mgr->m_iTDNSpipe[0], ECT_Read); + CTDNSMonitorFD() { + Add(CThreadPool::Get().getReadFD(), ECT_Read); } virtual bool FDsThatTriggered(const std::map& miiReadyFds) { - if (miiReadyFds.find(m_Manager->m_iTDNSpipe[0])->second) { - m_Manager->RetrieveTDNSResult(); + if (miiReadyFds.find(CThreadPool::Get().getReadFD())->second) { + CThreadPool::Get().handlePipeReadable(); } return true; } }; +#endif -void CSockManager::CDNSJob::run() { +#ifdef HAVE_THREADED_DNS +void CSockManager::CDNSJob::runThread() { int iCount = 0; while (true) { addrinfo hints; @@ -73,16 +73,17 @@ void CSockManager::CDNSJob::run() { } sleep(5); // wait 5 seconds before next try } +} - // This write() must succeed because POSIX guarantees that writes of - // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). - // (Yes, this really wants to write a pointer(!) to the pipe. - CDNSJob *job = this; - size_t w = write(fd, &job, sizeof(job)); - if (w != sizeof(job)) { - DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno)); - exit(1); +void CSockManager::CDNSJob::runMain() { + if (0 != this->iRes) { + DEBUG("Error in threaded DNS: " << gai_strerror(this->iRes)); + if (this->aiResult) { + DEBUG("And aiResult is not NULL..."); + } + this->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()? } + pManager->SetTDNSThreadFinished(this->task, this->bBind, this->aiResult); } void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { @@ -90,10 +91,10 @@ void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { CDNSJob* arg = new CDNSJob; arg->sHostname = sHostname; arg->task = task; - arg->fd = m_iTDNSpipe[1]; arg->bBind = bBind; arg->iRes = 0; arg->aiResult = NULL; + arg->pManager = this; CThreadPool::Get().addJob(arg); } @@ -177,36 +178,11 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a delete task; } - -void CSockManager::RetrieveTDNSResult() { - CDNSJob* a = NULL; - ssize_t need = sizeof(a); - ssize_t r = read(m_iTDNSpipe[0], &a, need); - if (r != need) { - DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno)); - exit(1); - } - TDNSTask* task = a->task; - if (0 != a->iRes) { - DEBUG("Error in threaded DNS: " << gai_strerror(a->iRes)); - if (a->aiResult) { - DEBUG("And aiResult is not NULL..."); - } - a->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()? - } - SetTDNSThreadFinished(task, a->bBind, a->aiResult); - delete a; -} #endif /* HAVE_THREADED_DNS */ CSockManager::CSockManager() { -#ifdef HAVE_THREADED_DNS - if (pipe(m_iTDNSpipe)) { - DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno)); - exit(1); - } - - MonitorFD(new CTDNSMonitorFD(this)); +#ifdef HAVE_PTHREAD + MonitorFD(new CTDNSMonitorFD()); #endif } diff --git a/src/Threads.cpp b/src/Threads.cpp index cb1c38b5..880e19c4 100644 --- a/src/Threads.cpp +++ b/src/Threads.cpp @@ -7,6 +7,7 @@ */ #include +#include #ifdef HAVE_PTHREAD @@ -23,6 +24,36 @@ CThreadPool& CThreadPool::Get() { return pool; } +CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { + if (pipe(m_iJobPipe)) { + DEBUG("Ouch, can't open pipe for thread pool: " << strerror(errno)); + exit(1); + } +} + +void CThreadPool::jobDone(const CJob* job) const { + // This write() must succeed because POSIX guarantees that writes of + // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). + // (Yes, this really wants to write a pointer(!) to the pipe. + size_t w = write(m_iJobPipe[1], &job, sizeof(job)); + if (w != sizeof(job)) { + DEBUG("Something bad happened during write() to a pipe for thread pool, wrote " << w << " bytes: " << strerror(errno)); + exit(1); + } +} + +void CThreadPool::handlePipeReadable() const { + CJob* a = NULL; + ssize_t need = sizeof(a); + ssize_t r = read(m_iJobPipe[0], &a, need); + if (r != need) { + DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno)); + exit(1); + } + a->runMain(); + delete a; +} + CThreadPool::~CThreadPool() { /* Anyone has an idea how this can be done less ugly? */ CMutexLocker guard(m_mutex); @@ -64,7 +95,8 @@ void CThreadPool::threadFunc() { m_num_idle--; guard.unlock(); - job->run(); + job->runThread(); + jobDone(job); guard.lock(); m_num_idle++;