virtual int run(struct timeval* tv, int timeout=500) override;
virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
virtual void removeFD(callbackmap_t& cbmap, int fd) override;
string getName() const override
{
}
-void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd)
{
- accountingAddFD(cbmap, fd, toDo, parameter);
+ accountingAddFD(cbmap, fd, toDo, parameter, ttd);
struct pollfd devent;
devent.fd=fd;
virtual int run(struct timeval* tv, int timeout=500) override;
virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
virtual void removeFD(callbackmap_t& cbmap, int fd) override;
string getName() const override
{
}
-void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd)
{
- accountingAddFD(cbmap, fd, toDo, parameter);
+ accountingAddFD(cbmap, fd, toDo, parameter, ttd);
struct epoll_event eevent;
virtual int run(struct timeval* tv, int timeout=500) override;
virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr) override;
virtual void removeFD(callbackmap_t& cbmap, int fd) override;
string getName() const override
{
throw FDMultiplexerException("Setting up kqueue: "+stringerror());
}
-void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
{
- accountingAddFD(cbmap, fd, toDo, parameter);
+ accountingAddFD(cbmap, fd, toDo, parameter, ttd);
struct kevent kqevent;
EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0);
virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
//! Add an fd to the read watch list - currently an fd can only be on one list at a time!
- virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+ virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
{
- this->addFD(d_readCallbacks, fd, toDo, parameter);
+ this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
}
//! Add an fd to the write watch list - currently an fd can only be on one list at a time!
virtual void setReadTTD(int fd, struct timeval tv, int timeout)
{
- if(!d_readCallbacks.count(fd))
+ const auto& it = d_readCallbacks.find(fd);
+ if (it == d_readCallbacks.end()) {
throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+ }
+
tv.tv_sec += timeout;
- d_readCallbacks[fd].d_ttd=tv;
+ it->second.d_ttd = tv;
}
virtual funcparam_t& getReadParameter(int fd)
{
- if(!d_readCallbacks.count(fd))
+ const auto& it = d_readCallbacks.find(fd);
+ if(it == d_readCallbacks.end()) {
throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd));
- return d_readCallbacks[fd].d_parameter;
+ }
+
+ return it->second.d_parameter;
}
virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv)
{
+ const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
std::vector<std::pair<int, funcparam_t> > ret;
- for(callbackmap_t::iterator i=d_readCallbacks.begin(); i!=d_readCallbacks.end(); ++i)
- if(i->second.d_ttd.tv_sec && boost::tie(tv.tv_sec, tv.tv_usec) > boost::tie(i->second.d_ttd.tv_sec, i->second.d_ttd.tv_usec))
- ret.push_back(std::make_pair(i->first, i->second.d_parameter));
+
+ for(const auto& entry : d_readCallbacks) {
+ if(entry.second.d_ttd.tv_sec && tied > boost::tie(entry.second.d_ttd.tv_sec, entry.second.d_ttd.tv_usec)) {
+ ret.push_back(std::make_pair(entry.first, entry.second.d_parameter));
+ }
+ }
+
return ret;
}
typedef std::map<int, Callback> callbackmap_t;
callbackmap_t d_readCallbacks, d_writeCallbacks;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)=0;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
bool d_inrun;
callbackmap_t::iterator d_iter;
- void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+ void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
{
Callback cb;
cb.d_callback=toDo;
cb.d_parameter=parameter;
memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-
- if(cbmap.count(fd))
+ if (ttd) {
+ cb.d_ttd = *ttd;
+ }
+
+ auto pair = cbmap.insert({fd, cb});
+ if (!pair.second) {
throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
- cbmap[fd]=cb;
+ }
}
void accountingRemoveFD(callbackmap_t& cbmap, int fd)
{
- if(!cbmap.erase(fd))
+ if(!cbmap.erase(fd)) {
throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
+ }
}
};
else {
dc->d_tcpConnection->state=TCPConnection::BYTE0;
Utility::gettimeofday(&g_now, 0); // needs to be updated
- t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
- t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
+ struct timeval ttd = g_now;
+ ttd.tv_sec += g_tcpTimeout;
+
+ t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
}
}
}
std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
tc->state=TCPConnection::BYTE0;
- t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
+ struct timeval ttd;
+ Utility::gettimeofday(&ttd, 0);
+ ttd.tv_sec += g_tcpTimeout;
- struct timeval now;
- Utility::gettimeofday(&now, 0);
- t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
+ t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
}
}
virtual int run(struct timeval* tv, int timeout=500) override;
virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
virtual void removeFD(callbackmap_t& cbmap, int fd) override;
string getName() const override
}
} doIt;
-void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
{
- Callback cb;
- cb.d_callback=toDo;
- cb.d_parameter=parameter;
- memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
- if(cbmap.count(fd))
- throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
- cbmap[fd]=cb;
+ accountingAddFD(cbmap, fd, toDo, parameter, ttd);
}
void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
virtual int run(struct timeval* tv, int timeout=500);
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr);
virtual void removeFD(callbackmap_t& cbmap, int fd);
string getName()
{
throw FDMultiplexerException("Setting up port: "+stringerror());
}
-void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
{
- accountingAddFD(cbmap, fd, toDo, parameter);
+ accountingAddFD(cbmap, fd, toDo, parameter, ttd);
if(port_associate(d_portfd, PORT_SOURCE_FD, fd, (&cbmap == &d_readCallbacks) ? POLLIN : POLLOUT, 0) < 0) {
cbmap.erase(fd);