From 27ae2e3c3586b4c29ca135635a54e82a074fccdc Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Mon, 4 Mar 2019 11:32:23 +0100 Subject: [PATCH] mplexer: Make it possible to set the read TTD right away --- pdns/devpollmplexer.cc | 6 +++--- pdns/epollmplexer.cc | 6 +++--- pdns/kqueuemplexer.cc | 6 +++--- pdns/mplexer.hh | 46 ++++++++++++++++++++++++++++-------------- pdns/pdns_recursor.cc | 14 +++++++------ pdns/pollmplexer.cc | 12 +++-------- pdns/portsmplexer.cc | 6 +++--- 7 files changed, 54 insertions(+), 42 deletions(-) diff --git a/pdns/devpollmplexer.cc b/pdns/devpollmplexer.cc index f9234965a..8b5d531f1 100644 --- a/pdns/devpollmplexer.cc +++ b/pdns/devpollmplexer.cc @@ -49,7 +49,7 @@ public: virtual int run(struct timeval* tv, int timeout=500) override; virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override; virtual void removeFD(callbackmap_t& cbmap, int fd) override; string getName() const override { @@ -82,9 +82,9 @@ DevPollFDMultiplexer::DevPollFDMultiplexer() } -void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) +void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd) { - accountingAddFD(cbmap, fd, toDo, parameter); + accountingAddFD(cbmap, fd, toDo, parameter, ttd); struct pollfd devent; devent.fd=fd; diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc index 97d7e82ff..983c8d708 100644 --- a/pdns/epollmplexer.cc +++ b/pdns/epollmplexer.cc @@ -45,7 +45,7 @@ public: virtual int run(struct timeval* tv, int timeout=500) override; virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override; virtual void removeFD(callbackmap_t& cbmap, int fd) override; string getName() const override { @@ -94,9 +94,9 @@ EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents } -void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) +void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd) { - accountingAddFD(cbmap, fd, toDo, parameter); + accountingAddFD(cbmap, fd, toDo, parameter, ttd); struct epoll_event eevent; diff --git a/pdns/kqueuemplexer.cc b/pdns/kqueuemplexer.cc index 44d3f4673..5338c1ec3 100644 --- a/pdns/kqueuemplexer.cc +++ b/pdns/kqueuemplexer.cc @@ -47,7 +47,7 @@ public: virtual int run(struct timeval* tv, int timeout=500) override; virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override; + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr) override; virtual void removeFD(callbackmap_t& cbmap, int fd) override; string getName() const override { @@ -80,9 +80,9 @@ KqueueFDMultiplexer::KqueueFDMultiplexer() : d_kevents(new struct kevent[s_maxev throw FDMultiplexerException("Setting up kqueue: "+stringerror()); } -void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) +void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd) { - accountingAddFD(cbmap, fd, toDo, parameter); + accountingAddFD(cbmap, fd, toDo, parameter, ttd); struct kevent kqevent; EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0); diff --git a/pdns/mplexer.hh b/pdns/mplexer.hh index d70143d46..c28a1f218 100644 --- a/pdns/mplexer.hh +++ b/pdns/mplexer.hh @@ -77,9 +77,9 @@ public: virtual void getAvailableFDs(std::vector& fds, int timeout) = 0; //! Add an fd to the read watch list - currently an fd can only be on one list at a time! - virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t()) + virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr) { - this->addFD(d_readCallbacks, fd, toDo, parameter); + this->addFD(d_readCallbacks, fd, toDo, parameter, ttd); } //! Add an fd to the write watch list - currently an fd can only be on one list at a time! @@ -104,25 +104,36 @@ public: virtual void setReadTTD(int fd, struct timeval tv, int timeout) { - if(!d_readCallbacks.count(fd)) + const auto& it = d_readCallbacks.find(fd); + if (it == d_readCallbacks.end()) { throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer"); + } + tv.tv_sec += timeout; - d_readCallbacks[fd].d_ttd=tv; + it->second.d_ttd = tv; } virtual funcparam_t& getReadParameter(int fd) { - if(!d_readCallbacks.count(fd)) + const auto& it = d_readCallbacks.find(fd); + if(it == d_readCallbacks.end()) { throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd)); - return d_readCallbacks[fd].d_parameter; + } + + return it->second.d_parameter; } virtual std::vector > getTimeouts(const struct timeval& tv) { + const auto tied = boost::tie(tv.tv_sec, tv.tv_usec); std::vector > ret; - for(callbackmap_t::iterator i=d_readCallbacks.begin(); i!=d_readCallbacks.end(); ++i) - if(i->second.d_ttd.tv_sec && boost::tie(tv.tv_sec, tv.tv_usec) > boost::tie(i->second.d_ttd.tv_sec, i->second.d_ttd.tv_usec)) - ret.push_back(std::make_pair(i->first, i->second.d_parameter)); + + for(const auto& entry : d_readCallbacks) { + if(entry.second.d_ttd.tv_sec && tied > boost::tie(entry.second.d_ttd.tv_sec, entry.second.d_ttd.tv_usec)) { + ret.push_back(std::make_pair(entry.first, entry.second.d_parameter)); + } + } + return ret; } @@ -141,27 +152,32 @@ protected: typedef std::map callbackmap_t; callbackmap_t d_readCallbacks, d_writeCallbacks; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)=0; + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0; virtual void removeFD(callbackmap_t& cbmap, int fd)=0; bool d_inrun; callbackmap_t::iterator d_iter; - void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) + void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) { Callback cb; cb.d_callback=toDo; cb.d_parameter=parameter; memset(&cb.d_ttd, 0, sizeof(cb.d_ttd)); - - if(cbmap.count(fd)) + if (ttd) { + cb.d_ttd = *ttd; + } + + auto pair = cbmap.insert({fd, cb}); + if (!pair.second) { throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice"); - cbmap[fd]=cb; + } } void accountingRemoveFD(callbackmap_t& cbmap, int fd) { - if(!cbmap.erase(fd)) + if(!cbmap.erase(fd)) { throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer"); + } } }; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index ae37f9da8..60c58a8f3 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -1629,8 +1629,10 @@ static void startDoResolve(void *p) else { dc->d_tcpConnection->state=TCPConnection::BYTE0; Utility::gettimeofday(&g_now, 0); // needs to be updated - t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection); - t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout); + struct timeval ttd = g_now; + ttd.tv_sec += g_tcpTimeout; + + t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); } } } @@ -2050,11 +2052,11 @@ static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& ) std::shared_ptr tc = std::make_shared(newsock, addr); tc->state=TCPConnection::BYTE0; - t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc); + struct timeval ttd; + Utility::gettimeofday(&ttd, 0); + ttd.tv_sec += g_tcpTimeout; - struct timeval now; - Utility::gettimeofday(&now, 0); - t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout); + t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd); } } diff --git a/pdns/pollmplexer.cc b/pdns/pollmplexer.cc index 312c4bfec..be1d0c9f6 100644 --- a/pdns/pollmplexer.cc +++ b/pdns/pollmplexer.cc @@ -37,7 +37,7 @@ public: virtual int run(struct timeval* tv, int timeout=500) override; virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override; virtual void removeFD(callbackmap_t& cbmap, int fd) override; string getName() const override @@ -60,15 +60,9 @@ static struct RegisterOurselves } } doIt; -void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) +void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd) { - Callback cb; - cb.d_callback=toDo; - cb.d_parameter=parameter; - memset(&cb.d_ttd, 0, sizeof(cb.d_ttd)); - if(cbmap.count(fd)) - throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice"); - cbmap[fd]=cb; + accountingAddFD(cbmap, fd, toDo, parameter, ttd); } void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) diff --git a/pdns/portsmplexer.cc b/pdns/portsmplexer.cc index c6e91e60f..8aaf71bc5 100644 --- a/pdns/portsmplexer.cc +++ b/pdns/portsmplexer.cc @@ -25,7 +25,7 @@ public: virtual int run(struct timeval* tv, int timeout=500); - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter); + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr); virtual void removeFD(callbackmap_t& cbmap, int fd); string getName() { @@ -59,9 +59,9 @@ PortsFDMultiplexer::PortsFDMultiplexer() : d_pevents(new port_event_t[s_maxevent throw FDMultiplexerException("Setting up port: "+stringerror()); } -void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) +void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd) { - accountingAddFD(cbmap, fd, toDo, parameter); + accountingAddFD(cbmap, fd, toDo, parameter, ttd); if(port_associate(d_portfd, PORT_SOURCE_FD, fd, (&cbmap == &d_readCallbacks) ? POLLIN : POLLOUT, 0) < 0) { cbmap.erase(fd); -- 2.40.0