From 7ccd554c2db207241f93c9a05ecb5646028850c1 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 4 May 2018 08:17:49 -0400 Subject: [PATCH] abstract out epoll and make room for kqueue --- .../backends/nntpchan-daemon/daemon/main.cpp | 5 +- .../include/nntpchan/event.hpp | 27 +-- .../include/nntpchan/nntp_server.hpp | 2 +- .../include/nntpchan/server.hpp | 4 +- .../nntpchan-daemon/libnntpchan/epoll.hpp | 213 +++++++++++++++++ .../nntpchan-daemon/libnntpchan/event.cpp | 214 ++---------------- .../libnntpchan/nntp_server.cpp | 2 +- .../nntpchan-daemon/libnntpchan/server.cpp | 10 +- 8 files changed, 252 insertions(+), 225 deletions(-) create mode 100644 contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp diff --git a/contrib/backends/nntpchan-daemon/daemon/main.cpp b/contrib/backends/nntpchan-daemon/daemon/main.cpp index 9d4701f..a362b33 100644 --- a/contrib/backends/nntpchan-daemon/daemon/main.cpp +++ b/contrib/backends/nntpchan-daemon/daemon/main.cpp @@ -21,7 +21,7 @@ int main(int argc, char *argv[]) nntpchan::Crypto crypto; - nntpchan::Mainloop loop; + nntpchan::ev::Loop * loop = nntpchan::NewMainLoop(); nntpchan::NNTPServer * nntp = new nntpchan::NNTPServer(loop); @@ -143,8 +143,9 @@ int main(int argc, char *argv[]) return 1; } - loop.Run(); + loop->Run(); std::cerr << "Exiting" << std::endl; + delete loop; } else { diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp index 18da6a6..0450405 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp @@ -31,24 +31,21 @@ namespace ev virtual bool acceptable() const { return false; }; virtual int accept() { return -1; }; }; + + struct Loop + { + public: + virtual ~Loop() {}; + + virtual bool BindTCP(const sockaddr * addr, ev::io * handler) = 0; + virtual bool TrackConn(ev::io * handler) = 0; + virtual void UntrackConn(ev::io * handler) = 0; + virtual void Run() = 0; + }; } -class Mainloop -{ -public: - Mainloop(); - ~Mainloop(); +ev::Loop * NewMainLoop(); - bool BindTCP(const sockaddr * addr, ev::io * handler); - bool TrackConn(ev::io * handler); - void UntrackConn(ev::io * handler); - void Run(); - -private: - size_t conns; - int epollfd; - char readbuf[128]; -}; } #endif diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp index a8f4ad0..495b8bd 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp @@ -11,7 +11,7 @@ namespace nntpchan class NNTPServer : public Server { public: - NNTPServer(Mainloop & loop); + NNTPServer(ev::Loop * loop); virtual ~NNTPServer(); diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp index f23f82f..d14a324 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp @@ -59,7 +59,7 @@ private: class Server : public ev::io { public: - Server(Mainloop & loop); + Server(ev::Loop * loop); virtual ~Server() {}; virtual bool acceptable() const { return true; }; @@ -91,7 +91,7 @@ protected: private: void OnAccept(int fd, int status); - Mainloop & m_Loop; + ev::Loop * m_Loop; std::deque m_conns; }; } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp new file mode 100644 index 0000000..ddfe429 --- /dev/null +++ b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp @@ -0,0 +1,213 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace nntpchan +{ +namespace ev +{ + struct EpollLoop : public Loop + { + size_t conns; + int epollfd; + char readbuf[128]; + EpollLoop() : conns(0), epollfd(epoll_create1(EPOLL_CLOEXEC)) + { + } + ~EpollLoop() + { + ::close(epollfd); + } + + virtual bool BindTCP(const sockaddr * addr, ev::io * handler) + { + assert(handler->acceptable()); + socklen_t slen; + switch(addr->sa_family) + { + case AF_INET: + slen = sizeof(sockaddr_in); + break; + case AF_INET6: + slen = sizeof(sockaddr_in6); + break; + case AF_UNIX: + slen = sizeof(sockaddr_un); + break; + default: + return false; + } + int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0); + if(fd == -1) + { + return false; + } + handler->fd = fd; + + if(bind(fd, addr, slen) == -1) + return false; + + if (listen(fd, 5) == -1) + return false; + + return TrackConn(handler); + } + + virtual bool TrackConn(ev::io * handler) + { + epoll_event ev; + ev.data.ptr = handler; + ev.events = EPOLLET; + if(handler->readable() || handler->acceptable()) + { + ev.events |= EPOLLIN; + } + if(handler->writeable()) + { + ev.events |= EPOLLOUT; + } + if ( epoll_ctl(epollfd, EPOLL_CTL_ADD, handler->fd, &ev) == -1) + { + return false; + } + ++conns; + return true; + } + + virtual void UntrackConn(ev::io * handler) + { + if(epoll_ctl(epollfd, EPOLL_CTL_DEL, handler->fd, nullptr) != -1) + --conns; + } + + + virtual void Run() + { + epoll_event evs[512]; + epoll_event * ev; + ev::io * handler; + int res = -1; + int idx ; + + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGWINCH); + + int sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); + epoll_event sig_ev; + sig_ev.data.fd = sfd; + sig_ev.events = EPOLLIN; + epoll_ctl(epollfd, EPOLL_CTL_ADD, sfd, &sig_ev); + do + { + res = epoll_wait(epollfd, evs, 512, -1); + idx = 0; + while(idx < res) + { + errno = 0; + ev = &evs[idx++]; + if(ev->data.fd == sfd) + { + read(sfd, readbuf, sizeof(readbuf)); + continue; + } + + handler = static_cast(ev->data.ptr); + + if(ev->events & EPOLLERR || ev->events & EPOLLHUP) + { + handler->close(); + delete handler; + continue; + } + + if (handler->acceptable()) + { + int acceptfd; + bool errored = false; + while(true) + { + acceptfd = handler->accept(); + if(acceptfd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + break; + } + perror("accept()"); + errored = true; + break; + } + } + if(errored) + { + handler->close(); + delete handler; + continue; + } + } + if(ev->events & EPOLLIN && handler->readable()) + { + bool errored = false; + while(true) + { + int readed = handler->read(readbuf, sizeof(readbuf)); + if(readed == -1) + { + if(errno != EAGAIN) + { + perror("read()"); + handler->close(); + delete handler; + errored = true; + } + break; + } + else if (readed == 0) + { + handler->close(); + delete handler; + errored = true; + break; + } + } + if(errored) continue; + } + if(ev->events & EPOLLOUT && handler->writeable()) + { + int written = handler->write(); + if(written < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // blocking + } + else + { + perror("write()"); + handler->close(); + delete handler; + } + } + } + if (!handler->keepalive()) + { + handler->close(); + delete handler; + } + } + } + while(res != -1 && conns); + } +}; +} +} diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp index 72f9438..e318746 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp @@ -1,205 +1,21 @@ -#include #include -#include -#include -#include -#include -#include -#include -#include -#include +#ifdef __linux__ +#include "epoll.hpp" +typedef nntpchan::ev::EpollLoop LoopImpl; +#else +#ifdef __freebsd__ +#include "kqueue.hpp" +typedef nntpchan::ev::KqueueLoop LoopImpl; +#else +#error "unsupported platform" +#endif +#endif -namespace nntpchan +namespace nntpchan { -Mainloop::Mainloop() : conns(0) -{ - epollfd = epoll_create1(EPOLL_CLOEXEC); -} - -Mainloop::~Mainloop() { close(epollfd); } - - -bool Mainloop::BindTCP(const sockaddr * addr, ev::io * handler) -{ - assert(handler->acceptable()); - socklen_t slen; - switch(addr->sa_family) + ev::Loop * NewMainLoop() { - case AF_INET: - slen = sizeof(sockaddr_in); - break; - case AF_INET6: - slen = sizeof(sockaddr_in6); - break; - case AF_UNIX: - slen = sizeof(sockaddr_un); - break; - default: - return false; + return new LoopImpl; } - int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0); - if(fd == -1) - { - return false; - } - std::cout << "bind to " << fd << std::endl; - handler->fd = fd; - - if(bind(fd, addr, slen) == -1) - return false; - - if (listen(fd, 5) == -1) - return false; - - return TrackConn(handler); -} - -bool Mainloop::TrackConn(ev::io * handler) -{ - epoll_event ev; - ev.data.ptr = handler; - ev.events = EPOLLET; - if(handler->readable() || handler->acceptable()) - { - ev.events |= EPOLLIN; - } - if(handler->writeable()) - { - ev.events |= EPOLLOUT; - } - if ( epoll_ctl(epollfd, EPOLL_CTL_ADD, handler->fd, &ev) == -1) - { - return false; - } - ++conns; - return true; -} - -void Mainloop::UntrackConn(ev::io * handler) -{ - if(epoll_ctl(epollfd, EPOLL_CTL_DEL, handler->fd, nullptr) != -1) - --conns; -} - - -void Mainloop::Run() -{ - epoll_event evs[512]; - epoll_event * ev; - ev::io * handler; - int res = -1; - int idx ; - - sigset_t mask; - - sigemptyset(&mask); - sigaddset(&mask, SIGWINCH); - - int sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); - epoll_event sig_ev; - sig_ev.data.fd = sfd; - sig_ev.events = EPOLLIN; - epoll_ctl(epollfd, EPOLL_CTL_ADD, sfd, &sig_ev); - do - { - res = epoll_wait(epollfd, evs, 512, -1); - idx = 0; - while(idx < res) - { - errno = 0; - ev = &evs[idx++]; - if(ev->data.fd == sfd) - { - read(sfd, readbuf, sizeof(readbuf)); - continue; - } - - handler = static_cast(ev->data.ptr); - - if(ev->events & EPOLLERR || ev->events & EPOLLHUP) - { - handler->close(); - delete handler; - continue; - } - - if (handler->acceptable()) - { - int acceptfd; - bool errored = false; - while(true) - { - acceptfd = handler->accept(); - if(acceptfd == -1) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - break; - } - perror("accept()"); - errored = true; - break; - } - } - if(errored) - { - handler->close(); - delete handler; - continue; - } - } - if(ev->events & EPOLLIN && handler->readable()) - { - bool errored = false; - while(true) - { - int readed = handler->read(readbuf, sizeof(readbuf)); - if(readed == -1) - { - if(errno != EAGAIN) - { - perror("read()"); - handler->close(); - delete handler; - errored = true; - } - break; - } - else if (readed == 0) - { - handler->close(); - delete handler; - errored = true; - break; - } - } - if(errored) continue; - } - if(ev->events & EPOLLOUT && handler->writeable()) - { - int written = handler->write(); - if(written < 0) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - // blocking - } - else - { - perror("write()"); - handler->close(); - delete handler; - } - } - } - if (!handler->keepalive()) - { - handler->close(); - delete handler; - } - } - } - while(res != -1 && conns); -} -} +} \ No newline at end of file diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp index 3d40c0b..caa36fc 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp @@ -11,7 +11,7 @@ namespace nntpchan { -NNTPServer::NNTPServer(Mainloop & loop) : Server(loop), m_frontend(nullptr) {} +NNTPServer::NNTPServer(ev::Loop * loop) : Server(loop), m_frontend(nullptr) {} NNTPServer::~NNTPServer() {} diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp index bc7b42e..ae998eb 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp @@ -7,7 +7,7 @@ namespace nntpchan { -Server::Server(Mainloop & loop) : ev::io(-1), m_Loop(loop) +Server::Server(ev::Loop * loop) : ev::io(-1), m_Loop(loop) { } @@ -18,13 +18,13 @@ void Server::close() { itr = m_conns.erase(itr); } - m_Loop.UntrackConn(this); + m_Loop->UntrackConn(this); ev::io::close(); } bool Server::Bind(const std::string &addr) { auto saddr = ParseAddr(addr); - return m_Loop.BindTCP(saddr, this); + return m_Loop->BindTCP(saddr, this); } void Server::OnAccept(int f, int status) @@ -36,7 +36,7 @@ void Server::OnAccept(int f, int status) } IServerConn *conn = CreateConn(f); - if(m_Loop.TrackConn(conn)) + if(m_Loop->TrackConn(conn)) { m_conns.push_back(conn); conn->Greet(); @@ -68,7 +68,7 @@ void Server::RemoveConn(IServerConn *conn) else ++itr; } - m_Loop.UntrackConn(conn); + m_Loop->UntrackConn(conn); } void IConnHandler::QueueLine(const std::string &line) { m_sendlines.push_back(line+"\r\n"); }