]> granicus.if.org Git - pdns/commitdiff
dnsdist: Add `setTCPUseSinglePipe()` to use a single TCP waiting queue
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 23 Dec 2016 14:56:17 +0000 (15:56 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 20 Jan 2017 10:16:52 +0000 (11:16 +0100)
By default, every TCP worker thread has its own queue, and incoming TCP
connections are dispatched to TCP workers on a round-robin basis. This might
cause issues if some connections are taking a very long time, since incoming
ones will be waiting until the TCP worker they have been assigned to has finished
handling its current query, while other TCP workers might be available.
This experimental `setTCPUseSinglePipe(true)` directive can be used so that all the
incoming TCP connections are put into a single queue and handled by the
first TCP worker available.

pdns/README-dnsdist.md
pdns/dnsdist-console.cc
pdns/dnsdist-lua.cc
pdns/dnsdist-lua2.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index 7361650755a48954934108cfb582299d860c7a49..16cb6efb28d79ef904865a226763e5d8508aa779 100644 (file)
@@ -947,6 +947,14 @@ they wait to be picked up. The maximum number of queued connections
 can be configured with `setMaxTCPQueuedConnections()` and defaults to 1000.
 Any value larger than 0 will cause new connections to be dropped if there are
 already too many queued.
+By default, every TCP worker thread has its own queue, and the incoming TCP
+connections are dispatched to TCP workers on a round-robin basis. This might
+cause issues if some connections are taking a very long time, since incoming
+ones will be waiting until the TCP worker they have been assigned to has finished
+handling its current query, while other TCP workers might be available.
+The experimental `setTCPUseSinglePipe(true)` directive can be used so that all the
+incoming TCP connections are put into a single queue and handled by the
+first TCP worker available.
 
 When dispatching UDP queries to backend servers, `dnsdist` keeps track of at
 most `n` outstanding queries for each backend. This number `n` can be tuned by
@@ -1528,6 +1536,7 @@ instantiate a server with additional parameters
     * `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries
     * `setCacheCleaningPercentage(n)`: set the percentage of the cache that the cache cleaning algorithm will try to free by removing expired entries. By default (100), all expired entries are removed
     * `setStaleCacheEntriesTTL(n)`: allows using cache entries expired for at most `n` seconds when no backend available to answer for a query
+    * `setTCPUseSinglePipe(bool)`: whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false
     * `setTCPRecvTimeout(n)`: set the read timeout on TCP connections from the client, in seconds
     * `setTCPSendTimeout(n)`: set the write timeout on TCP connections from the client, in seconds
     * `setUDPTimeout(n)`: set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds. Defaults to 2
index 91fc7a9baf7f6d987d1241b80149e6944571e1d9..f3a9f60ed70ca9e8468862ef9c21e57dd2e4556c 100644 (file)
@@ -370,6 +370,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "setServerPolicy", true, "policy", "set server selection policy to that policy" },
   { "setServerPolicyLua", true, "name, function", "set server selection policy to one named 'name' and provided by 'function'" },
   { "setServFailWhenNoServer", true, "bool", "if set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query" },
+  { "setTCPUseSinglePipe", true, "bool", "whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false" },
   { "setTCPRecvTimeout", true, "n", "set the read timeout on TCP connections from the client, in seconds" },
   { "setTCPSendTimeout", true, "n", "set the write timeout on TCP connections from the client, in seconds" },
   { "setUDPTimeout", true, "n", "set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds" },
index 5215300bf7a2936d1d52279850316455f64aa080..dae759e7b2b8cff744f288534874081ba09c59dc 100644 (file)
@@ -1603,6 +1603,7 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
       boost::format fmt("%-10d %-10d %-10d %-10d\n");
       g_outputBuffer += (fmt % "Clients" % "MaxClients" % "Queued" % "MaxQueued").str();
       g_outputBuffer += (fmt % g_tcpclientthreads->getThreadsCount() % g_maxTCPClientThreads % g_tcpclientthreads->getQueuedCount() % g_maxTCPQueuedConnections).str();
+      g_outputBuffer += "Query distribution mode is: " + std::string(g_useTCPSinglePipe ? "single queue" : "per-thread queues") + "\n";
     });
 
   g_lua.writeFunction("setCacheCleaningDelay", [](uint32_t delay) { g_cacheCleaningDelay = delay; });
index 0350806ed3cc52c0d5911fdf3e7772b0eb981f7e..c71743013862896fd53392cd4301ababbd3db911 100644 (file)
@@ -1165,4 +1165,13 @@ void moreLua(bool client)
         g_hashperturb = pertub;
       });
 
+    g_lua.writeFunction("setTCPUseSinglePipe", [](bool flag) {
+        if (g_configurationDone) {
+          g_outputBuffer="setTCPUseSinglePipe() cannot be used at runtime!\n";
+          return;
+        }
+        setLuaSideEffect();
+        g_useTCPSinglePipe = flag;
+      });
+
 }
index 6410de8bb4a7c58b3f73eb98b69c3fbe7d7e3caf..39c03f0e8fc8366c574c706ec9f73ef6df23ac38 100644 (file)
@@ -90,6 +90,7 @@ size_t g_maxTCPConnectionDuration{0};
 size_t g_maxTCPConnectionsPerClient{0};
 static std::mutex tcpClientsCountMutex;
 static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
+bool g_useTCPSinglePipe{false};
 
 void* tcpClientThread(int pipefd);
 
@@ -106,19 +107,26 @@ static void decrementTCPClientCount(const ComboAddress& client)
 
 void TCPClientCollection::addTCPClientThread()
 {
+  int pipefds[2] = { -1, -1};
+
   vinfolog("Adding TCP Client thread");
 
-  int pipefds[2] = { -1, -1};
-  if (pipe(pipefds) < 0) {
-    errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
-    return;
+  if (d_useSinglePipe) {
+    pipefds[0] = d_singlePipe[0];
+    pipefds[1] = d_singlePipe[1];
   }
+  else {
+    if (pipe(pipefds) < 0) {
+      errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
+      return;
+    }
 
-  if (!setNonBlocking(pipefds[1])) {
-    close(pipefds[0]);
-    close(pipefds[1]);
-    errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
-    return;
+    if (!setNonBlocking(pipefds[1])) {
+      close(pipefds[0]);
+      close(pipefds[1]);
+      errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
+      return;
+    }
   }
 
   {
@@ -126,8 +134,10 @@ void TCPClientCollection::addTCPClientThread()
 
     if (d_numthreads >= d_tcpclientthreads.capacity()) {
       warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
-      close(pipefds[0]);
-      close(pipefds[1]);
+      if (!d_useSinglePipe) {
+        close(pipefds[0]);
+        close(pipefds[1]);
+      }
       return;
     }
 
@@ -138,8 +148,10 @@ void TCPClientCollection::addTCPClientThread()
     catch(const std::runtime_error& e) {
       /* the thread creation failed, don't leak */
       errlog("Error creating a TCP thread: %s", e.what());
-      close(pipefds[0]);
-      close(pipefds[1]);
+      if (!d_useSinglePipe) {
+        close(pipefds[0]);
+        close(pipefds[1]);
+      }
       return;
     }
 
index 5627cb6ae615ff62997bd0a6030c88e5dfa225db..68e1009bd833701469e4afcec56a884eae13f73f 100644 (file)
@@ -2070,7 +2070,7 @@ try
   /* this need to be done _after_ dropping privileges */
   g_delay = new DelayPipe<DelayedPacket>();
 
-  g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads);
+  g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
 
   for(auto& t : todo)
     t();
index 851190939c8f59a15607ce956784732652f91261..1be7cf146d65385c6a8f493b37a7ece85fdcaf8b 100644 (file)
@@ -383,12 +383,30 @@ class TCPClientCollection {
   std::atomic<uint64_t> d_queued{0};
   uint64_t d_maxthreads{0};
   std::mutex d_mutex;
+  int d_singlePipe[2];
+  bool d_useSinglePipe;
 public:
 
-  TCPClientCollection(size_t maxThreads)
+  TCPClientCollection(size_t maxThreads, bool useSinglePipe=false): d_maxthreads(maxThreads), d_useSinglePipe(useSinglePipe)
   {
     d_maxthreads = maxThreads;
     d_tcpclientthreads.reserve(maxThreads);
+
+    if (d_useSinglePipe) {
+      if (pipe(d_singlePipe) < 0) {
+        throw std::runtime_error("Error creating the TCP single communication pipe: " + string(strerror(errno)));
+      }
+      if (!setNonBlocking(d_singlePipe[1])) {
+        int err = errno;
+        close(d_singlePipe[0]);
+        close(d_singlePipe[1]);
+        throw std::runtime_error("Error setting the TCP single communication pipe non-blocking: " + string(strerror(err)));
+      }
+    }
+    else {
+      d_singlePipe[0] = -1;
+      d_singlePipe[1] = -1;
+    }
   }
   int getThread()
   {
@@ -639,6 +657,7 @@ extern bool g_apiReadWrite;
 extern std::string g_apiConfigDirectory;
 extern bool g_servFailOnNoPolicy;
 extern uint32_t g_hashperturb;
+extern bool g_useTCPSinglePipe;
 
 struct ConsoleKeyword {
   std::string name;