granicus.if.org Git - pdns/commitdiff
dnsdist: Add an option to use several source ports toward a backend
author Remi Gacogne <remi.gacogne@powerdns.com>
Thu, 1 Mar 2018 11:19:29 +0000 (11:19 +0000)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Thu, 15 Mar 2018 16:34:29 +0000 (17:34 +0100)
This is very useful if the backend distributes queries based only
on the (source IP, source port, destination IP, destination port)
tuple, which is, for example, the case for PowerDNS Recursor running
several threads with reuseport set and pdns-distribute-queries not set.

(cherry picked from commit 150105a20eaebc8e0041b1a41b81fd90d7dbaba3)
(cherry picked from commit e998def2a9007aab462c90ff5badc2eab7253ee8)
(cherry picked from commit 38069e7ea3ad11b1cc055469cf3378531f2e7239)
(cherry picked from commit cd73ceebff1b805de167ad23198def0c8a59d786)
(cherry picked from commit 5bdbb83d3b2a6aabb9a4f36f7b0a2ee0acca2f15)

19 files changed:
pdns/devpollmplexer.cc
pdns/dnsdist-console.cc
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/devpollmplexer.cc [new symlink]
pdns/dnsdistdist/docs/reference/config.rst
pdns/dnsdistdist/epollmplexer.cc [new symlink]
pdns/dnsdistdist/kqueuemplexer.cc [new symlink]
pdns/dnsdistdist/mplexer.hh [new symlink]
pdns/dnsdistdist/pollmplexer.cc [new symlink]
pdns/dnsdistdist/portsmplexer.cc [new symlink]
pdns/epollmplexer.cc
pdns/kqueuemplexer.cc
pdns/mplexer.hh
pdns/pollmplexer.cc
pdns/portsmplexer.cc
pdns/selectmplexer.cc [deleted file]

index 54543787a6dfad0c30a8602f1c8a7ab217239114..2d68dc07ee7201fe13e568d2a9da189faedd8cbb 100644 (file)
@@ -47,11 +47,12 @@ public:
     close(d_devpollfd);
   }
 
-  virtual int run(struct timeval* tv);
+  virtual int run(struct timeval* tv) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "/dev/poll";
   }
@@ -113,6 +114,26 @@ void DevPollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
   }
 }
 
+void DevPollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  struct dvpoll dvp;
+  dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size();
+  dvp.dp_fds = new pollfd[dvp.dp_nfds];
+  dvp.dp_timeout = timeout;
+  int ret=ioctl(d_devpollfd, DP_POLL, &dvp);
+
+  if(ret < 0 && errno!=EINTR) {
+    delete[] dvp.dp_fds;
+    throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
+  }
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(dvp.dp_fds[n].fd);
+  }
+
+  delete[] dvp.dp_fds;
+}
+
 int DevPollFDMultiplexer::run(struct timeval* now)
 {
   if(d_inrun) {
@@ -124,7 +145,7 @@ int DevPollFDMultiplexer::run(struct timeval* now)
   dvp.dp_timeout = 500;
   int ret=ioctl(d_devpollfd, DP_POLL, &dvp); 
   gettimeofday(now,0); // MANDATORY!
-  
+
   if(ret < 0 && errno!=EINTR) {
     delete[] dvp.dp_fds;
     throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
index f34b22488e4303e9dbab3e549493d9309c88772b..8252753ac8837cf4a070e6a215375088fc62ff0c 100644 (file)
@@ -336,7 +336,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" },
   { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" },
   { "newRuleAction", true, "DNS rule, DNS action", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" },
-  { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" },
+  { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" },
   { "newServerPolicy", true, "name, function", "create a policy object from a Lua function" },
   { "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" },
   { "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" },
index f3c28c20a634204ae49f6d91c83849768b190837..33c287c59d2890fe878646aae9be9044ba7b3d99 100644 (file)
@@ -270,6 +270,7 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
                        }
                        ComboAddress sourceAddr;
                        unsigned int sourceItf = 0;
+                        size_t numberOfSockets = 1;
                        if(auto addressStr = boost::get<string>(&pvars)) {
                          std::shared_ptr<DownstreamState> ret;
                          try {
@@ -368,6 +369,14 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
                          }
                        }
 
+                        if (vars.count("sockets")) {
+                          numberOfSockets = std::stoul(boost::get<string>(vars["sockets"]));
+                          if (numberOfSockets == 0) {
+                            warnlog("Dismissing invalid number of sockets '%s', using 1 instead", boost::get<string>(vars["sockets"]));
+                            numberOfSockets = 1;
+                          }
+                        }
+
                        std::shared_ptr<DownstreamState> ret;
                        try {
                          ComboAddress address(boost::get<string>(vars["address"]), 53);
@@ -376,7 +385,7 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
                            errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get<string>(vars["address"]));
                            return ret;
                          }
-                         ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf);
+                         ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf, numberOfSockets);
                        }
                        catch(const PDNSException& e) {
                          g_outputBuffer="Error creating new server: "+string(e.reason);
index d09c41bf62ffe24563e0582dde1d0859e1c05a97..9fba707acc88c105e23a096a72eff76c78ffac32 100644 (file)
@@ -378,6 +378,27 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in
   return true;
 }
 
+
+static int pickBackendSocketForSending(DownstreamState* state)
+{
+  return state->sockets[state->socketsOffset++ % state->sockets.size()];
+}
+
+static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
+{
+  ready.clear();
+
+  if (state->sockets.size() == 1) {
+    ready.push_back(state->sockets[0]);
+    return ;
+  }
+
+  {
+    std::lock_guard<std::mutex> lock(state->socketsLock);
+    state->mplexer->getAvailableFDs(ready, -1);
+  }
+}
+
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
 void* responderThread(std::shared_ptr<DownstreamState> state)
 try {
@@ -394,119 +415,126 @@ try {
   vector<uint8_t> rewrittenResponse;
 
   uint16_t queryId = 0;
+  std::vector<int> sockets;
+  sockets.reserve(state->sockets.size());
+
   for(;;) {
     dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
     bool outstandingDecreased = false;
     try {
-      ssize_t got = recv(state->fd, packet, sizeof(packet), 0);
-      char * response = packet;
-      size_t responseSize = sizeof(packet);
+      pickBackendSocketsReadyForReceiving(state, sockets);
+      for (const auto& fd : sockets) {
+        ssize_t got = recv(fd, packet, sizeof(packet), 0);
+        char * response = packet;
+        size_t responseSize = sizeof(packet);
 
-      if (got < (ssize_t) sizeof(dnsheader))
-        continue;
+        if (got < (ssize_t) sizeof(dnsheader))
+          continue;
 
-      uint16_t responseLen = (uint16_t) got;
-      queryId = dh->id;
+        uint16_t responseLen = (uint16_t) got;
+        queryId = dh->id;
 
-      if(queryId >= state->idStates.size())
-        continue;
+        if(queryId >= state->idStates.size())
+          continue;
 
-      IDState* ids = &state->idStates[queryId];
-      int origFD = ids->origFD;
+        IDState* ids = &state->idStates[queryId];
+        int origFD = ids->origFD;
 
-      if(origFD < 0) // duplicate
-        continue;
+        if(origFD < 0) // duplicate
+          continue;
 
-      /* setting age to 0 to prevent the maintainer thread from
-         cleaning this IDS while we process the response.
-         We have already a copy of the origFD, so it would
-         mostly mess up the outstanding counter.
-      */
-      ids->age = 0;
+        /* setting age to 0 to prevent the maintainer thread from
+           cleaning this IDS while we process the response.
+           We have already a copy of the origFD, so it would
+           mostly mess up the outstanding counter.
+        */
+        ids->age = 0;
 
-      if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
-        continue;
-      }
+        if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
+          continue;
+        }
 
-      --state->outstanding;  // you'd think an attacker could game this, but we're using connected socket
-      outstandingDecreased = true;
+        --state->outstanding;  // you'd think an attacker could game this, but we're using connected socket
+        outstandingDecreased = true;
 
-      if(dh->tc && g_truncateTC) {
-        truncateTC(response, &responseLen);
-      }
+        if(dh->tc && g_truncateTC) {
+          truncateTC(response, &responseLen);
+        }
 
-      dh->id = ids->origID;
+        dh->id = ids->origID;
 
-      uint16_t addRoom = 0;
-      DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
+        uint16_t addRoom = 0;
+        DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
 #ifdef HAVE_PROTOBUF
-      dr.uniqueId = ids->uniqueId;
+        dr.uniqueId = ids->uniqueId;
 #endif
-      if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
-        continue;
-      }
+        if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
+          continue;
+        }
 
 #ifdef HAVE_DNSCRYPT
-      if (ids->dnsCryptQuery) {
-        addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
-      }
+        if (ids->dnsCryptQuery) {
+          addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
+        }
 #endif
-      if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
-        continue;
-      }
+        if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
+          continue;
+        }
 
-      if (ids->packetCache && !ids->skipCache) {
-        ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
-      }
+        if (ids->packetCache && !ids->skipCache) {
+          ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
+        }
 
-      if (ids->cs && !ids->cs->muted) {
+        if (ids->cs && !ids->cs->muted) {
 #ifdef HAVE_DNSCRYPT
-        if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
-          continue;
-        }
+          if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
+            continue;
+          }
 #endif
 
-        ComboAddress empty;
-        empty.sin4.sin_family = 0;
-        /* if ids->destHarvested is false, origDest holds the listening address.
-           We don't want to use that as a source since it could be 0.0.0.0 for example. */
-        sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
-      }
+          ComboAddress empty;
+          empty.sin4.sin_family = 0;
+          /* if ids->destHarvested is false, origDest holds the listening address.
+             We don't want to use that as a source since it could be 0.0.0.0 for example. */
+          sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
+        }
 
-      g_stats.responses++;
+        g_stats.responses++;
 
-      double udiff = ids->sentTime.udiff();
-      vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+        double udiff = ids->sentTime.udiff();
+        vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
 
-      {
-        struct timespec ts;
-        gettime(&ts);
-        std::lock_guard<std::mutex> lock(g_rings.respMutex);
-        g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
-      }
+        {
+          struct timespec ts;
+          gettime(&ts);
+          std::lock_guard<std::mutex> lock(g_rings.respMutex);
+          g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
+        }
+
+        if(dh->rcode == RCode::ServFail)
+          g_stats.servfailResponses++;
+
+        state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
 
-      if(dh->rcode == RCode::ServFail)
-        g_stats.servfailResponses++;
-      state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
+        if(udiff < 1000) g_stats.latency0_1++;
+        else if(udiff < 10000) g_stats.latency1_10++;
+        else if(udiff < 50000) g_stats.latency10_50++;
+        else if(udiff < 100000) g_stats.latency50_100++;
+        else if(udiff < 1000000) g_stats.latency100_1000++;
+        else g_stats.latencySlow++;
 
-      if(udiff < 1000) g_stats.latency0_1++;
-      else if(udiff < 10000) g_stats.latency1_10++;
-      else if(udiff < 50000) g_stats.latency10_50++;
-      else if(udiff < 100000) g_stats.latency50_100++;
-      else if(udiff < 1000000) g_stats.latency100_1000++;
-      else g_stats.latencySlow++;
-    
-      doLatencyAverages(udiff);
+        doLatencyAverages(udiff);
 
-      if (ids->origFD == origFD) {
+        if (ids->origFD == origFD) {
 #ifdef HAVE_DNSCRYPT
-        ids->dnsCryptQuery = 0;
+          ids->dnsCryptQuery = nullptr;
 #endif
-        ids->origFD = -1;
-        outstandingDecreased = false;
-      }
+          ids->origFD = -1;
+          outstandingDecreased = false;
+        }
 
-      rewrittenResponse.clear();
+        rewrittenResponse.clear();
+      }
     }
     catch(const std::exception& e){
       vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state->remote.toStringWithPort(), queryId, e.what());
@@ -541,30 +569,61 @@ catch(...)
 void DownstreamState::reconnect()
 {
   connected = false;
-  if (fd != -1) {
-    /* shutdown() is needed to wake up recv() in the responderThread */
-    shutdown(fd, SHUT_RDWR);
-    close(fd);
-    fd = -1;
-  }
-  if (!IsAnyAddress(remote)) {
-    fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
-    if (!IsAnyAddress(sourceAddr)) {
-      SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
-      SBind(fd, sourceAddr);
-    }
-    try {
-      SConnect(fd, remote);
-      connected = true;
+  for (auto& fd : sockets) {
+    if (fd != -1) {
+      {
+        std::lock_guard<std::mutex> lock(socketsLock);
+        mplexer->removeReadFD(fd);
+      }
+      /* shutdown() is needed to wake up recv() in the responderThread */
+      shutdown(fd, SHUT_RDWR);
+      close(fd);
+      fd = -1;
+    }
+    if (!IsAnyAddress(remote)) {
+      fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+      if (!IsAnyAddress(sourceAddr)) {
+        SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+        SBind(fd, sourceAddr);
+      }
+      try {
+        SConnect(fd, remote);
+        {
+          std::lock_guard<std::mutex> lock(socketsLock);
+          mplexer->addReadFD(fd, [](int, boost::any) {});
+        }
+        connected = true;
+      }
+      catch(const std::runtime_error& error) {
+        infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+        connected = false;
+        break;
+      }
     }
-    catch(const std::runtime_error& error) {
-      infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+  }
+
+  /* if at least one (re-)connection failed, close all sockets */
+  if (!connected) {
+    for (auto& fd : sockets) {
+      if (fd != -1) {
+        /* shutdown() is needed to wake up recv() in the responderThread */
+        shutdown(fd, SHUT_RDWR);
+        close(fd);
+        fd = -1;
+      }
     }
   }
 }
 
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
+  mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+
+  sockets.resize(numberOfSockets);
+  for (auto& fd : sockets) {
+    fd = -1;
+  }
+
   if (!IsAnyAddress(remote)) {
     reconnect();
     idStates.resize(g_maxOutstanding);
@@ -1356,11 +1415,13 @@ try
 
       dh->id = idOffset;
 
+      int fd = pickBackendSocketForSending(ss);
+
       if (largerQuery.empty()) {
-        ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len);
+        ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
       }
       else {
-        ret = udpClientSendRequestToBackend(ss, ss->fd, largerQuery.c_str(), largerQuery.size());
+        ret = udpClientSendRequestToBackend(ss, fd, largerQuery.c_str(), largerQuery.size());
         largerQuery.clear();
       }
 
@@ -1567,15 +1628,23 @@ void* healthChecksThread()
           warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
 
           if (newState && !dss->connected) {
-            try {
-              SConnect(dss->fd, dss->remote);
-              dss->connected = true;
-              dss->tid = thread(responderThread, dss);
+            for (auto& fd : dss->sockets) {
+              try {
+                SConnect(fd, dss->remote);
+                {
+                  std::lock_guard<std::mutex> lock(dss->socketsLock);
+                  dss->mplexer->addReadFD(fd, [](int, boost::any) {});
+                }
+                dss->connected = true;
+              }
+              catch(const std::runtime_error& error) {
+                infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
+                newState = false;
+                dss->connected = false;
+              }
             }
-            catch(const std::runtime_error& error) {
-              infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
-              newState = false;
-              dss->connected = false;
+            if (dss->connected) {
+              dss->tid = thread(responderThread, dss);
             }
           }
 
@@ -1706,7 +1775,15 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun
 {
   /* stdin, stdout, stderr */
   size_t requiredFDsCount = 3;
-  size_t backendsCount = g_dstates.getCopy().size();
+  const auto backends = g_dstates.getCopy();
+  /* UDP sockets to backends */
+  size_t backendUDPSocketsCount = 0;
+  for (const auto& backend : backends) {
+    backendUDPSocketsCount += backend->sockets.size();
+  }
+  requiredFDsCount += backendUDPSocketsCount;
+  /* TCP sockets to backends */
+  requiredFDsCount += (backends.size() * g_maxTCPClientThreads);
   /* listening sockets */
   requiredFDsCount += udpBindsCount;
   requiredFDsCount += tcpBindsCount;
@@ -1714,10 +1791,6 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun
   requiredFDsCount += g_maxTCPClientThreads;
   /* max pipes for communicating between TCP acceptors and client threads */
   requiredFDsCount += (g_maxTCPClientThreads * 2);
-  /* UDP sockets to backends */
-  requiredFDsCount += backendsCount;
-  /* TCP sockets to backends */
-  requiredFDsCount += (backendsCount * g_maxTCPClientThreads);
   /* max TCP queued connections */
   requiredFDsCount += g_maxTCPQueuedConnections;
   /* DelayPipe pipe */
index 34f36d916046a7abed3e0830250ea2747f141b58..f54d91147e4e00e2bb3fd81cbf9de77d510f3ddd 100644 (file)
@@ -24,6 +24,7 @@
 #include "ext/luawrapper/include/LuaContext.hpp"
 #include <time.h>
 #include "misc.hh"
+#include "mplexer.hh"
 #include "iputils.hh"
 #include "dnsname.hh"
 #include <atomic>
@@ -580,15 +581,21 @@ extern std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
 
 struct DownstreamState
 {
-  DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf);
-  DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {}
+  DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets);
+  DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {}
   ~DownstreamState()
   {
-    if (fd >= 0)
-      close(fd);
+    for (auto& fd : sockets) {
+      if (fd >= 0) {
+        close(fd);
+        fd = -1;
+      }
+    }
   }
 
-  int fd{-1};
+  std::vector<int> sockets;
+  std::mutex socketsLock;
+  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
   std::thread tid;
   ComboAddress remote;
   QPSLimiter qps;
@@ -607,6 +614,7 @@ struct DownstreamState
     std::atomic<uint64_t> queries{0};
   } prev;
   string name;
+  size_t socketsOffset{0};
   double queryLoad{0.0};
   double dropRate{0.0};
   double latencyUsec{0.0};
index 30dd8884225866a9892311240279fc8dfbc8b754..574f5da7efae6a0aa90af0be2f052600a96c3b7a 100644 (file)
@@ -46,7 +46,11 @@ EXTRA_DIST=dnslabeltext.rl \
           bpf-filter.main.ebpf \
           bpf-filter.qname.ebpf \
           bpf-filter.ebpf.src \
-          DNSDIST-MIB.txt
+          DNSDIST-MIB.txt \
+          devpollmplexer.cc \
+          epollmplexer.cc \
+          kqueuemplexer.cc \
+          portsmplexer.cc
 
 bin_PROGRAMS = dnsdist
 
@@ -95,9 +99,11 @@ dnsdist_SOURCES = \
        iputils.cc iputils.hh \
        lock.hh \
        misc.cc misc.hh \
+       mplexer.hh \
        htmlfiles.h \
        namespaces.hh \
        pdnsexception.hh \
+       pollmplexer.cc \
        protobuf.cc protobuf.hh \
        qtype.cc qtype.hh \
        remote_logger.cc remote_logger.hh \
@@ -151,6 +157,20 @@ dnsdist.$(OBJEXT): dnsmessage.pb.cc
 endif
 endif
 
+if HAVE_FREEBSD
+dnsdist_SOURCES += kqueuemplexer.cc
+endif
+
+if HAVE_LINUX
+dnsdist_SOURCES += epollmplexer.cc
+endif
+
+if HAVE_SOLARIS
+dnsdist_SOURCES += \
+        devpollmplexer.cc \
+        portsmplexer.cc
+endif
+
 testrunner_SOURCES = \
        base64.hh \
        dns.hh \
diff --git a/pdns/dnsdistdist/devpollmplexer.cc b/pdns/dnsdistdist/devpollmplexer.cc
new file mode 120000 (symlink)
index 0000000..ab43785
--- /dev/null
@@ -0,0 +1 @@
+../devpollmplexer.cc
\ No newline at end of file
index 28f374d429fe24bd562d4606bd1fbae9be87a7f7..17b5f68bc5dbe918c0c4e26b0e6db4984e4f1540 100644 (file)
@@ -236,6 +236,7 @@ Servers
                              --   "address", e.g. "192.0.2.2"
                              --   "interface name", e.g. "eth0"
                              --   "address@interface", e.g. "192.0.2.2@eth0"
+      sockets=NUM            -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one
     })
 
   :param str server_string: A simple IP:PORT string.
diff --git a/pdns/dnsdistdist/epollmplexer.cc b/pdns/dnsdistdist/epollmplexer.cc
new file mode 120000 (symlink)
index 0000000..b796a57
--- /dev/null
@@ -0,0 +1 @@
+../epollmplexer.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/kqueuemplexer.cc b/pdns/dnsdistdist/kqueuemplexer.cc
new file mode 120000 (symlink)
index 0000000..0824bd9
--- /dev/null
@@ -0,0 +1 @@
+../kqueuemplexer.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/mplexer.hh b/pdns/dnsdistdist/mplexer.hh
new file mode 120000 (symlink)
index 0000000..abb3c51
--- /dev/null
@@ -0,0 +1 @@
+../mplexer.hh
\ No newline at end of file
diff --git a/pdns/dnsdistdist/pollmplexer.cc b/pdns/dnsdistdist/pollmplexer.cc
new file mode 120000 (symlink)
index 0000000..008cc91
--- /dev/null
@@ -0,0 +1 @@
+../pollmplexer.cc
\ No newline at end of file
diff --git a/pdns/dnsdistdist/portsmplexer.cc b/pdns/dnsdistdist/portsmplexer.cc
new file mode 120000 (symlink)
index 0000000..d5e7107
--- /dev/null
@@ -0,0 +1 @@
+../portsmplexer.cc
\ No newline at end of file
index 9ab4ebf48efd73b62dc41109d17a771e6392197d..0b69993e68a9d767025e695feaf3407f2bc2603d 100644 (file)
@@ -31,7 +31,6 @@
 #include <sys/epoll.h>
 #endif
 
-#include "namespaces.hh"
 #include "namespaces.hh"
 
 class EpollFDMultiplexer : public FDMultiplexer
@@ -43,11 +42,12 @@ public:
     close(d_epollfd);
   }
 
-  virtual int run(struct timeval* tv);
+  virtual int run(struct timeval* tv) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "epoll";
   }
@@ -70,8 +70,8 @@ static struct EpollRegisterOurselves
   }
 } doItEpoll;
 
-
 int EpollFDMultiplexer::s_maxevents=1024;
+
 EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents])
 {
   d_epollfd=epoll_create(s_maxevents); // not hard max
@@ -124,6 +124,18 @@ void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Removing fd from epoll set: "+stringerror());
 }
 
+void EpollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout);
+
+  if(ret < 0 && errno!=EINTR)
+    throw FDMultiplexerException("epoll returned error: "+stringerror());
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(d_eevents[n].data.fd);
+  }
+}
+
 int EpollFDMultiplexer::run(struct timeval* now)
 {
   if(d_inrun) {
@@ -132,7 +144,7 @@ int EpollFDMultiplexer::run(struct timeval* now)
   
   int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500);
   gettimeofday(now,0); // MANDATORY
-  
+
   if(ret < 0 && errno!=EINTR)
     throw FDMultiplexerException("epoll returned error: "+stringerror());
 
index e0eefc87657ba8649e939bb1bd343da1773bb452..a9b3a4d3bc10fd401c665097bdb789a7cac74f84 100644 (file)
 #include <iostream>
 #include <unistd.h>
 #include "misc.hh"
-#include "syncres.hh"
 #include <sys/types.h>
 #if defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
 #include <sys/event.h>
 #endif
 #include <sys/time.h>
 
-#include "namespaces.hh"
 #include "namespaces.hh"
 
 class KqueueFDMultiplexer : public FDMultiplexer
@@ -46,11 +44,12 @@ public:
     close(d_kqueuefd);
   }
 
-  virtual int run(struct timeval* tv);
+  virtual int run(struct timeval* tv) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "kqueue";
   }
@@ -87,7 +86,7 @@ void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD
 
   struct kevent kqevent;
   EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0);
-  
+
   if(kevent(d_kqueuefd, &kqevent, 1, 0, 0, 0) < 0) {
     cbmap.erase(fd);
     throw FDMultiplexerException("Adding fd to kqueue set: "+stringerror());
@@ -105,6 +104,22 @@ void KqueueFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Removing fd from kqueue set: "+stringerror());
 }
 
+void KqueueFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  struct timespec ts;
+  ts.tv_sec=timeout/1000;
+  ts.tv_nsec=(timeout % 1000) * 1000000;
+
+  int ret = kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
+
+  if(ret < 0 && errno != EINTR)
+    throw FDMultiplexerException("kqueue returned error: "+stringerror());
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(d_kevents[n].ident);
+  }
+}
+
 int KqueueFDMultiplexer::run(struct timeval* now)
 {
   if(d_inrun) {
@@ -117,7 +132,7 @@ int KqueueFDMultiplexer::run(struct timeval* now)
 
   int ret=kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
   gettimeofday(now,0); // MANDATORY!
-  
+
   if(ret < 0 && errno!=EINTR)
     throw FDMultiplexerException("kqueue returned error: "+stringerror());
 
index dfa7bae238881b541d4a33dbd291e785b8c17d58..7ff804ab2d6e4cdc139204082cde24ac3ed3a7f7 100644 (file)
@@ -30,7 +30,6 @@
 #include <map>
 #include <stdexcept>
 #include <string>
-#include "utility.hh"
 #include <sys/time.h>
 
 class FDMultiplexerException : public std::runtime_error
@@ -69,8 +68,13 @@ public:
   virtual ~FDMultiplexer()
   {}
 
+  static FDMultiplexer* getMultiplexerSilent();
+
   virtual int run(struct timeval* tv) = 0;
 
+  /* timeout is in ms, 0 will return immediately, -1 will block until at least one FD is ready */
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
+
   //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
   virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
   {
@@ -130,7 +134,7 @@ public:
     return theMap;
   }
   
-  virtual std::string getName() = 0;
+  virtual std::string getName() const = 0;
 
 
 protected:
@@ -161,23 +165,5 @@ protected:
   }
 };
 
-class SelectFDMultiplexer : public FDMultiplexer
-{
-public:
-  SelectFDMultiplexer()
-  {}
-  virtual ~SelectFDMultiplexer()
-  {}
-
-  virtual int run(struct timeval* tv);
-
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  std::string getName()
-  {
-    return "select";
-  }
-};
-
 #endif
 
index 08f126eb4e770e4b2a2ccd117d9490589156c534..205f166332307f7771223d8ffcc7cc6e8db97788 100644 (file)
@@ -6,11 +6,47 @@
 #include <iostream>
 #include <poll.h>
 #include "misc.hh"
-#include "syncres.hh"
-#include "utility.hh" 
-#include "namespaces.hh"
 #include "namespaces.hh"
 
+/* Walks getMultiplexerMap() in iteration order and returns the object built by
+   the first factory that does not throw. Any exception raised by a factory is
+   swallowed on purpose (hence "Silent"); nullptr is returned if none succeeds. */
+FDMultiplexer* FDMultiplexer::getMultiplexerSilent()
+{
+  for(const auto& candidate : FDMultiplexer::getMultiplexerMap()) {
+    try {
+      return candidate.second();
+    }
+    catch(...) {
+      /* includes FDMultiplexerException: fall through to the next candidate */
+    }
+  }
+  return nullptr;
+}
+
+
+/* FDMultiplexer implementation built on poll(); it reports itself as "poll". */
+class PollFDMultiplexer : public FDMultiplexer
+{
+public:
+  PollFDMultiplexer() = default;
+  ~PollFDMultiplexer() override = default;
+
+  /* FDMultiplexer interface */
+  int run(struct timeval* tv) override;
+  void getAvailableFDs(std::vector<int>& fds, int timeout) override;
+  void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  void removeFD(callbackmap_t& cbmap, int fd) override;
+
+  std::string getName() const override
+  {
+    return "poll";
+  }
+
+private:
+  /* builds the pollfd array handed to poll() from the registered callbacks */
+  std::vector<struct pollfd> preparePollFD() const;
+};
 
 static FDMultiplexer* make()
 {
@@ -44,52 +80,70 @@ void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
 }
 
-bool pollfdcomp(const struct pollfd& a, const struct pollfd& b)
+vector<struct pollfd> PollFDMultiplexer::preparePollFD() const
 {
-  return a.fd < b.fd;
-}
-
-int PollFDMultiplexer::run(struct timeval* now)
-{
-  if(d_inrun) {
-    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
-  }
-  
   vector<struct pollfd> pollfds;
-  
+  pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size());
+
   struct pollfd pollfd;
-  for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
-    pollfd.fd = i->first;
+  for(const auto& cb : d_readCallbacks) {
+    pollfd.fd = cb.first;
     pollfd.events = POLLIN;
     pollfds.push_back(pollfd);
   }
 
-  for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
-    pollfd.fd = i->first;
+  for(const auto& cb : d_writeCallbacks) {
+    pollfd.fd = cb.first;
     pollfd.events = POLLOUT;
     pollfds.push_back(pollfd);
   }
 
+  return pollfds;
+}
+
+/* Appends to fds each registered descriptor that poll() flags as readable or
+   writable. timeout is in ms: 0 returns at once, -1 blocks until one is ready. */
+void PollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  auto pollfds = preparePollFD();
+  int ret = poll(pollfds.data(), pollfds.size(), timeout);
+  if (ret < 0 && errno != EINTR)
+    throw FDMultiplexerException("poll returned error: " + stringerror());
+  if (ret <= 0) return; /* timeout or EINTR: revents may not have been filled in */
+  for(const auto& pollfd : pollfds) {
+    if (pollfd.revents & (POLLIN | POLLOUT)) /* bit mask: POLLERR/POLLHUP may be set too */
+      fds.push_back(pollfd.fd);
+  }
+}
+
+int PollFDMultiplexer::run(struct timeval* now)
+{
+  if(d_inrun) {
+    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+  }
+
+  auto pollfds = preparePollFD();
+
   int ret=poll(&pollfds[0], pollfds.size(), 500);
-  Utility::gettimeofday(now, 0); // MANDATORY!
+  gettimeofday(now, 0); // MANDATORY!
   
   if(ret < 0 && errno!=EINTR)
     throw FDMultiplexerException("poll returned error: "+stringerror());
 
   d_iter=d_readCallbacks.end();
   d_inrun=true;
-  
-  for(unsigned int n = 0; n < pollfds.size(); ++n) {  
-    if(pollfds[n].revents == POLLIN) {
-      d_iter=d_readCallbacks.find(pollfds[n].fd);
+
+  for(const auto& pollfd : pollfds) {
+    if(pollfd.revents == POLLIN) {
+      d_iter=d_readCallbacks.find(pollfd.fd);
     
       if(d_iter != d_readCallbacks.end()) {
         d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
         continue; // so we don't refind ourselves as writable!
       }
     }
-    else if(pollfds[n].revents == POLLOUT) {
-      d_iter=d_writeCallbacks.find(pollfds[n].fd);
+    else if(pollfd.revents == POLLOUT) {
+      d_iter=d_writeCallbacks.find(pollfd.fd);
     
       if(d_iter != d_writeCallbacks.end()) {
         d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
index 58d05f7eeed4059ea01331cc6d9442d77c007000..119f97def00527353d68362322a8fbd053c3b727 100644 (file)
@@ -11,9 +11,7 @@
 #include <iostream>
 
 #include "misc.hh"
-#include "syncres.hh"
 
-#include "namespaces.hh"
 #include "namespaces.hh"
 
 class PortsFDMultiplexer : public FDMultiplexer
diff --git a/pdns/selectmplexer.cc b/pdns/selectmplexer.cc
deleted file mode 100644 (file)
index 857dec6..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-#include "mplexer.hh"
-#include "sstuff.hh"
-#include <iostream>
-#include "misc.hh"
-#include "utility.hh" 
-
-
-#include "namespaces.hh"
-#include "namespaces.hh"
-
-static FDMultiplexer* make()
-{
-  return new SelectFDMultiplexer();
-}
-
-static struct RegisterOurselves
-{
-  RegisterOurselves() {
-    FDMultiplexer::getMultiplexerMap().insert(make_pair(1, &make));
-  }
-} doIt;
-
-void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
-{
-  Callback cb;
-  cb.d_callback=toDo;
-  cb.d_parameter=parameter;
-  memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-  if(cbmap.count(fd))
-    throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
-  cbmap[fd]=cb;
-}
-
-void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
-{
-  if(d_inrun && d_iter->first==fd)  // trying to remove us!
-    d_iter++;
-
-  if(!cbmap.erase(fd))
-    throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
-}
-
-int SelectFDMultiplexer::run(struct timeval* now)
-{
-  if(d_inrun) {
-    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
-  }
-  fd_set readfds, writefds;
-  FD_ZERO(&readfds);
-  FD_ZERO(&writefds);
-  
-  int fdmax=0;
-
-  for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
-    FD_SET(i->first, &readfds);
-    fdmax=max(i->first, fdmax);
-  }
-
-  for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
-    FD_SET(i->first, &writefds);
-    fdmax=max(i->first, fdmax);
-  }
-  
-  struct timeval tv={0,500000};
-  int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv);
-  Utility::gettimeofday(now, 0); // MANDATORY!
-  
-  if(ret < 0 && errno!=EINTR)
-    throw FDMultiplexerException("select returned error: "+stringerror());
-
-  if(ret < 1) // nothing - thanks AB
-    return 0;
-
-  d_iter=d_readCallbacks.end();
-  d_inrun=true;
-  
-  for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) {
-    d_iter=i++;
-
-    if(FD_ISSET(d_iter->first, &readfds)) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
-      continue;  // so we don't refind ourselves as writable
-    }
-  }
-
-  for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) {
-    d_iter=i++;
-    if(FD_ISSET(d_iter->first, &writefds)) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
-    }
-  }
-
-  d_inrun=false;
-  return 0;
-}
-
-#if 0
-
-void acceptData(int fd, boost::any& parameter)
-{
-  cout<<"Have data on fd "<<fd<<endl;
-  Socket* sock=boost::any_cast<Socket*>(parameter);
-  string packet;
-  IPEndpoint rem;
-  sock->recvFrom(packet, rem);
-  cout<<"Received "<<packet.size()<<" bytes!\n";
-}
-
-
-int main()
-{
-  Socket s(AF_INET, SOCK_DGRAM);
-  
-  IPEndpoint loc("0.0.0.0", 2000);
-  s.bind(loc);
-
-  SelectFDMultiplexer sfm;
-
-  sfm.addReadFD(s.getHandle(), &acceptData, &s);
-
-  for(int n=0; n < 100 ; ++n) {
-    sfm.run();
-  }
-  sfm.removeReadFD(s.getHandle());
-  sfm.removeReadFD(s.getHandle());
-}
-#endif
-