From: Remi Gacogne Date: Thu, 1 Mar 2018 11:19:29 +0000 (+0000) Subject: dnsdist: Add an option to use several source ports toward a backend X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=225e9c9946113646b9e1787afd23e8aa5103ac8d;p=pdns dnsdist: Add an option to use several source ports toward a backend This is very useful if the backend is distributing queries based only on (source IP, source port, destination IP, destination port), which is for example the case of PowerDNS Recursor with several threads, reuseport set and pdns-distribute-queries not set. (cherry picked from commit 150105a20eaebc8e0041b1a41b81fd90d7dbaba3) (cherry picked from commit e998def2a9007aab462c90ff5badc2eab7253ee8) (cherry picked from commit 38069e7ea3ad11b1cc055469cf3378531f2e7239) (cherry picked from commit cd73ceebff1b805de167ad23198def0c8a59d786) (cherry picked from commit 5bdbb83d3b2a6aabb9a4f36f7b0a2ee0acca2f15) --- diff --git a/pdns/devpollmplexer.cc b/pdns/devpollmplexer.cc index 54543787a..2d68dc07e 100644 --- a/pdns/devpollmplexer.cc +++ b/pdns/devpollmplexer.cc @@ -47,11 +47,12 @@ public: close(d_devpollfd); } - virtual int run(struct timeval* tv); + virtual int run(struct timeval* tv) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "/dev/poll"; } @@ -113,6 +114,26 @@ void DevPollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) } } +void DevPollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + struct dvpoll dvp; + dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size(); + dvp.dp_fds = new pollfd[dvp.dp_nfds]; + dvp.dp_timeout = timeout; + int ret=ioctl(d_devpollfd, DP_POLL, &dvp); + + if(ret < 0 && errno!=EINTR) { + delete[] dvp.dp_fds; + throw FDMultiplexerException("/dev/poll returned error: "+stringerror()); + } + + for(int n=0; n < ret; ++n) { + fds.push_back(dvp.dp_fds[n].fd); + } + + delete[] dvp.dp_fds; +} + int DevPollFDMultiplexer::run(struct timeval* now) { if(d_inrun) { @@ -124,7 +145,7 @@ int DevPollFDMultiplexer::run(struct timeval* now) dvp.dp_timeout = 500; int ret=ioctl(d_devpollfd, DP_POLL, &dvp); gettimeofday(now,0); // MANDATORY! - + if(ret < 0 && errno!=EINTR) { delete[] dvp.dp_fds; throw FDMultiplexerException("/dev/poll returned error: "+stringerror()); diff --git a/pdns/dnsdist-console.cc b/pdns/dnsdist-console.cc index f34b22488..8252753ac 100644 --- a/pdns/dnsdist-console.cc +++ b/pdns/dnsdist-console.cc @@ -336,7 +336,7 @@ const std::vector g_consoleKeywords{ { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" }, { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" }, { "newRuleAction", true, "DNS rule, DNS action", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" }, - { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" }, + { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" }, { "newServerPolicy", true, "name, function", "create a policy object from a Lua function" }, { "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" }, { "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" }, diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index f3c28c20a..33c287c59 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -270,6 +270,7 @@ vector> setupLua(bool client, const std::string& confi } ComboAddress sourceAddr; unsigned int sourceItf = 0; + size_t numberOfSockets = 1; if(auto addressStr = boost::get(&pvars)) { std::shared_ptr ret; try { @@ -368,6 +369,14 @@ vector> setupLua(bool client, const std::string& confi } } + if (vars.count("sockets")) { + numberOfSockets = std::stoul(boost::get(vars["sockets"])); + if (numberOfSockets == 0) { + warnlog("Dismissing invalid number of sockets '%s', using 1 instead", boost::get(vars["sockets"])); + numberOfSockets = 1; + } + } + std::shared_ptr ret; try { ComboAddress address(boost::get(vars["address"]), 53); @@ -376,7 +385,7 @@ vector> setupLua(bool client, const std::string& confi errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get(vars["address"])); return ret; } - ret=std::make_shared(address, sourceAddr, sourceItf); + ret=std::make_shared(address, sourceAddr, sourceItf, numberOfSockets); } catch(const PDNSException& e) { g_outputBuffer="Error creating new server: "+string(e.reason); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index d09c41bf6..9fba707ac 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -378,6 +378,27 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in return true; } + +static int pickBackendSocketForSending(DownstreamState* state) +{ + return state->sockets[state->socketsOffset++ % state->sockets.size()]; +} + +static void pickBackendSocketsReadyForReceiving(const std::shared_ptr& state, std::vector& ready) +{ + ready.clear(); + + if (state->sockets.size() == 1) { + ready.push_back(state->sockets[0]); + return ; + } + + { + std::lock_guard lock(state->socketsLock); + state->mplexer->getAvailableFDs(ready, -1); + } +} + // listens on a dedicated socket, lobs answers from downstream servers to original requestors void* responderThread(std::shared_ptr state) try { @@ -394,119 +415,126 @@ try { vector rewrittenResponse; uint16_t queryId = 0; + std::vector sockets; + sockets.reserve(state->sockets.size()); + for(;;) { dnsheader* dh = reinterpret_cast(packet); bool outstandingDecreased = false; try { - ssize_t got = recv(state->fd, packet, sizeof(packet), 0); - char * response = packet; - size_t responseSize = sizeof(packet); + pickBackendSocketsReadyForReceiving(state, sockets); + for (const auto& fd : sockets) { + ssize_t got = recv(fd, packet, sizeof(packet), 0); + char * response = packet; + size_t responseSize = sizeof(packet); - if (got < (ssize_t) sizeof(dnsheader)) - continue; + if (got < (ssize_t) sizeof(dnsheader)) + continue; - uint16_t responseLen = (uint16_t) got; - queryId = dh->id; + uint16_t responseLen = (uint16_t) got; + queryId = dh->id; - if(queryId >= state->idStates.size()) - continue; + if(queryId >= state->idStates.size()) + continue; - IDState* ids = &state->idStates[queryId]; - int origFD = ids->origFD; + IDState* ids = &state->idStates[queryId]; + int origFD = ids->origFD; - if(origFD < 0) // duplicate - continue; + if(origFD < 0) // duplicate + continue; - /* setting age to 0 to prevent the maintainer thread from - cleaning this IDS while we process the response. - We have already a copy of the origFD, so it would - mostly mess up the outstanding counter. - */ - ids->age = 0; + /* setting age to 0 to prevent the maintainer thread from + cleaning this IDS while we process the response. + We have already a copy of the origFD, so it would + mostly mess up the outstanding counter. + */ + ids->age = 0; - if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) { - continue; - } + if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) { + continue; + } - --state->outstanding; // you'd think an attacker could game this, but we're using connected socket - outstandingDecreased = true; + --state->outstanding; // you'd think an attacker could game this, but we're using connected socket + outstandingDecreased = true; - if(dh->tc && g_truncateTC) { - truncateTC(response, &responseLen); - } + if(dh->tc && g_truncateTC) { + truncateTC(response, &responseLen); + } - dh->id = ids->origID; + dh->id = ids->origID; - uint16_t addRoom = 0; - DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start); + uint16_t addRoom = 0; + DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start); #ifdef HAVE_PROTOBUF - dr.uniqueId = ids->uniqueId; + dr.uniqueId = ids->uniqueId; #endif - if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) { - continue; - } + if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) { + continue; + } #ifdef HAVE_DNSCRYPT - if (ids->dnsCryptQuery) { - addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE; - } + if (ids->dnsCryptQuery) { + addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE; + } #endif - if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) { - continue; - } + if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) { + continue; + } - if (ids->packetCache && !ids->skipCache) { - ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode); - } + if (ids->packetCache && !ids->skipCache) { + ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode); + } - if (ids->cs && !ids->cs->muted) { + if (ids->cs && !ids->cs->muted) { #ifdef HAVE_DNSCRYPT - if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) { - continue; - } + if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) { + continue; + } #endif - ComboAddress empty; - empty.sin4.sin_family = 0; - /* if ids->destHarvested is false, origDest holds the listening address. - We don't want to use that as a source since it could be 0.0.0.0 for example. */ - sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote); - } + ComboAddress empty; + empty.sin4.sin_family = 0; + /* if ids->destHarvested is false, origDest holds the listening address. + We don't want to use that as a source since it could be 0.0.0.0 for example. */ + sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote); + } - g_stats.responses++; + g_stats.responses++; - double udiff = ids->sentTime.udiff(); - vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff); + double udiff = ids->sentTime.udiff(); + vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff); - { - struct timespec ts; - gettime(&ts); - std::lock_guard lock(g_rings.respMutex); - g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote}); - } + { + struct timespec ts; + gettime(&ts); + std::lock_guard lock(g_rings.respMutex); + g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote}); + } + + if(dh->rcode == RCode::ServFail) + g_stats.servfailResponses++; + + state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0; - if(dh->rcode == RCode::ServFail) - g_stats.servfailResponses++; - state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0; + if(udiff < 1000) g_stats.latency0_1++; + else if(udiff < 10000) g_stats.latency1_10++; + else if(udiff < 50000) g_stats.latency10_50++; + else if(udiff < 100000) g_stats.latency50_100++; + else if(udiff < 1000000) g_stats.latency100_1000++; + else g_stats.latencySlow++; - if(udiff < 1000) g_stats.latency0_1++; - else if(udiff < 10000) g_stats.latency1_10++; - else if(udiff < 50000) g_stats.latency10_50++; - else if(udiff < 100000) g_stats.latency50_100++; - else if(udiff < 1000000) g_stats.latency100_1000++; - else g_stats.latencySlow++; - - doLatencyAverages(udiff); + doLatencyAverages(udiff); - if (ids->origFD == origFD) { + if (ids->origFD == origFD) { #ifdef HAVE_DNSCRYPT - ids->dnsCryptQuery = 0; + ids->dnsCryptQuery = nullptr; #endif - ids->origFD = -1; - outstandingDecreased = false; - } + ids->origFD = -1; + outstandingDecreased = false; + } - rewrittenResponse.clear(); + rewrittenResponse.clear(); + } } catch(const std::exception& e){ vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state->remote.toStringWithPort(), queryId, e.what()); @@ -541,30 +569,61 @@ catch(...) void DownstreamState::reconnect() { connected = false; - if (fd != -1) { - /* shutdown() is needed to wake up recv() in the responderThread */ - shutdown(fd, SHUT_RDWR); - close(fd); - fd = -1; - } - if (!IsAnyAddress(remote)) { - fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); - if (!IsAnyAddress(sourceAddr)) { - SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); - SBind(fd, sourceAddr); - } - try { - SConnect(fd, remote); - connected = true; + for (auto& fd : sockets) { + if (fd != -1) { + { + std::lock_guard lock(socketsLock); + mplexer->removeReadFD(fd); + } + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } + if (!IsAnyAddress(remote)) { + fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); + if (!IsAnyAddress(sourceAddr)) { + SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); + SBind(fd, sourceAddr); + } + try { + SConnect(fd, remote); + { + std::lock_guard lock(socketsLock); + mplexer->addReadFD(fd, [](int, boost::any) {}); + } + connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + connected = false; + break; + } } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + } + + /* if at least one (re-)connection failed, close all sockets */ + if (!connected) { + for (auto& fd : sockets) { + if (fd != -1) { + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } } } } -DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) +DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { + mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); + + sockets.resize(numberOfSockets); + for (auto& fd : sockets) { + fd = -1; + } + if (!IsAnyAddress(remote)) { reconnect(); idStates.resize(g_maxOutstanding); @@ -1356,11 +1415,13 @@ try dh->id = idOffset; + int fd = pickBackendSocketForSending(ss); + if (largerQuery.empty()) { - ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len); + ret = udpClientSendRequestToBackend(ss, fd, query, dq.len); } else { - ret = udpClientSendRequestToBackend(ss, ss->fd, largerQuery.c_str(), largerQuery.size()); + ret = udpClientSendRequestToBackend(ss, fd, largerQuery.c_str(), largerQuery.size()); largerQuery.clear(); } @@ -1567,15 +1628,23 @@ void* healthChecksThread() warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); if (newState && !dss->connected) { - try { - SConnect(dss->fd, dss->remote); - dss->connected = true; - dss->tid = thread(responderThread, dss); + for (auto& fd : dss->sockets) { + try { + SConnect(fd, dss->remote); + { + std::lock_guard lock(dss->socketsLock); + dss->mplexer->addReadFD(fd, [](int, boost::any) {}); + } + dss->connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); + newState = false; + dss->connected = false; + } } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); - newState = false; - dss->connected = false; + if (dss->connected) { + dss->tid = thread(responderThread, dss); } } @@ -1706,7 +1775,15 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun { /* stdin, stdout, stderr */ size_t requiredFDsCount = 3; - size_t backendsCount = g_dstates.getCopy().size(); + const auto backends = g_dstates.getCopy(); + /* UDP sockets to backends */ + size_t backendUDPSocketsCount = 0; + for (const auto& backend : backends) { + backendUDPSocketsCount += backend->sockets.size(); + } + requiredFDsCount += backendUDPSocketsCount; + /* TCP sockets to backends */ + requiredFDsCount += (backends.size() * g_maxTCPClientThreads); /* listening sockets */ requiredFDsCount += udpBindsCount; requiredFDsCount += tcpBindsCount; @@ -1714,10 +1791,6 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun requiredFDsCount += g_maxTCPClientThreads; /* max pipes for communicating between TCP acceptors and client threads */ requiredFDsCount += (g_maxTCPClientThreads * 2); - /* UDP sockets to backends */ - requiredFDsCount += backendsCount; - /* TCP sockets to backends */ - requiredFDsCount += (backendsCount * g_maxTCPClientThreads); /* max TCP queued connections */ requiredFDsCount += g_maxTCPQueuedConnections; /* DelayPipe pipe */ diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 34f36d916..f54d91147 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -24,6 +24,7 @@ #include "ext/luawrapper/include/LuaContext.hpp" #include #include "misc.hh" +#include "mplexer.hh" #include "iputils.hh" #include "dnsname.hh" #include @@ -580,15 +581,21 @@ extern std::shared_ptr g_tcpclientthreads; struct DownstreamState { - DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf); - DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {} + DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets); + DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {} ~DownstreamState() { - if (fd >= 0) - close(fd); + for (auto& fd : sockets) { + if (fd >= 0) { + close(fd); + fd = -1; + } + } } - int fd{-1}; + std::vector sockets; + std::mutex socketsLock; + std::unique_ptr mplexer{nullptr}; std::thread tid; ComboAddress remote; QPSLimiter qps; @@ -607,6 +614,7 @@ struct DownstreamState std::atomic queries{0}; } prev; string name; + size_t socketsOffset{0}; double queryLoad{0.0}; double dropRate{0.0}; double latencyUsec{0.0}; diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index 30dd88842..574f5da7e 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -46,7 +46,11 @@ EXTRA_DIST=dnslabeltext.rl \ bpf-filter.main.ebpf \ bpf-filter.qname.ebpf \ bpf-filter.ebpf.src \ - DNSDIST-MIB.txt + DNSDIST-MIB.txt \ + devpollmplexer.cc \ + epollmplexer.cc \ + kqueuemplexer.cc \ + portsmplexer.cc bin_PROGRAMS = dnsdist @@ -95,9 +99,11 @@ dnsdist_SOURCES = \ iputils.cc iputils.hh \ lock.hh \ misc.cc misc.hh \ + mplexer.hh \ htmlfiles.h \ namespaces.hh \ pdnsexception.hh \ + pollmplexer.cc \ protobuf.cc protobuf.hh \ qtype.cc qtype.hh \ remote_logger.cc remote_logger.hh \ @@ -151,6 +157,20 @@ dnsdist.$(OBJEXT): dnsmessage.pb.cc endif endif +if HAVE_FREEBSD +dnsdist_SOURCES += kqueuemplexer.cc +endif + +if HAVE_LINUX +dnsdist_SOURCES += epollmplexer.cc +endif + +if HAVE_SOLARIS +dnsdist_SOURCES += \ + devpollmplexer.cc \ + portsmplexer.cc +endif + testrunner_SOURCES = \ base64.hh \ dns.hh \ diff --git a/pdns/dnsdistdist/devpollmplexer.cc b/pdns/dnsdistdist/devpollmplexer.cc new file mode 120000 index 000000000..ab437856b --- /dev/null +++ b/pdns/dnsdistdist/devpollmplexer.cc @@ -0,0 +1 @@ +../devpollmplexer.cc \ No newline at end of file diff --git a/pdns/dnsdistdist/docs/reference/config.rst b/pdns/dnsdistdist/docs/reference/config.rst index 28f374d42..17b5f68bc 100644 --- a/pdns/dnsdistdist/docs/reference/config.rst +++ b/pdns/dnsdistdist/docs/reference/config.rst @@ -236,6 +236,7 @@ Servers -- "address", e.g. "192.0.2.2" -- "interface name", e.g. "eth0" -- "address@interface", e.g. "192.0.2.2@eth0" + sockets=NUM -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one }) :param str server_string: A simple IP:PORT string. diff --git a/pdns/dnsdistdist/epollmplexer.cc b/pdns/dnsdistdist/epollmplexer.cc new file mode 120000 index 000000000..b796a5780 --- /dev/null +++ b/pdns/dnsdistdist/epollmplexer.cc @@ -0,0 +1 @@ +../epollmplexer.cc \ No newline at end of file diff --git a/pdns/dnsdistdist/kqueuemplexer.cc b/pdns/dnsdistdist/kqueuemplexer.cc new file mode 120000 index 000000000..0824bd91d --- /dev/null +++ b/pdns/dnsdistdist/kqueuemplexer.cc @@ -0,0 +1 @@ +../kqueuemplexer.cc \ No newline at end of file diff --git a/pdns/dnsdistdist/mplexer.hh b/pdns/dnsdistdist/mplexer.hh new file mode 120000 index 000000000..abb3c5130 --- /dev/null +++ b/pdns/dnsdistdist/mplexer.hh @@ -0,0 +1 @@ +../mplexer.hh \ No newline at end of file diff --git a/pdns/dnsdistdist/pollmplexer.cc b/pdns/dnsdistdist/pollmplexer.cc new file mode 120000 index 000000000..008cc9199 --- /dev/null +++ b/pdns/dnsdistdist/pollmplexer.cc @@ -0,0 +1 @@ +../pollmplexer.cc \ No newline at end of file diff --git a/pdns/dnsdistdist/portsmplexer.cc b/pdns/dnsdistdist/portsmplexer.cc new file mode 120000 index 000000000..d5e7107b4 --- /dev/null +++ b/pdns/dnsdistdist/portsmplexer.cc @@ -0,0 +1 @@ +../portsmplexer.cc \ No newline at end of file diff --git a/pdns/epollmplexer.cc b/pdns/epollmplexer.cc index 9ab4ebf48..0b69993e6 100644 --- a/pdns/epollmplexer.cc +++ b/pdns/epollmplexer.cc @@ -31,7 +31,6 @@ #include #endif -#include "namespaces.hh" #include "namespaces.hh" class EpollFDMultiplexer : public FDMultiplexer @@ -43,11 +42,12 @@ public: close(d_epollfd); } - virtual int run(struct timeval* tv); + virtual int run(struct timeval* tv) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "epoll"; } @@ -70,8 +70,8 @@ static struct EpollRegisterOurselves } } doItEpoll; - int EpollFDMultiplexer::s_maxevents=1024; + EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents]) { d_epollfd=epoll_create(s_maxevents); // not hard max @@ -124,6 +124,18 @@ void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Removing fd from epoll set: "+stringerror()); } +void EpollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout); + + if(ret < 0 && errno!=EINTR) + throw FDMultiplexerException("epoll returned error: "+stringerror()); + + for(int n=0; n < ret; ++n) { + fds.push_back(d_eevents[n].data.fd); + } +} + int EpollFDMultiplexer::run(struct timeval* now) { if(d_inrun) { @@ -132,7 +144,7 @@ int EpollFDMultiplexer::run(struct timeval* now) int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500); gettimeofday(now,0); // MANDATORY - + if(ret < 0 && errno!=EINTR) throw FDMultiplexerException("epoll returned error: "+stringerror()); diff --git a/pdns/kqueuemplexer.cc b/pdns/kqueuemplexer.cc index e0eefc876..a9b3a4d3b 100644 --- a/pdns/kqueuemplexer.cc +++ b/pdns/kqueuemplexer.cc @@ -27,14 +27,12 @@ #include #include #include "misc.hh" -#include "syncres.hh" #include #if defined(__FreeBSD__) || defined(__FreeBSD_kernel__) #include #endif #include -#include "namespaces.hh" #include "namespaces.hh" class KqueueFDMultiplexer : public FDMultiplexer @@ -46,11 +44,12 @@ public: close(d_kqueuefd); } - virtual int run(struct timeval* tv); + virtual int run(struct timeval* tv) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - string getName() + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + string getName() const override { return "kqueue"; } @@ -87,7 +86,7 @@ void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD struct kevent kqevent; EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0); - + if(kevent(d_kqueuefd, &kqevent, 1, 0, 0, 0) < 0) { cbmap.erase(fd); throw FDMultiplexerException("Adding fd to kqueue set: "+stringerror()); @@ -105,6 +104,22 @@ void KqueueFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Removing fd from kqueue set: "+stringerror()); } +void KqueueFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + struct timespec ts; + ts.tv_sec=timeout/1000; + ts.tv_nsec=(timeout % 1000) * 1000000; + + int ret = kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts); + + if(ret < 0 && errno != EINTR) + throw FDMultiplexerException("kqueue returned error: "+stringerror()); + + for(int n=0; n < ret; ++n) { + fds.push_back(d_kevents[n].ident); + } +} + int KqueueFDMultiplexer::run(struct timeval* now) { if(d_inrun) { @@ -117,7 +132,7 @@ int KqueueFDMultiplexer::run(struct timeval* now) int ret=kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts); gettimeofday(now,0); // MANDATORY! - + if(ret < 0 && errno!=EINTR) throw FDMultiplexerException("kqueue returned error: "+stringerror()); diff --git a/pdns/mplexer.hh b/pdns/mplexer.hh index dfa7bae23..7ff804ab2 100644 --- a/pdns/mplexer.hh +++ b/pdns/mplexer.hh @@ -30,7 +30,6 @@ #include #include #include -#include "utility.hh" #include class FDMultiplexerException : public std::runtime_error @@ -69,8 +68,13 @@ public: virtual ~FDMultiplexer() {} + static FDMultiplexer* getMultiplexerSilent(); + virtual int run(struct timeval* tv) = 0; + /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */ + virtual void getAvailableFDs(std::vector& fds, int timeout) = 0; + //! Add an fd to the read watch list - currently an fd can only be on one list at a time! virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t()) { @@ -130,7 +134,7 @@ public: return theMap; } - virtual std::string getName() = 0; + virtual std::string getName() const = 0; protected: @@ -161,23 +165,5 @@ protected: } }; -class SelectFDMultiplexer : public FDMultiplexer -{ -public: - SelectFDMultiplexer() - {} - virtual ~SelectFDMultiplexer() - {} - - virtual int run(struct timeval* tv); - - virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter); - virtual void removeFD(callbackmap_t& cbmap, int fd); - std::string getName() - { - return "select"; - } -}; - #endif diff --git a/pdns/pollmplexer.cc b/pdns/pollmplexer.cc index 08f126eb4..205f16633 100644 --- a/pdns/pollmplexer.cc +++ b/pdns/pollmplexer.cc @@ -6,11 +6,47 @@ #include #include #include "misc.hh" -#include "syncres.hh" -#include "utility.hh" -#include "namespaces.hh" #include "namespaces.hh" +FDMultiplexer* FDMultiplexer::getMultiplexerSilent() +{ + FDMultiplexer* ret = nullptr; + for(const auto& i : FDMultiplexer::getMultiplexerMap()) { + try { + ret = i.second(); + return ret; + } + catch(const FDMultiplexerException& fe) { + } + catch(...) { + } + } + return ret; +} + + +class PollFDMultiplexer : public FDMultiplexer +{ +public: + PollFDMultiplexer() + {} + virtual ~PollFDMultiplexer() + { + } + + virtual int run(struct timeval* tv) override; + virtual void getAvailableFDs(std::vector& fds, int timeout) override; + + virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override; + virtual void removeFD(callbackmap_t& cbmap, int fd) override; + + string getName() const override + { + return "poll"; + } +private: + vector preparePollFD() const; +}; static FDMultiplexer* make() { @@ -44,52 +80,70 @@ void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer"); } -bool pollfdcomp(const struct pollfd& a, const struct pollfd& b) +vector PollFDMultiplexer::preparePollFD() const { - return a.fd < b.fd; -} - -int PollFDMultiplexer::run(struct timeval* now) -{ - if(d_inrun) { - throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); - } - vector pollfds; - + pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size()); + struct pollfd pollfd; - for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) { - pollfd.fd = i->first; + for(const auto& cb : d_readCallbacks) { + pollfd.fd = cb.first; pollfd.events = POLLIN; pollfds.push_back(pollfd); } - for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) { - pollfd.fd = i->first; + for(const auto& cb : d_writeCallbacks) { + pollfd.fd = cb.first; pollfd.events = POLLOUT; pollfds.push_back(pollfd); } + return pollfds; +} + +void PollFDMultiplexer::getAvailableFDs(std::vector& fds, int timeout) +{ + auto pollfds = preparePollFD(); + int ret = poll(&pollfds[0], pollfds.size(), timeout); + + if (ret < 0 && errno != EINTR) + throw FDMultiplexerException("poll returned error: " + stringerror()); + + for(const auto& pollfd : pollfds) { + if (pollfd.revents == POLLIN || pollfd.revents == POLLOUT) { + fds.push_back(pollfd.fd); + } + } +} + +int PollFDMultiplexer::run(struct timeval* now) +{ + if(d_inrun) { + throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); + } + + auto pollfds = preparePollFD(); + int ret=poll(&pollfds[0], pollfds.size(), 500); - Utility::gettimeofday(now, 0); // MANDATORY! + gettimeofday(now, 0); // MANDATORY! if(ret < 0 && errno!=EINTR) throw FDMultiplexerException("poll returned error: "+stringerror()); d_iter=d_readCallbacks.end(); d_inrun=true; - - for(unsigned int n = 0; n < pollfds.size(); ++n) { - if(pollfds[n].revents == POLLIN) { - d_iter=d_readCallbacks.find(pollfds[n].fd); + + for(const auto& pollfd : pollfds) { + if(pollfd.revents == POLLIN) { + d_iter=d_readCallbacks.find(pollfd.fd); if(d_iter != d_readCallbacks.end()) { d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); continue; // so we don't refind ourselves as writable! } } - else if(pollfds[n].revents == POLLOUT) { - d_iter=d_writeCallbacks.find(pollfds[n].fd); + else if(pollfd.revents == POLLOUT) { + d_iter=d_writeCallbacks.find(pollfd.fd); if(d_iter != d_writeCallbacks.end()) { d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); diff --git a/pdns/portsmplexer.cc b/pdns/portsmplexer.cc index 58d05f7ee..119f97def 100644 --- a/pdns/portsmplexer.cc +++ b/pdns/portsmplexer.cc @@ -11,9 +11,7 @@ #include #include "misc.hh" -#include "syncres.hh" -#include "namespaces.hh" #include "namespaces.hh" class PortsFDMultiplexer : public FDMultiplexer diff --git a/pdns/selectmplexer.cc b/pdns/selectmplexer.cc deleted file mode 100644 index 857dec657..000000000 --- a/pdns/selectmplexer.cc +++ /dev/null @@ -1,131 +0,0 @@ -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif -#include "mplexer.hh" -#include "sstuff.hh" -#include -#include "misc.hh" -#include "utility.hh" - - -#include "namespaces.hh" -#include "namespaces.hh" - -static FDMultiplexer* make() -{ - return new SelectFDMultiplexer(); -} - -static struct RegisterOurselves -{ - RegisterOurselves() { - FDMultiplexer::getMultiplexerMap().insert(make_pair(1, &make)); - } -} doIt; - -void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) -{ - Callback cb; - cb.d_callback=toDo; - cb.d_parameter=parameter; - memset(&cb.d_ttd, 0, sizeof(cb.d_ttd)); - if(cbmap.count(fd)) - throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice"); - cbmap[fd]=cb; -} - -void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd) -{ - if(d_inrun && d_iter->first==fd) // trying to remove us! - d_iter++; - - if(!cbmap.erase(fd)) - throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer"); -} - -int SelectFDMultiplexer::run(struct timeval* now) -{ - if(d_inrun) { - throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n"); - } - fd_set readfds, writefds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - - int fdmax=0; - - for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) { - FD_SET(i->first, &readfds); - fdmax=max(i->first, fdmax); - } - - for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) { - FD_SET(i->first, &writefds); - fdmax=max(i->first, fdmax); - } - - struct timeval tv={0,500000}; - int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv); - Utility::gettimeofday(now, 0); // MANDATORY! - - if(ret < 0 && errno!=EINTR) - throw FDMultiplexerException("select returned error: "+stringerror()); - - if(ret < 1) // nothing - thanks AB - return 0; - - d_iter=d_readCallbacks.end(); - d_inrun=true; - - for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) { - d_iter=i++; - - if(FD_ISSET(d_iter->first, &readfds)) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); - continue; // so we don't refind ourselves as writable - } - } - - for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) { - d_iter=i++; - if(FD_ISSET(d_iter->first, &writefds)) { - d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter); - } - } - - d_inrun=false; - return 0; -} - -#if 0 - -void acceptData(int fd, boost::any& parameter) -{ - cout<<"Have data on fd "<(parameter); - string packet; - IPEndpoint rem; - sock->recvFrom(packet, rem); - cout<<"Received "<