diff --git a/Makefile.in b/Makefile.in index 6aa2a48f..def8565b 100644 --- a/Makefile.in +++ b/Makefile.in @@ -34,7 +34,7 @@ SED := @SED@ LIB_SRCS := ZNCString.cpp Csocket.cpp znc.cpp IRCNetwork.cpp User.cpp IRCSock.cpp \ Client.cpp Chan.cpp Nick.cpp Server.cpp Modules.cpp MD5.cpp Buffer.cpp Utils.cpp \ FileUtils.cpp HTTPSock.cpp Template.cpp ClientCommand.cpp Socket.cpp SHA256.cpp \ - WebModules.cpp Listener.cpp Config.cpp ZNCDebug.cpp version.cpp + WebModules.cpp Listener.cpp Config.cpp ZNCDebug.cpp Threads.cpp version.cpp LIB_SRCS := $(addprefix src/,$(LIB_SRCS)) BIN_SRCS := src/main.cpp LIB_OBJS := $(patsubst %cpp,%o,$(LIB_SRCS)) diff --git a/configure.ac b/configure.ac index d3282be7..8f0591f9 100644 --- a/configure.ac +++ b/configure.ac @@ -220,6 +220,7 @@ DNS_TEXT=blocking if test "x$TDNS" != "xno"; then old_TDNS=$TDNS AX_PTHREAD([ + AC_DEFINE([HAVE_PTHREAD], [1], [Define if you have POSIX threads libraries and header files.]) AC_MSG_CHECKING([whether getaddrinfo() supports AI_ADDRCONFIG]) AC_COMPILE_IFELSE([ AC_LANG_PROGRAM([[ diff --git a/include/znc/Socket.h b/include/znc/Socket.h index 49bbdb4a..1a413b1f 100644 --- a/include/znc/Socket.h +++ b/include/znc/Socket.h @@ -11,6 +11,7 @@ #include #include +#include class CModule; @@ -120,7 +121,8 @@ private: addrinfo* aiTarget; addrinfo* aiBind; }; - struct TDNSArg { + class CDNSJob : public CJob { + public: CString sHostname; TDNSTask* task; int fd; @@ -128,33 +130,13 @@ private: int iRes; addrinfo* aiResult; - }; - struct TDNSStatus { - /* mutex which protects this whole struct */ - pthread_mutex_t mutex; - /* condition variable for idle threads */ - pthread_cond_t cond; - /* When this is true, all threads should exit */ - bool done; - /* Total number of running DNS threads */ - size_t num_threads; - /* Number of DNS threads which don't have any work */ - size_t num_idle; - /* List of pending DNS jobs */ - std::list jobs; + + void run(); }; void StartTDNSThread(TDNSTask* task, bool bBind); void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult); void RetrieveTDNSResult(); static void* TDNSThread(void* argument); - static void DoDNS(TDNSArg *arg); - - /** Must be called with threadStatus->mutex held. - * @returns false when the calling DNS thread should exit. - */ - static bool ThreadNeeded(struct TDNSStatus* status); - - TDNSStatus m_threadStatus; #endif protected: }; diff --git a/include/znc/Threads.h b/include/znc/Threads.h new file mode 100644 index 00000000..518d1aad --- /dev/null +++ b/include/znc/Threads.h @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2004-2012 See the AUTHORS file for details. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. + */ + +#ifndef _THREADS_H +#define _THREADS_H + +#include + +#ifdef HAVE_PTHREAD + +#include + +#include +#include +#include +#include +#include +#include + +/** + * This class represents a non-recursive mutex. Only a single thread may own the + * mutex at any point in time. + */ +class CMutex { +public: + friend class CConditionVariable; + + CMutex() { + int i = pthread_mutex_init(&m_mutex, NULL); + if (i) { + CUtils::PrintError("Can't initialize mutex: " + CString(strerror(errno))); + exit(1); + } + } + + ~CMutex() { + int i = pthread_mutex_destroy(&m_mutex); + if (i) { + CUtils::PrintError("Can't destroy mutex: " + CString(strerror(errno))); + exit(1); + } + } + + void lock() { + int i = pthread_mutex_lock(&m_mutex); + if (i) { + CUtils::PrintError("Can't lock mutex: " + CString(strerror(errno))); + exit(1); + } + } + + void unlock() { + int i = pthread_mutex_unlock(&m_mutex); + if (i) { + CUtils::PrintError("Can't unlock mutex: " + CString(strerror(errno))); + exit(1); + } + } + +private: + pthread_mutex_t m_mutex; +}; + +/** + * A mutex locker should always be used as an automatic variable. This + * class makes sure that the mutex is unlocked when this class is destructed. + * For example, this makes it easier to make code exception-safe. + */ +class CMutexLocker { +public: + CMutexLocker(CMutex& mutex, bool initiallyLocked = true) + : m_mutex(mutex), m_locked(false) { + if (initiallyLocked) + lock(); + } + + ~CMutexLocker() { + if (m_locked) + unlock(); + } + + void lock() { + assert(!m_locked); + m_mutex.lock(); + m_locked = true; + } + + void unlock() { + assert(m_locked); + m_locked = false; + m_mutex.unlock(); + } + +private: + CMutex &m_mutex; + bool m_locked; +}; + +/** + * A condition variable makes it possible for threads to wait until some + * condition is reached at which point the thread can wake up again. + */ +class CConditionVariable { +public: + CConditionVariable() { + int i = pthread_cond_init(&m_cond, NULL); + if (i) { + CUtils::PrintError("Can't initialize condition variable: " + + CString(strerror(errno))); + exit(1); + } + } + + ~CConditionVariable() { + int i = pthread_cond_destroy(&m_cond); + if (i) { + CUtils::PrintError("Can't destroy condition variable: " + + CString(strerror(errno))); + exit(1); + } + } + + void wait(CMutex& mutex) { + int i = pthread_cond_wait(&m_cond, &mutex.m_mutex); + if (i) { + CUtils::PrintError("Can't wait on condition variable: " + + CString(strerror(errno))); + exit(1); + } + } + + void signal() { + int i = pthread_cond_signal(&m_cond); + if (i) { + CUtils::PrintError("Can't signal condition variable: " + + CString(strerror(errno))); + exit(1); + } + } + + void broadcast() { + int i = pthread_cond_broadcast(&m_cond); + if (i) { + CUtils::PrintError("Can't broadcast condition variable: " + + CString(strerror(errno))); + exit(1); + } + } + +private: + pthread_cond_t m_cond; +}; + +class CThread { +public: + typedef void *threadRoutine(void *); + static void startThread(threadRoutine *func, void *arg) { + pthread_t thr; + sigset_t old_sigmask, sigmask; + + /* Block all signals. The thread will inherit our signal mask + * and thus won't ever try to handle signals. + */ + int i = sigfillset(&sigmask); + i |= pthread_sigmask(SIG_SETMASK, &sigmask, &old_sigmask); + i |= pthread_create(&thr, NULL, func, arg); + i |= pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL); + i |= pthread_detach(thr); + if (i) { + CUtils::PrintError("Can't start new thread: " + + CString(strerror(errno))); + exit(1); + } + } +}; + +class CJob { +public: + virtual ~CJob() {} + virtual void run() = 0; +}; + +class CThreadPool { +private: + CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { + } + + ~CThreadPool(); + +public: + static CThreadPool& Get(); + + void addJob(CJob *job); + +private: + // Check if the calling thread is still needed, must be called with m_mutex held + bool threadNeeded() const; + + void threadFunc(); + static void *threadPoolFunc(void *arg) { + CThreadPool &pool = *reinterpret_cast(arg); + pool.threadFunc(); + return NULL; + } + + // mutex protecting all of these members + CMutex m_mutex; + + // condition variable for waiting idle threads + CConditionVariable m_cond; + + // when this is true, all threads should exit + bool m_done; + + // total number of running threads + size_t m_num_threads; + + // number of idle threads waiting on the condition variable + size_t m_num_idle; + + std::list m_jobs; +}; + +#endif // HAVE_PTHREAD +#endif // !_THREADS_H diff --git a/src/Socket.cpp b/src/Socket.cpp index 2ac7da39..ecd9165b 100644 --- a/src/Socket.cpp +++ b/src/Socket.cpp @@ -10,9 +10,6 @@ #include #include -/* We should need 2 DNS threads (host, bindhost) per IRC connection */ -static const size_t MAX_IDLE_THREADS = 2; - unsigned int CSockManager::GetAnonConnectionCount(const CString &sIP) const { const_iterator it; unsigned int ret = 0; @@ -55,55 +52,7 @@ public: } }; -bool CSockManager::ThreadNeeded(struct TDNSStatus* threadStatus) -{ - // We should keep a number of idle threads alive - if (threadStatus->num_idle > MAX_IDLE_THREADS) - return false; - // If ZNC is shutting down, all threads should exit - return !threadStatus->done; -} - -void* CSockManager::TDNSThread(void* argument) { - TDNSStatus *threadStatus = (TDNSStatus *) argument; - - pthread_mutex_lock(&threadStatus->mutex); - threadStatus->num_threads++; - threadStatus->num_idle++; - while (true) { - /* Wait for a DNS job for us to do. This is a while()-loop - * because POSIX allows spurious wakeups from pthread_cond_wait. - */ - while (threadStatus->jobs.empty()) { - if (!ThreadNeeded(threadStatus)) - break; - pthread_cond_wait(&threadStatus->cond, &threadStatus->mutex); - } - - if (!ThreadNeeded(threadStatus)) - break; - - /* Figure out a DNS job to do */ - assert(threadStatus->num_idle > 0); - TDNSArg *job = threadStatus->jobs.front(); - threadStatus->jobs.pop_front(); - threadStatus->num_idle--; - pthread_mutex_unlock(&threadStatus->mutex); - - /* Now do the actual work */ - DoDNS(job); - - pthread_mutex_lock(&threadStatus->mutex); - threadStatus->num_idle++; - } - assert(threadStatus->num_threads > 0 && threadStatus->num_idle > 0); - threadStatus->num_threads--; - threadStatus->num_idle--; - pthread_mutex_unlock(&threadStatus->mutex); - return NULL; -} - -void CSockManager::DoDNS(TDNSArg *arg) { +void CSockManager::CDNSJob::run() { int iCount = 0; while (true) { addrinfo hints; @@ -112,25 +61,25 @@ void CSockManager::DoDNS(TDNSArg *arg) { hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_ADDRCONFIG; - arg->iRes = getaddrinfo(arg->sHostname.c_str(), NULL, &hints, &arg->aiResult); - if (EAGAIN != arg->iRes) { + iRes = getaddrinfo(sHostname.c_str(), NULL, &hints, &aiResult); + if (EAGAIN != iRes) { break; } iCount++; if (iCount > 5) { - arg->iRes = ETIMEDOUT; + iRes = ETIMEDOUT; break; } sleep(5); // wait 5 seconds before next try } - size_t need = sizeof(TDNSArg*); - char* x = (char*)&arg; // This write() must succeed because POSIX guarantees that writes of // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). - size_t w = write(arg->fd, x, need); - if (w != need) { + // (Yes, this really wants to write a pointer(!) to the pipe. + CDNSJob *job = this; + size_t w = write(fd, &job, sizeof(job)); + if (w != sizeof(job)) { DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno)); exit(1); } @@ -138,7 +87,7 @@ void CSockManager::DoDNS(TDNSArg *arg) { void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { CString sHostname = bBind ? task->sBindhost : task->sHostname; - TDNSArg* arg = new TDNSArg; + CDNSJob* arg = new CDNSJob; arg->sHostname = sHostname; arg->task = task; arg->fd = m_iTDNSpipe[1]; @@ -146,62 +95,7 @@ void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { arg->iRes = 0; arg->aiResult = NULL; - pthread_mutex_lock(&m_threadStatus.mutex); - m_threadStatus.jobs.push_back(arg); - /* Do we need a new DNS thread? */ - if (m_threadStatus.num_idle > 0) { - /* Nope, there is one waiting for a job */ - pthread_cond_signal(&m_threadStatus.cond); - pthread_mutex_unlock(&m_threadStatus.mutex); - return; - } - pthread_mutex_unlock(&m_threadStatus.mutex); - - pthread_attr_t attr; - if (pthread_attr_init(&attr)) { - CString sError = "Couldn't init pthread_attr for " + sHostname; - DEBUG(sError); - CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true); - SetTDNSThreadFinished(task, bBind, NULL); - return; - } - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - - pthread_t thr; - sigset_t old_sigmask; - sigset_t sigmask; - sigfillset(&sigmask); - /* Block all signals. The thread will inherit our signal mask and thus - * won't ever try to handle any signals. - */ - if (pthread_sigmask(SIG_SETMASK, &sigmask, &old_sigmask)) { - CString sError = "Couldn't block signals"; - DEBUG(sError); - CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true); - delete arg; - pthread_attr_destroy(&attr); - SetTDNSThreadFinished(task, bBind, NULL); - return; - } - if (pthread_create(&thr, &attr, TDNSThread, &m_threadStatus)) { - CString sError = "Couldn't create thread for " + sHostname; - DEBUG(sError); - CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true); - delete arg; - pthread_attr_destroy(&attr); - SetTDNSThreadFinished(task, bBind, NULL); - return; - } - if (pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL)) { - CString sError = "Couldn't unblock signals"; - DEBUG(sError); - CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true); - delete arg; - pthread_attr_destroy(&attr); - SetTDNSThreadFinished(task, bBind, NULL); - return; - } - pthread_attr_destroy(&attr); + CThreadPool::Get().addJob(arg); } void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult) { @@ -285,18 +179,12 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a } void CSockManager::RetrieveTDNSResult() { - TDNSArg* a = NULL; - size_t readed = 0; - size_t need = sizeof(TDNSArg*); - char* x = (char*)&a; - while (readed < need) { - ssize_t r = read(m_iTDNSpipe[0], x, need - readed); - if (-1 == r) { - DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno)); - exit(1); - } - readed += r; - x += r; + CDNSJob* a = NULL; + ssize_t need = sizeof(a); + ssize_t r = read(m_iTDNSpipe[0], &a, need); + if (r != need) { + DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno)); + exit(1); } TDNSTask* task = a->task; if (0 != a->iRes) { @@ -313,21 +201,6 @@ void CSockManager::RetrieveTDNSResult() { CSockManager::CSockManager() { #ifdef HAVE_THREADED_DNS - int m = pthread_mutex_init(&m_threadStatus.mutex, NULL); - if (m) { - CUtils::PrintError("Can't initialize mutex: " + CString(strerror(errno))); - exit(1); - } - m = pthread_cond_init(&m_threadStatus.cond, NULL); - if (m) { - CUtils::PrintError("Can't initialize condition variable: " + CString(strerror(errno))); - exit(1); - } - - m_threadStatus.num_threads = 0; - m_threadStatus.num_idle = 0; - m_threadStatus.done = false; - if (pipe(m_iTDNSpipe)) { DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno)); exit(1); @@ -338,20 +211,6 @@ CSockManager::CSockManager() { } CSockManager::~CSockManager() { -#ifdef HAVE_THREADED_DNS - /* Anyone has an idea how this can be done less ugly? */ - pthread_mutex_lock(&m_threadStatus.mutex); - m_threadStatus.done = true; - while (m_threadStatus.num_threads > 0) { - pthread_cond_broadcast(&m_threadStatus.cond); - pthread_mutex_unlock(&m_threadStatus.mutex); - usleep(100); - pthread_mutex_lock(&m_threadStatus.mutex); - } - pthread_mutex_unlock(&m_threadStatus.mutex); - pthread_cond_destroy(&m_threadStatus.cond); - pthread_mutex_destroy(&m_threadStatus.mutex); -#endif } void CSockManager::Connect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock) { diff --git a/src/Threads.cpp b/src/Threads.cpp new file mode 100644 index 00000000..cb1c38b5 --- /dev/null +++ b/src/Threads.cpp @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2004-2012 See the AUTHORS file for details. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. + */ + +#include + +#ifdef HAVE_PTHREAD + +/* Just an arbitrary limit for the number of idle threads */ +static const size_t MAX_IDLE_THREADS = 3; + +/* Just an arbitrary limit for the number of running threads */ +static const size_t MAX_TOTAL_THREADS = 20; + +CThreadPool& CThreadPool::Get() { + // Beware! The following is not thread-safe! This function must + // be called once any thread is started. + static CThreadPool pool; + return pool; +} + +CThreadPool::~CThreadPool() { + /* Anyone has an idea how this can be done less ugly? */ + CMutexLocker guard(m_mutex); + m_done = true; + + while (m_num_threads > 0) { + m_cond.broadcast(); + guard.unlock(); + usleep(100); + guard.lock(); + } +} + +bool CThreadPool::threadNeeded() const { + if (m_num_idle > MAX_IDLE_THREADS) + return false; + return !m_done; +} + +void CThreadPool::threadFunc() { + CMutexLocker guard(m_mutex); + m_num_threads++; + m_num_idle++; + + while (true) { + while (m_jobs.empty()) { + if (!threadNeeded()) + break; + m_cond.wait(m_mutex); + } + if (!threadNeeded()) + break; + + // Figure out a job to do + CJob *job = m_jobs.front(); + m_jobs.pop_front(); + + // Now do the actual job + m_num_idle--; + guard.unlock(); + + job->run(); + + guard.lock(); + m_num_idle++; + } + assert(m_num_threads > 0 && m_num_idle > 0); + m_num_threads--; + m_num_idle--; +} + +void CThreadPool::addJob(CJob *job) { + CMutexLocker guard(m_mutex); + m_jobs.push_back(job); + + // Do we already have a thread which can handle this job? + if (m_num_idle > 0) { + m_cond.signal(); + return; + } + + if (m_num_threads >= MAX_TOTAL_THREADS) + // We can't start a new thread. The job will be handled once + // some thread finishes its current job. + return; + + // Start a new thread for our pool + CThread::startThread(threadPoolFunc, this); +} + +#endif // HAVE_PTHREAD