From: Remi Gacogne Date: Fri, 22 Mar 2019 14:03:14 +0000 (+0100) Subject: mplexer: Keep TTD ordered so we can scan for timeouts efficiently X-Git-Tag: dnsdist-1.4.0-alpha1~25^2~16 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=ac3da0c298c9a11d8198e8592508fa525c70271b;p=pdns mplexer: Keep TTD ordered so we can scan for timeouts efficiently --- diff --git a/pdns/devpollmplexer.cc b/pdns/devpollmplexer.cc index 8b5d531f1..35df6dc8f 100644 --- a/pdns/devpollmplexer.cc +++ b/pdns/devpollmplexer.cc @@ -160,13 +160,13 @@ int DevPollFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_readCallbacks.find(dvp.dp_fds[n].fd); if(d_iter != d_readCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); continue; // so we don't refind ourselves as writable! } d_iter=d_writeCallbacks.find(dvp.dp_fds[n].fd); if(d_iter != d_writeCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); } } delete[] dvp.dp_fds; diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc index 983c8d708..433687d21 100644 --- a/pdns/epollmplexer.cc +++ b/pdns/epollmplexer.cc @@ -156,13 +156,13 @@ int EpollFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_readCallbacks.find(d_eevents[n].data.fd); if(d_iter != d_readCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); continue; // so we don't refind ourselves as writable! } d_iter=d_writeCallbacks.find(d_eevents[n].data.fd); if(d_iter != d_writeCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); } } d_inrun=false; diff --git a/pdns/kqueuemplexer.cc b/pdns/kqueuemplexer.cc index 5338c1ec3..42e834257 100644 --- a/pdns/kqueuemplexer.cc +++ b/pdns/kqueuemplexer.cc @@ -144,14 +144,14 @@ int KqueueFDMultiplexer::run(struct timeval* now, int timeout) for(int n=0; n < ret; ++n) { d_iter=d_readCallbacks.find(d_kevents[n].ident); if(d_iter != d_readCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); continue; // so we don't find ourselves as writable again } d_iter=d_writeCallbacks.find(d_kevents[n].ident); if(d_iter != d_writeCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->f_fd, d_iter->d_parameter); } } diff --git a/pdns/mplexer.hh b/pdns/mplexer.hh index b42e90092..a008ec7cf 100644 --- a/pdns/mplexer.hh +++ b/pdns/mplexer.hh @@ -26,12 +26,18 @@ #include #include #include +#include +#include +#include +#include #include #include #include #include #include +using namespace ::boost::multi_index; + class FDMultiplexerException : public std::runtime_error { public: @@ -57,8 +63,9 @@ protected: struct Callback { callbackfunc_t d_callback; - funcparam_t d_parameter; + mutable funcparam_t d_parameter; struct timeval d_ttd; + int d_fd; }; public: @@ -109,8 +116,10 @@ public: throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer"); } + auto newEntry = *it; tv.tv_sec += timeout; - it->second.d_ttd = tv; + newEntry.d_ttd = tv; + d_readCallbacks.replace(it, newEntry); } virtual void setWriteTTD(int fd, struct timeval tv, int timeout) @@ -120,29 +129,23 @@ public: throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer"); } + auto newEntry = *it; tv.tv_sec += timeout; - it->second.d_ttd = tv; - } - - virtual funcparam_t& getReadParameter(int fd) - { - const auto& it = d_readCallbacks.find(fd); - if(it == d_readCallbacks.end()) { - throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd)); - } - - return it->second.d_parameter; + newEntry.d_ttd = tv; + d_writeCallbacks.replace(it, newEntry); } virtual std::vector > getTimeouts(const struct timeval& tv, bool writes=false) { - const auto tied = boost::tie(tv.tv_sec, tv.tv_usec); std::vector > ret; + const auto tied = boost::tie(tv.tv_sec, tv.tv_usec); + auto& idx = writes ? d_writeCallbacks.get() : d_readCallbacks.get(); - for(const auto& entry : (writes ? d_writeCallbacks : d_readCallbacks)) { - if(entry.second.d_ttd.tv_sec && tied > boost::tie(entry.second.d_ttd.tv_sec, entry.second.d_ttd.tv_usec)) { - ret.push_back(std::make_pair(entry.first, entry.second.d_parameter)); + for (auto it = idx.begin(); it != idx.end(); ++it) { + if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) { + break; } + ret.push_back(std::make_pair(it->d_fd, it->d_parameter)); } return ret; @@ -160,7 +163,42 @@ public: virtual std::string getName() const = 0; protected: - typedef std::map callbackmap_t; + struct FDBasedTag {}; + struct TTDOrderedTag {}; + struct ttd_compare + { + /* we want a 0 TTD (no timeout) to come _after_ everything else */ + bool operator() (const struct timeval& lhs, const struct timeval& rhs) const + { + /* special treatment if at least one of the TTD is 0, + normal comparison otherwise */ + if (lhs.tv_sec == 0 && rhs.tv_sec == 0) { + return false; + } + if (lhs.tv_sec == 0 && rhs.tv_sec != 0) { + return false; + } + if (lhs.tv_sec != 0 && rhs.tv_sec == 0) { + return true; + } + + return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec); + } + }; + + typedef multi_index_container< + Callback, + indexed_by < + hashed_unique, + member + >, + ordered_non_unique, + member, + ttd_compare + > + > + > callbackmap_t; + callbackmap_t d_readCallbacks, d_writeCallbacks; virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0; @@ -171,6 +209,7 @@ protected: void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) { Callback cb; + cb.d_fd = fd; cb.d_callback=toDo; cb.d_parameter=parameter; memset(&cb.d_ttd, 0, sizeof(cb.d_ttd)); @@ -178,7 +217,7 @@ protected: cb.d_ttd = *ttd; } - auto pair = cbmap.insert({fd, cb}); + auto pair = cbmap.insert(cb); if (!pair.second) { throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice"); } diff --git a/pdns/pollmplexer.cc b/pdns/pollmplexer.cc index be1d0c9f6..8e3e8b1d3 100644 --- a/pdns/pollmplexer.cc +++ b/pdns/pollmplexer.cc @@ -67,7 +67,7 @@ void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) { - if(d_inrun && d_iter->first==fd) // trying to remove us! + if(d_inrun && d_iter->d_fd==fd) // trying to remove us! ++d_iter; if(!cbmap.erase(fd)) @@ -81,13 +81,13 @@ vector PollFDMultiplexer::preparePollFD() const struct pollfd pollfd; for(const auto& cb : d_readCallbacks) { - pollfd.fd = cb.first; + pollfd.fd = cb.d_fd; pollfd.events = POLLIN; pollfds.push_back(pollfd); } for(const auto& cb : d_writeCallbacks) { - pollfd.fd = cb.first; + pollfd.fd = cb.d_fd; pollfd.events = POLLOUT; pollfds.push_back(pollfd); } @@ -132,7 +132,7 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_readCallbacks.find(pollfd.fd); if(d_iter != d_readCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); continue; // so we don't refind ourselves as writable! } } @@ -140,7 +140,7 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_writeCallbacks.find(pollfd.fd); if(d_iter != d_writeCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); } } } diff --git a/pdns/portsmplexer.cc b/pdns/portsmplexer.cc index 8aaf71bc5..39939b2f4 100644 --- a/pdns/portsmplexer.cc +++ b/pdns/portsmplexer.cc @@ -113,7 +113,7 @@ int PortsFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_readCallbacks.find(d_pevents[n].portev_object); if(d_iter != d_readCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); if(d_readCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, POLLIN, 0) < 0) throw FDMultiplexerException("Unable to add fd back to ports (read): "+stringerror()); @@ -123,7 +123,7 @@ int PortsFDMultiplexer::run(struct timeval* now, int timeout) d_iter=d_writeCallbacks.find(d_pevents[n].portev_object); if(d_iter != d_writeCallbacks.end()) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); + d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter); if(d_writeCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, POLLOUT, 0) < 0) throw FDMultiplexerException("Unable to add fd back to ports (write): "+stringerror());