Add a generic threads abstraction

This should make it easier to work with threads. It provides classes for mutexes
and condition variables. Additionally, there is a special CMutexGuard that
automatically unlocks the mutex on destruction and a CThreadPool class.

This thread pool is used to replace the thread pool in the sockets code.

Signed-off-by: Uli Schlachter <psychon@znc.in>
This commit is contained in:
Uli Schlachter
2012-11-10 17:13:54 +01:00
parent 2db7307ac3
commit 75f2e3fa41
6 changed files with 349 additions and 181 deletions

View File

@@ -10,9 +10,6 @@
#include <znc/IRCNetwork.h>
#include <signal.h>
/* We should need 2 DNS threads (host, bindhost) per IRC connection */
static const size_t MAX_IDLE_THREADS = 2;
unsigned int CSockManager::GetAnonConnectionCount(const CString &sIP) const {
const_iterator it;
unsigned int ret = 0;
@@ -55,55 +52,7 @@ public:
}
};
bool CSockManager::ThreadNeeded(struct TDNSStatus* threadStatus)
{
// We should keep a number of idle threads alive
if (threadStatus->num_idle > MAX_IDLE_THREADS)
return false;
// If ZNC is shutting down, all threads should exit
return !threadStatus->done;
}
void* CSockManager::TDNSThread(void* argument) {
TDNSStatus *threadStatus = (TDNSStatus *) argument;
pthread_mutex_lock(&threadStatus->mutex);
threadStatus->num_threads++;
threadStatus->num_idle++;
while (true) {
/* Wait for a DNS job for us to do. This is a while()-loop
* because POSIX allows spurious wakeups from pthread_cond_wait.
*/
while (threadStatus->jobs.empty()) {
if (!ThreadNeeded(threadStatus))
break;
pthread_cond_wait(&threadStatus->cond, &threadStatus->mutex);
}
if (!ThreadNeeded(threadStatus))
break;
/* Figure out a DNS job to do */
assert(threadStatus->num_idle > 0);
TDNSArg *job = threadStatus->jobs.front();
threadStatus->jobs.pop_front();
threadStatus->num_idle--;
pthread_mutex_unlock(&threadStatus->mutex);
/* Now do the actual work */
DoDNS(job);
pthread_mutex_lock(&threadStatus->mutex);
threadStatus->num_idle++;
}
assert(threadStatus->num_threads > 0 && threadStatus->num_idle > 0);
threadStatus->num_threads--;
threadStatus->num_idle--;
pthread_mutex_unlock(&threadStatus->mutex);
return NULL;
}
void CSockManager::DoDNS(TDNSArg *arg) {
void CSockManager::CDNSJob::run() {
int iCount = 0;
while (true) {
addrinfo hints;
@@ -112,25 +61,25 @@ void CSockManager::DoDNS(TDNSArg *arg) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_ADDRCONFIG;
arg->iRes = getaddrinfo(arg->sHostname.c_str(), NULL, &hints, &arg->aiResult);
if (EAGAIN != arg->iRes) {
iRes = getaddrinfo(sHostname.c_str(), NULL, &hints, &aiResult);
if (EAGAIN != iRes) {
break;
}
iCount++;
if (iCount > 5) {
arg->iRes = ETIMEDOUT;
iRes = ETIMEDOUT;
break;
}
sleep(5); // wait 5 seconds before next try
}
size_t need = sizeof(TDNSArg*);
char* x = (char*)&arg;
// This write() must succeed because POSIX guarantees that writes of
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
size_t w = write(arg->fd, x, need);
if (w != need) {
// (Yes, this really wants to write a pointer(!) to the pipe.
CDNSJob *job = this;
size_t w = write(fd, &job, sizeof(job));
if (w != sizeof(job)) {
DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno));
exit(1);
}
@@ -138,7 +87,7 @@ void CSockManager::DoDNS(TDNSArg *arg) {
void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) {
CString sHostname = bBind ? task->sBindhost : task->sHostname;
TDNSArg* arg = new TDNSArg;
CDNSJob* arg = new CDNSJob;
arg->sHostname = sHostname;
arg->task = task;
arg->fd = m_iTDNSpipe[1];
@@ -146,62 +95,7 @@ void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) {
arg->iRes = 0;
arg->aiResult = NULL;
pthread_mutex_lock(&m_threadStatus.mutex);
m_threadStatus.jobs.push_back(arg);
/* Do we need a new DNS thread? */
if (m_threadStatus.num_idle > 0) {
/* Nope, there is one waiting for a job */
pthread_cond_signal(&m_threadStatus.cond);
pthread_mutex_unlock(&m_threadStatus.mutex);
return;
}
pthread_mutex_unlock(&m_threadStatus.mutex);
pthread_attr_t attr;
if (pthread_attr_init(&attr)) {
CString sError = "Couldn't init pthread_attr for " + sHostname;
DEBUG(sError);
CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
SetTDNSThreadFinished(task, bBind, NULL);
return;
}
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_t thr;
sigset_t old_sigmask;
sigset_t sigmask;
sigfillset(&sigmask);
/* Block all signals. The thread will inherit our signal mask and thus
* won't ever try to handle any signals.
*/
if (pthread_sigmask(SIG_SETMASK, &sigmask, &old_sigmask)) {
CString sError = "Couldn't block signals";
DEBUG(sError);
CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
delete arg;
pthread_attr_destroy(&attr);
SetTDNSThreadFinished(task, bBind, NULL);
return;
}
if (pthread_create(&thr, &attr, TDNSThread, &m_threadStatus)) {
CString sError = "Couldn't create thread for " + sHostname;
DEBUG(sError);
CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
delete arg;
pthread_attr_destroy(&attr);
SetTDNSThreadFinished(task, bBind, NULL);
return;
}
if (pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL)) {
CString sError = "Couldn't unblock signals";
DEBUG(sError);
CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
delete arg;
pthread_attr_destroy(&attr);
SetTDNSThreadFinished(task, bBind, NULL);
return;
}
pthread_attr_destroy(&attr);
CThreadPool::Get().addJob(arg);
}
void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult) {
@@ -285,18 +179,12 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a
}
void CSockManager::RetrieveTDNSResult() {
TDNSArg* a = NULL;
size_t readed = 0;
size_t need = sizeof(TDNSArg*);
char* x = (char*)&a;
while (readed < need) {
ssize_t r = read(m_iTDNSpipe[0], x, need - readed);
if (-1 == r) {
DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno));
exit(1);
}
readed += r;
x += r;
CDNSJob* a = NULL;
ssize_t need = sizeof(a);
ssize_t r = read(m_iTDNSpipe[0], &a, need);
if (r != need) {
DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno));
exit(1);
}
TDNSTask* task = a->task;
if (0 != a->iRes) {
@@ -313,21 +201,6 @@ void CSockManager::RetrieveTDNSResult() {
CSockManager::CSockManager() {
#ifdef HAVE_THREADED_DNS
int m = pthread_mutex_init(&m_threadStatus.mutex, NULL);
if (m) {
CUtils::PrintError("Can't initialize mutex: " + CString(strerror(errno)));
exit(1);
}
m = pthread_cond_init(&m_threadStatus.cond, NULL);
if (m) {
CUtils::PrintError("Can't initialize condition variable: " + CString(strerror(errno)));
exit(1);
}
m_threadStatus.num_threads = 0;
m_threadStatus.num_idle = 0;
m_threadStatus.done = false;
if (pipe(m_iTDNSpipe)) {
DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno));
exit(1);
@@ -338,20 +211,6 @@ CSockManager::CSockManager() {
}
CSockManager::~CSockManager() {
#ifdef HAVE_THREADED_DNS
/* Anyone has an idea how this can be done less ugly? */
pthread_mutex_lock(&m_threadStatus.mutex);
m_threadStatus.done = true;
while (m_threadStatus.num_threads > 0) {
pthread_cond_broadcast(&m_threadStatus.cond);
pthread_mutex_unlock(&m_threadStatus.mutex);
usleep(100);
pthread_mutex_lock(&m_threadStatus.mutex);
}
pthread_mutex_unlock(&m_threadStatus.mutex);
pthread_cond_destroy(&m_threadStatus.cond);
pthread_mutex_destroy(&m_threadStatus.mutex);
#endif
}
void CSockManager::Connect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock) {