]> granicus.if.org Git - pdns/commitdiff
add epoll mplexer, update selectmplexer
authorBert Hubert <bert.hubert@netherlabs.nl>
Sun, 16 Apr 2006 19:11:57 +0000 (19:11 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Sun, 16 Apr 2006 19:11:57 +0000 (19:11 +0000)
git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@709 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/epollmplexer.cc [new file with mode: 0644]
pdns/mplexer.hh
pdns/selectmplexer.cc

diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc
new file mode 100644 (file)
index 0000000..e3b2e86
--- /dev/null
@@ -0,0 +1,147 @@
+#include "mplexer.hh"
+#include "sstuff.hh"
+#include <iostream>
+#include <unistd.h>
+#include "misc.hh"
+#include <boost/lexical_cast.hpp>
+#include "syncres.hh"
+#include <sys/epoll.h>
+
+using namespace boost;
+using namespace std;
+
+#include <sys/epoll.h>
+
+
+class EpollFDMultiplexer : public FDMultiplexer
+{
+public:
+  EpollFDMultiplexer();
+  virtual ~EpollFDMultiplexer()
+  {
+    close(d_epollfd);
+  }
+
+  virtual int run(struct timeval* tv=0);
+
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
+  virtual void removeFD(callbackmap_t& cbmap, int fd);
+private:
+  int d_epollfd;
+  boost::shared_array<epoll_event> d_eevents;
+  static int s_maxevents; // not a hard maximum
+};
+
+FDMultiplexer* getMultiplexer()
+{
+  return new EpollFDMultiplexer();
+}
+
+
+int EpollFDMultiplexer::s_maxevents=1024;
+EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents])
+{
+  d_epollfd=epoll_create(s_maxevents); // not hard max
+  if(d_epollfd < 0)
+    throw FDMultiplexerException("Setting up epoll: "+stringerror());
+}
+
+void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)
+{
+  Callback cb;
+  cb.d_callback=toDo;
+  cb.d_parameter=parameter;
+
+  if(cbmap.count(fd))
+    throw FDMultiplexerException("Tried to add fd "+lexical_cast<string>(fd)+ " to multiplexer twice");
+  struct epoll_event eevent;
+  
+  eevent.events = (&cbmap == &d_readCallbacks) ? EPOLLIN : EPOLLOUT;
+  
+  eevent.data.u64=0; // placate valgrind (I love it so much)
+  eevent.data.fd=fd; 
+
+  if(epoll_ctl(d_epollfd, EPOLL_CTL_ADD, fd, &eevent) < 0)
+    throw FDMultiplexerException("Adding fd to epoll set: "+stringerror());
+
+  cbmap[fd]=cb;
+}
+
+void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
+{
+  if(d_inrun && d_iter->first==fd)  // trying to remove us!
+    d_iter++;
+
+  if(!cbmap.erase(fd))
+    throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast<string>(fd)+ " from multiplexer");
+
+  if(epoll_ctl(d_epollfd, EPOLL_CTL_DEL, fd, 0) < 0)
+    throw FDMultiplexerException("Removing fd from epoll set: "+stringerror());
+}
+
+int EpollFDMultiplexer::run(struct timeval* now)
+{
+  if(d_inrun) {
+    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+  }
+  
+  int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500);
+  if(now)
+    gettimeofday(now,0);
+  
+  if(ret < 0 && errno!=EINTR)
+    throw FDMultiplexerException("select returned error: "+stringerror());
+
+  if(ret==0) // nothing
+    return 0;
+
+  d_inrun=true;
+
+  for(int n=0; n < ret; ++n) {
+    d_iter=d_readCallbacks.find(d_eevents[n].data.fd);
+    
+    if(d_iter != d_readCallbacks.end())
+      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+
+    d_iter=d_writeCallbacks.find(d_eevents[n].data.fd);
+    
+    if(d_iter != d_writeCallbacks.end())
+      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+  }
+
+  d_inrun=false;
+  return 0;
+}
+
+#if 0
+void acceptData(int fd, boost::any& parameter)
+{
+  cout<<"Have data on fd "<<fd<<endl;
+  Socket* sock=boost::any_cast<Socket*>(parameter);
+  string packet;
+  IPEndpoint rem;
+  sock->recvFrom(packet, rem);
+  cout<<"Received "<<packet.size()<<" bytes!\n";
+}
+
+
+int main()
+{
+  Socket s(InterNetwork, Datagram);
+  
+  IPEndpoint loc("0.0.0.0", 2000);
+  s.bind(loc);
+
+  EpollFDMultiplexer sfm;
+
+  sfm.addReadFD(s.getHandle(), &acceptData, &s);
+
+  for(int n=0; n < 100 ; ++n) {
+    sfm.run();
+  }
+  sfm.removeReadFD(s.getHandle());
+  sfm.removeReadFD(s.getHandle());
+}
+#endif
+
+
index ba56cfab72b6d7caed28ffe95fbbad47cde8eb0c..09de16bf6113886984cee650fec5402f521d3874 100644 (file)
@@ -1,5 +1,6 @@
 #include <boost/function.hpp>
 #include <boost/any.hpp>
+#include <boost/shared_array.hpp>
 #include <map>
 #include <stdexcept>
 #include <string>
@@ -11,6 +12,13 @@ public:
   {}
 };
 
+
+/** Very simple FD multiplexer, based on callbacks and boost::any parameters
+    As a special service, this parameter is kept around and can be modified, 
+    allowing for state to be stored inside the multiplexer.
+
+    It has some "interesting" semantics
+*/
 class FDMultiplexer
 {
 protected:
@@ -29,54 +37,39 @@ public:
 
   virtual int run(struct timeval* tv=0) = 0;
 
+  //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
   virtual void addReadFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any())
   {
-    this->addFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd, toDo, parameter);
+    this->addFD(d_readCallbacks, fd, toDo, parameter);
   }
 
+  //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
   virtual void addWriteFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any())
   {
-    this->addFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd, toDo, parameter);
+    this->addFD(d_writeCallbacks, fd, toDo, parameter);
   }
 
+  //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
   virtual void removeReadFD(int fd)
   {
-    this->removeFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd);
-  }
-  virtual void removeWriteFD(int fd)
-  {
-    this->removeFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd);
+    this->removeFD(d_readCallbacks, fd);
   }
 
-  virtual boost::any& getReadParameter(int fd) 
+  //! Remove an fd from the write watch list. You can't call this function on an fd that is closed already!
+  virtual void removeWriteFD(int fd)
   {
-    return d_readCallbacks[fd].d_parameter;
+    this->removeFD(d_writeCallbacks, fd);
   }
 
 protected:
   typedef std::map<int, Callback> callbackmap_t;
   callbackmap_t d_readCallbacks, d_writeCallbacks;
-  callbackmap_t d_newReadCallbacks, d_newWriteCallbacks;
 
   virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)=0;
   virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
   bool d_inrun;
+  callbackmap_t::iterator d_iter;
 
 };
 
-class SelectFDMultiplexer : public FDMultiplexer
-{
-public:
-  SelectFDMultiplexer()
-  {}
-  virtual ~SelectFDMultiplexer()
-  {}
-
-  virtual int run(struct timeval* tv=0);
-
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-
-private:
-};
-
+FDMultiplexer* getMultiplexer();
index 8d22c1c0bc2b68a80af7c34041547ab90a3b7d97..c8a1d484228d6681c4b85526847bf87f3dcdd0c5 100644 (file)
@@ -4,16 +4,38 @@
 #include <unistd.h>
 #include "misc.hh"
 #include <boost/lexical_cast.hpp>
+#include "syncres.hh"
 
 using namespace boost;
-
 using namespace std;
 
+class SelectFDMultiplexer : public FDMultiplexer
+{
+public:
+  SelectFDMultiplexer()
+  {}
+  virtual ~SelectFDMultiplexer()
+  {}
+
+  virtual int run(struct timeval* tv=0);
+
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
+  virtual void removeFD(callbackmap_t& cbmap, int fd);
+};
+
+
+FDMultiplexer* getMultiplexer()
+{
+  return new SelectFDMultiplexer();
+}
+
+
 void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)
 {
   Callback cb;
   cb.d_callback=toDo;
   cb.d_parameter=parameter;
+
   if(cbmap.count(fd))
     throw FDMultiplexerException("Tried to add fd "+lexical_cast<string>(fd)+ " to multiplexer twice");
   cbmap[fd]=cb;
@@ -21,13 +43,18 @@ void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD
 
 void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
 {
+  if(d_inrun && d_iter->first==fd)  // trying to remove us!
+    d_iter++;
+
   if(!cbmap.erase(fd))
     throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast<string>(fd)+ " from multiplexer");
 }
 
-
 int SelectFDMultiplexer::run(struct timeval* now)
 {
+  if(d_inrun) {
+    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+  }
   fd_set readfds, writefds;
   FD_ZERO(&readfds);
   FD_ZERO(&writefds);
@@ -38,6 +65,12 @@ int SelectFDMultiplexer::run(struct timeval* now)
     FD_SET(i->first, &readfds);
     fdmax=max(i->first, fdmax);
   }
+
+  for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
+    FD_SET(i->first, &writefds);
+    fdmax=max(i->first, fdmax);
+  }
+
   
   struct timeval tv={0,500000};
   int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv);
@@ -50,27 +83,30 @@ int SelectFDMultiplexer::run(struct timeval* now)
   if(ret==0) // nothing
     return 0;
 
+  d_iter=d_readCallbacks.end();
   d_inrun=true;
-  d_newReadCallbacks=d_readCallbacks;
-  d_newWriteCallbacks=d_writeCallbacks;
+  
+  for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) {
+    d_iter=i++;
 
-  for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
-    if(FD_ISSET(i->first, &readfds))
-      i->second.d_callback(i->first, i->second.d_parameter);
-  }
-  for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
-    if(FD_ISSET(i->first, &writefds))
-      i->second.d_callback(i->first, i->second.d_parameter);
+    if(FD_ISSET(d_iter->first, &readfds)) {
+      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+    }
   }
 
-  d_readCallbacks.swap(d_newReadCallbacks);
-  d_writeCallbacks.swap(d_newWriteCallbacks);
+  for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) {
+    d_iter=i++;
+    if(FD_ISSET(d_iter->first, &writefds)) {
+      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+    }
+  }
 
   d_inrun=false;
-  
   return 0;
 }
 
+#if 0
+
 void acceptData(int fd, boost::any& parameter)
 {
   cout<<"Have data on fd "<<fd<<endl;
@@ -81,7 +117,7 @@ void acceptData(int fd, boost::any& parameter)
   cout<<"Received "<<packet.size()<<" bytes!\n";
 }
 
-#if 0
+
 int main()
 {
   Socket s(InterNetwork, Datagram);