mirror of
https://github.com/znc/znc.git
synced 2026-05-04 20:42:33 +02:00
Add clang-format configuration.
For now, it uses tabs like before, to make the diff easier to read/check. One of following commits will switch it to spaces.
This commit is contained in:
109
src/Threads.cpp
109
src/Threads.cpp
@@ -35,7 +35,16 @@ CThreadPool& CThreadPool::Get() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
CThreadPool::CThreadPool() : m_mutex(), m_cond(), m_cancellationCond(), m_exit_cond(), m_done(false), m_num_threads(0), m_num_idle(0), m_iJobPipe{0,0}, m_jobs() {
|
||||
CThreadPool::CThreadPool()
|
||||
: m_mutex(),
|
||||
m_cond(),
|
||||
m_cancellationCond(),
|
||||
m_exit_cond(),
|
||||
m_done(false),
|
||||
m_num_threads(0),
|
||||
m_num_idle(0),
|
||||
m_iJobPipe{0, 0},
|
||||
m_jobs() {
|
||||
if (pipe(m_iJobPipe)) {
|
||||
DEBUG("Ouch, can't open pipe for thread pool: " << strerror(errno));
|
||||
exit(1);
|
||||
@@ -59,27 +68,30 @@ void CThreadPool::jobDone(CJob* job) {
|
||||
// (Yes, this really wants to write a pointer(!) to the pipe.
|
||||
size_t w = write(m_iJobPipe[1], &job, sizeof(job));
|
||||
if (w != sizeof(job)) {
|
||||
DEBUG("Something bad happened during write() to a pipe for thread pool, wrote " << w << " bytes: " << strerror(errno));
|
||||
DEBUG(
|
||||
"Something bad happened during write() to a pipe for thread pool, "
|
||||
"wrote "
|
||||
<< w << " bytes: " << strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
void CThreadPool::handlePipeReadable() const {
|
||||
finishJob(getJobFromPipe());
|
||||
}
|
||||
void CThreadPool::handlePipeReadable() const { finishJob(getJobFromPipe()); }
|
||||
|
||||
CJob *CThreadPool::getJobFromPipe() const {
|
||||
CJob* CThreadPool::getJobFromPipe() const {
|
||||
CJob* a = nullptr;
|
||||
ssize_t need = sizeof(a);
|
||||
ssize_t r = read(m_iJobPipe[0], &a, need);
|
||||
if (r != need) {
|
||||
DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
|
||||
DEBUG(
|
||||
"Something bad happened during read() from a pipe for thread pool: "
|
||||
<< strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
return a;
|
||||
}
|
||||
|
||||
void CThreadPool::finishJob(CJob *job) const {
|
||||
void CThreadPool::finishJob(CJob* job) const {
|
||||
job->runMain();
|
||||
delete job;
|
||||
}
|
||||
@@ -95,8 +107,7 @@ CThreadPool::~CThreadPool() {
|
||||
}
|
||||
|
||||
bool CThreadPool::threadNeeded() const {
|
||||
if (m_num_idle > MAX_IDLE_THREADS)
|
||||
return false;
|
||||
if (m_num_idle > MAX_IDLE_THREADS) return false;
|
||||
return !m_done;
|
||||
}
|
||||
|
||||
@@ -107,15 +118,13 @@ void CThreadPool::threadFunc() {
|
||||
|
||||
while (true) {
|
||||
while (m_jobs.empty()) {
|
||||
if (!threadNeeded())
|
||||
break;
|
||||
if (!threadNeeded()) break;
|
||||
m_cond.wait(m_mutex);
|
||||
}
|
||||
if (!threadNeeded())
|
||||
break;
|
||||
if (!threadNeeded()) break;
|
||||
|
||||
// Figure out a job to do
|
||||
CJob *job = m_jobs.front();
|
||||
CJob* job = m_jobs.front();
|
||||
m_jobs.pop_front();
|
||||
|
||||
// Now do the actual job
|
||||
@@ -133,11 +142,10 @@ void CThreadPool::threadFunc() {
|
||||
m_num_threads--;
|
||||
m_num_idle--;
|
||||
|
||||
if (m_num_threads == 0 && m_done)
|
||||
m_exit_cond.notify_one();
|
||||
if (m_num_threads == 0 && m_done) m_exit_cond.notify_one();
|
||||
}
|
||||
|
||||
void CThreadPool::addJob(CJob *job) {
|
||||
void CThreadPool::addJob(CJob* job) {
|
||||
CMutexLocker guard(m_mutex);
|
||||
m_jobs.push_back(job);
|
||||
|
||||
@@ -154,18 +162,16 @@ void CThreadPool::addJob(CJob *job) {
|
||||
|
||||
// Start a new thread for our pool
|
||||
m_num_threads++;
|
||||
std::thread([this]() {
|
||||
threadFunc();
|
||||
}).detach();
|
||||
std::thread([this]() { threadFunc(); }).detach();
|
||||
}
|
||||
|
||||
void CThreadPool::cancelJob(CJob *job) {
|
||||
std::set<CJob *> jobs;
|
||||
void CThreadPool::cancelJob(CJob* job) {
|
||||
std::set<CJob*> jobs;
|
||||
jobs.insert(job);
|
||||
cancelJobs(jobs);
|
||||
}
|
||||
|
||||
void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
|
||||
void CThreadPool::cancelJobs(const std::set<CJob*>& jobs) {
|
||||
// Thanks to the mutex, jobs cannot change state anymore. There are
|
||||
// three different states which can occur:
|
||||
//
|
||||
@@ -184,36 +190,37 @@ void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
|
||||
// cancellation is done by changing the job's status to DONE.
|
||||
|
||||
CMutexLocker guard(m_mutex);
|
||||
std::set<CJob *> wait, finished, deleteLater;
|
||||
std::set<CJob *>::const_iterator it;
|
||||
std::set<CJob*> wait, finished, deleteLater;
|
||||
std::set<CJob*>::const_iterator it;
|
||||
|
||||
// Start cancelling all jobs
|
||||
for (it = jobs.begin(); it != jobs.end(); ++it) {
|
||||
switch ((*it)->m_eState) {
|
||||
case CJob::READY: {
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
case CJob::READY: {
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
|
||||
// Job wasn't started yet, must be in the queue
|
||||
std::list<CJob *>::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it);
|
||||
assert(it2 != m_jobs.end());
|
||||
m_jobs.erase(it2);
|
||||
deleteLater.insert(*it);
|
||||
continue;
|
||||
}
|
||||
// Job wasn't started yet, must be in the queue
|
||||
std::list<CJob*>::iterator it2 =
|
||||
std::find(m_jobs.begin(), m_jobs.end(), *it);
|
||||
assert(it2 != m_jobs.end());
|
||||
m_jobs.erase(it2);
|
||||
deleteLater.insert(*it);
|
||||
continue;
|
||||
}
|
||||
|
||||
case CJob::RUNNING:
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
wait.insert(*it);
|
||||
continue;
|
||||
case CJob::RUNNING:
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
wait.insert(*it);
|
||||
continue;
|
||||
|
||||
case CJob::DONE:
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
finished.insert(*it);
|
||||
continue;
|
||||
case CJob::DONE:
|
||||
(*it)->m_eState = CJob::CANCELLED;
|
||||
finished.insert(*it);
|
||||
continue;
|
||||
|
||||
case CJob::CANCELLED:
|
||||
default:
|
||||
assert(0);
|
||||
case CJob::CANCELLED:
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,19 +241,19 @@ void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
|
||||
it++;
|
||||
}
|
||||
|
||||
if (wait.empty())
|
||||
break;
|
||||
if (wait.empty()) break;
|
||||
|
||||
// Then wait for more to be done
|
||||
m_cancellationCond.wait(m_mutex);
|
||||
}
|
||||
|
||||
// We must call destructors with m_mutex unlocked so that they can call wasCancelled()
|
||||
// We must call destructors with m_mutex unlocked so that they can call
|
||||
// wasCancelled()
|
||||
guard.unlock();
|
||||
|
||||
// Handle finished jobs. They must already be in the pipe.
|
||||
while (!finished.empty()) {
|
||||
CJob *job = getJobFromPipe();
|
||||
CJob* job = getJobFromPipe();
|
||||
if (finished.erase(job) > 0) {
|
||||
assert(job->m_eState == CJob::CANCELLED);
|
||||
delete job;
|
||||
@@ -266,4 +273,4 @@ bool CJob::wasCancelled() const {
|
||||
return m_eState == CANCELLED;
|
||||
}
|
||||
|
||||
#endif // HAVE_PTHREAD
|
||||
#endif // HAVE_PTHREAD
|
||||
|
||||
Reference in New Issue
Block a user