--- /dev/null
+#include "mplexer.hh"
+#include "sstuff.hh"
+#include <iostream>
+#include <unistd.h>
+#include "misc.hh"
+#include <boost/lexical_cast.hpp>
+#include "syncres.hh"
+#include <sys/epoll.h>
+
+using namespace boost;
+using namespace std;
+
+#include <sys/epoll.h>
+
+
+class EpollFDMultiplexer : public FDMultiplexer
+{
+public:
+ EpollFDMultiplexer();
+ virtual ~EpollFDMultiplexer()
+ {
+ close(d_epollfd);
+ }
+
+ virtual int run(struct timeval* tv=0);
+
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
+ virtual void removeFD(callbackmap_t& cbmap, int fd);
+private:
+ int d_epollfd;
+ boost::shared_array<epoll_event> d_eevents;
+ static int s_maxevents; // not a hard maximum
+};
+
+FDMultiplexer* getMultiplexer()
+{
+ return new EpollFDMultiplexer();
+}
+
+
+int EpollFDMultiplexer::s_maxevents=1024;
+EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents])
+{
+ d_epollfd=epoll_create(s_maxevents); // not hard max
+ if(d_epollfd < 0)
+ throw FDMultiplexerException("Setting up epoll: "+stringerror());
+}
+
+void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)
+{
+ Callback cb;
+ cb.d_callback=toDo;
+ cb.d_parameter=parameter;
+
+ if(cbmap.count(fd))
+ throw FDMultiplexerException("Tried to add fd "+lexical_cast<string>(fd)+ " to multiplexer twice");
+ struct epoll_event eevent;
+
+ eevent.events = (&cbmap == &d_readCallbacks) ? EPOLLIN : EPOLLOUT;
+
+ eevent.data.u64=0; // placate valgrind (I love it so much)
+ eevent.data.fd=fd;
+
+ if(epoll_ctl(d_epollfd, EPOLL_CTL_ADD, fd, &eevent) < 0)
+ throw FDMultiplexerException("Adding fd to epoll set: "+stringerror());
+
+ cbmap[fd]=cb;
+}
+
+void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
+{
+ if(d_inrun && d_iter->first==fd) // trying to remove us!
+ d_iter++;
+
+ if(!cbmap.erase(fd))
+ throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast<string>(fd)+ " from multiplexer");
+
+ if(epoll_ctl(d_epollfd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw FDMultiplexerException("Removing fd from epoll set: "+stringerror());
+}
+
+int EpollFDMultiplexer::run(struct timeval* now)
+{
+ if(d_inrun) {
+ throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+ }
+
+ int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500);
+ if(now)
+ gettimeofday(now,0);
+
+ if(ret < 0 && errno!=EINTR)
+ throw FDMultiplexerException("select returned error: "+stringerror());
+
+ if(ret==0) // nothing
+ return 0;
+
+ d_inrun=true;
+
+ for(int n=0; n < ret; ++n) {
+ d_iter=d_readCallbacks.find(d_eevents[n].data.fd);
+
+ if(d_iter != d_readCallbacks.end())
+ d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+
+ d_iter=d_writeCallbacks.find(d_eevents[n].data.fd);
+
+ if(d_iter != d_writeCallbacks.end())
+ d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+ }
+
+ d_inrun=false;
+ return 0;
+}
+
+#if 0
+void acceptData(int fd, boost::any& parameter)
+{
+ cout<<"Have data on fd "<<fd<<endl;
+ Socket* sock=boost::any_cast<Socket*>(parameter);
+ string packet;
+ IPEndpoint rem;
+ sock->recvFrom(packet, rem);
+ cout<<"Received "<<packet.size()<<" bytes!\n";
+}
+
+
+int main()
+{
+ Socket s(InterNetwork, Datagram);
+
+ IPEndpoint loc("0.0.0.0", 2000);
+ s.bind(loc);
+
+ EpollFDMultiplexer sfm;
+
+ sfm.addReadFD(s.getHandle(), &acceptData, &s);
+
+ for(int n=0; n < 100 ; ++n) {
+ sfm.run();
+ }
+ sfm.removeReadFD(s.getHandle());
+ sfm.removeReadFD(s.getHandle());
+}
+#endif
+
+
#include <boost/function.hpp>
#include <boost/any.hpp>
+#include <boost/shared_array.hpp>
#include <map>
#include <stdexcept>
#include <string>
{}
};
+
+/** Very simple FD multiplexer, based on callbacks and boost::any parameters
+ As a special service, this parameter is kept around and can be modified,
+ allowing for state to be stored inside the multiplexer.
+
+ It has some "interesting" semantics
+*/
class FDMultiplexer
{
protected:
virtual int run(struct timeval* tv=0) = 0;
+ //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
virtual void addReadFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any())
{
- this->addFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd, toDo, parameter);
+ this->addFD(d_readCallbacks, fd, toDo, parameter);
}
+ //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
virtual void addWriteFD(int fd, callbackfunc_t toDo, boost::any parameter=boost::any())
{
- this->addFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd, toDo, parameter);
+ this->addFD(d_writeCallbacks, fd, toDo, parameter);
}
+ //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
virtual void removeReadFD(int fd)
{
- this->removeFD(d_inrun ? d_newReadCallbacks : d_readCallbacks, fd);
- }
- virtual void removeWriteFD(int fd)
- {
- this->removeFD(d_inrun ? d_newWriteCallbacks : d_writeCallbacks, fd);
+ this->removeFD(d_readCallbacks, fd);
}
- virtual boost::any& getReadParameter(int fd)
+ //! Remove an fd from the write watch list. You can't call this function on an fd that is closed already!
+ virtual void removeWriteFD(int fd)
{
- return d_readCallbacks[fd].d_parameter;
+ this->removeFD(d_writeCallbacks, fd);
}
protected:
typedef std::map<int, Callback> callbackmap_t;
callbackmap_t d_readCallbacks, d_writeCallbacks;
- callbackmap_t d_newReadCallbacks, d_newWriteCallbacks;
virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)=0;
virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
bool d_inrun;
+ callbackmap_t::iterator d_iter;
};
-class SelectFDMultiplexer : public FDMultiplexer
-{
-public:
- SelectFDMultiplexer()
- {}
- virtual ~SelectFDMultiplexer()
- {}
-
- virtual int run(struct timeval* tv=0);
-
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
- virtual void removeFD(callbackmap_t& cbmap, int fd);
-
-private:
-};
-
+FDMultiplexer* getMultiplexer();
#include <unistd.h>
#include "misc.hh"
#include <boost/lexical_cast.hpp>
+#include "syncres.hh"
using namespace boost;
-
using namespace std;
+class SelectFDMultiplexer : public FDMultiplexer
+{
+public:
+ SelectFDMultiplexer()
+ {}
+ virtual ~SelectFDMultiplexer()
+ {}
+
+ virtual int run(struct timeval* tv=0);
+
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter);
+ virtual void removeFD(callbackmap_t& cbmap, int fd);
+};
+
+
+FDMultiplexer* getMultiplexer()
+{
+ return new SelectFDMultiplexer();
+}
+
+
void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)
{
Callback cb;
cb.d_callback=toDo;
cb.d_parameter=parameter;
+
if(cbmap.count(fd))
throw FDMultiplexerException("Tried to add fd "+lexical_cast<string>(fd)+ " to multiplexer twice");
cbmap[fd]=cb;
void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
{
+ if(d_inrun && d_iter->first==fd) // trying to remove us!
+ d_iter++;
+
if(!cbmap.erase(fd))
throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast<string>(fd)+ " from multiplexer");
}
-
int SelectFDMultiplexer::run(struct timeval* now)
{
+ if(d_inrun) {
+ throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+ }
fd_set readfds, writefds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_SET(i->first, &readfds);
fdmax=max(i->first, fdmax);
}
+
+ for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
+ FD_SET(i->first, &writefds);
+ fdmax=max(i->first, fdmax);
+ }
+
struct timeval tv={0,500000};
int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv);
if(ret==0) // nothing
return 0;
+ d_iter=d_readCallbacks.end();
d_inrun=true;
- d_newReadCallbacks=d_readCallbacks;
- d_newWriteCallbacks=d_writeCallbacks;
+
+ for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) {
+ d_iter=i++;
- for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
- if(FD_ISSET(i->first, &readfds))
- i->second.d_callback(i->first, i->second.d_parameter);
- }
- for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
- if(FD_ISSET(i->first, &writefds))
- i->second.d_callback(i->first, i->second.d_parameter);
+ if(FD_ISSET(d_iter->first, &readfds)) {
+ d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+ }
}
- d_readCallbacks.swap(d_newReadCallbacks);
- d_writeCallbacks.swap(d_newWriteCallbacks);
+ for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) {
+ d_iter=i++;
+ if(FD_ISSET(d_iter->first, &writefds)) {
+ d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+ }
+ }
d_inrun=false;
-
return 0;
}
+#if 0
+
void acceptData(int fd, boost::any& parameter)
{
cout<<"Have data on fd "<<fd<<endl;
cout<<"Received "<<packet.size()<<" bytes!\n";
}
-#if 0
+
int main()
{
Socket s(InterNetwork, Datagram);