From a1dfcec8db227c38c232802808d6de918bb5a1bd Mon Sep 17 00:00:00 2001 From: Bert Hubert Date: Sun, 16 Apr 2006 19:11:57 +0000 Subject: [PATCH] add epoll mplexer, update selectmplexer git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@709 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- pdns/epollmplexer.cc | 147 ++++++++++++++++++++++++++++++++++++++++++ pdns/mplexer.hh | 45 ++++++------- pdns/selectmplexer.cc | 66 ++++++++++++++----- 3 files changed, 217 insertions(+), 41 deletions(-) create mode 100644 pdns/epollmplexer.cc diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc new file mode 100644 index 000000000..e3b2e86f0 --- /dev/null +++ b/pdns/epollmplexer.cc @@ -0,0 +1,147 @@ +#include "mplexer.hh" +#include "sstuff.hh" +#include +#include +#include "misc.hh" +#include +#include "syncres.hh" +#include + +using namespace boost; +using namespace std; + +#include + + +class EpollFDMultiplexer : public FDMultiplexer +{ +public: + EpollFDMultiplexer(); + virtual ~EpollFDMultiplexer() + { + close(d_epollfd); + } + + virtual int run(struct timeval* tv=0); + + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter); + virtual void removeFD(callbackmap_t& cbmap, int fd); +private: + int d_epollfd; + boost::shared_array d_eevents; + static int s_maxevents; // not a hard maximum +}; + +FDMultiplexer* getMultiplexer() +{ + return new EpollFDMultiplexer(); +} + + +int EpollFDMultiplexer::s_maxevents=1024; +EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents]) +{ + d_epollfd=epoll_create(s_maxevents); // not hard max + if(d_epollfd < 0) + throw FDMultiplexerException("Setting up epoll: "+stringerror()); +} + +void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter) +{ + Callback cb; + cb.d_callback=toDo; + cb.d_parameter=parameter; + + if(cbmap.count(fd)) + throw FDMultiplexerException("Tried to add fd "+lexical_cast(fd)+ " to multiplexer twice"); + struct epoll_event eevent; + + eevent.events = (&cbmap == &d_readCallbacks) ? EPOLLIN : EPOLLOUT; + + eevent.data.u64=0; // placate valgrind (I love it so much) + eevent.data.fd=fd; + + if(epoll_ctl(d_epollfd, EPOLL_CTL_ADD, fd, &eevent) < 0) + throw FDMultiplexerException("Adding fd to epoll set: "+stringerror()); + + cbmap[fd]=cb; +} + +void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) +{ + if(d_inrun && d_iter->first==fd) // trying to remove us! + d_iter++; + + if(!cbmap.erase(fd)) + throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast(fd)+ " from multiplexer"); + + if(epoll_ctl(d_epollfd, EPOLL_CTL_DEL, fd, 0) < 0) + throw FDMultiplexerException("Removing fd from epoll set: "+stringerror()); +} + +int EpollFDMultiplexer::run(struct timeval* now) +{ + if(d_inrun) { + throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); + } + + int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500); + if(now) + gettimeofday(now,0); + + if(ret < 0 && errno!=EINTR) + throw FDMultiplexerException("select returned error: "+stringerror()); + + if(ret==0) // nothing + return 0; + + d_inrun=true; + + for(int n=0; n < ret; ++n) { + d_iter=d_readCallbacks.find(d_eevents[n].data.fd); + + if(d_iter != d_readCallbacks.end()) + d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + + d_iter=d_writeCallbacks.find(d_eevents[n].data.fd); + + if(d_iter != d_writeCallbacks.end()) + d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + } + + d_inrun=false; + return 0; +} + +#if 0 +void acceptData(int fd, boost::any& parameter) +{ + cout<<"Have data on fd "<(parameter); + string packet; + IPEndpoint rem; + sock->recvFrom(packet, rem); + cout<<"Received "< #include +#include #include #include #include @@ -11,6 +12,13 @@ public: {} }; + +/** Very simple FD multiplexer, based on callbacks and boost::any parameters + As a special service, this parameter is kept around and can be modified, + allowing for state to be stored inside the multiplexer. + + It has some "interesting" semantics +*/ class FDMultiplexer { protected: @@ -29,54 +37,39 @@ public: virtual int run(struct timeval* tv=0) = 0; + //! Add an fd to the read watch list - currently an fd can only be on one list at a time! virtual void addReadFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any()) { - this->addFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd, toDo, parameter); + this->addFD(d_readCallbacks, fd, toDo, parameter); } + //! Add an fd to the write watch list - currently an fd can only be on one list at a time! virtual void addWriteFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any()) { - this->addFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd, toDo, parameter); + this->addFD(d_writeCallbacks, fd, toDo, parameter); } + //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already! virtual void removeReadFD(int fd) { - this->removeFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd); - } - virtual void removeWriteFD(int fd) - { - this->removeFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd); + this->removeFD(d_readCallbacks, fd); } - virtual boost::any& getReadParameter(int fd) + //! Remove an fd from the write watch list. You can't call this function on an fd that is closed already! + virtual void removeWriteFD(int fd) { - return d_readCallbacks[fd].d_parameter; + this->removeFD(d_writeCallbacks, fd); } protected: typedef std::map callbackmap_t; callbackmap_t d_readCallbacks, d_writeCallbacks; - callbackmap_t d_newReadCallbacks, d_newWriteCallbacks; virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)=0; virtual void removeFD(callbackmap_t& cbmap, int fd)=0; bool d_inrun; + callbackmap_t::iterator d_iter; }; -class SelectFDMultiplexer : public FDMultiplexer -{ -public: - SelectFDMultiplexer() - {} - virtual ~SelectFDMultiplexer() - {} - - virtual int run(struct timeval* tv=0); - - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - -private: -}; - +FDMultiplexer* getMultiplexer(); diff --git a/pdns/selectmplexer.cc b/pdns/selectmplexer.cc index 8d22c1c0b..c8a1d4842 100644 --- a/pdns/selectmplexer.cc +++ b/pdns/selectmplexer.cc @@ -4,16 +4,38 @@ #include #include "misc.hh" #include +#include "syncres.hh" using namespace boost; - using namespace std; +class SelectFDMultiplexer : public FDMultiplexer +{ +public: + SelectFDMultiplexer() + {} + virtual ~SelectFDMultiplexer() + {} + + virtual int run(struct timeval* tv=0); + + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter); + virtual void removeFD(callbackmap_t& cbmap, int fd); +}; + + +FDMultiplexer* getMultiplexer() +{ + return new SelectFDMultiplexer(); +} + + void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter) { Callback cb; cb.d_callback=toDo; cb.d_parameter=parameter; + if(cbmap.count(fd)) throw FDMultiplexerException("Tried to add fd "+lexical_cast(fd)+ " to multiplexer twice"); cbmap[fd]=cb; @@ -21,13 +43,18 @@ void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) { + if(d_inrun && d_iter->first==fd) // trying to remove us! + d_iter++; + if(!cbmap.erase(fd)) throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast(fd)+ " from multiplexer"); } - int SelectFDMultiplexer::run(struct timeval* now) { + if(d_inrun) { + throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); + } fd_set readfds, writefds; FD_ZERO(&readfds); FD_ZERO(&writefds); @@ -38,6 +65,12 @@ int SelectFDMultiplexer::run(struct timeval* now) FD_SET(i->first, &readfds); fdmax=max(i->first, fdmax); } + + for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) { + FD_SET(i->first, &writefds); + fdmax=max(i->first, fdmax); + } + struct timeval tv={0,500000}; int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv); @@ -50,27 +83,30 @@ int SelectFDMultiplexer::run(struct timeval* now) if(ret==0) // nothing return 0; + d_iter=d_readCallbacks.end(); d_inrun=true; - d_newReadCallbacks=d_readCallbacks; - d_newWriteCallbacks=d_writeCallbacks; + + for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) { + d_iter=i++; - for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) { - if(FD_ISSET(i->first, &readfds)) - i->second.d_callback(i->first, i->second.d_parameter); - } - for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) { - if(FD_ISSET(i->first, &writefds)) - i->second.d_callback(i->first, i->second.d_parameter); + if(FD_ISSET(d_iter->first, &readfds)) { + d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + } } - d_readCallbacks.swap(d_newReadCallbacks); - d_writeCallbacks.swap(d_newWriteCallbacks); + for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) { + d_iter=i++; + if(FD_ISSET(d_iter->first, &writefds)) { + d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + } + } d_inrun=false; - return 0; } +#if 0 + void acceptData(int fd, boost::any& parameter) { cout<<"Have data on fd "<