From: Remi Gacogne Date: Fri, 23 Dec 2016 14:56:17 +0000 (+0100) Subject: dnsdist: Add `setTCPUseSinglePipe()` to use a single TCP waiting queue X-Git-Tag: rec-4.1.0-alpha1~297^2~1 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=edbda1ad4966504a0acd9d07e41bbd363c795ee7;p=pdns dnsdist: Add `setTCPUseSinglePipe()` to use a single TCP waiting queue By default, every TCP worker thread has its own queue, and incoming TCP connections are dispatched to TCP workers on a round-robin basis. This might cause issues if some connections are taking a very long time, since incoming ones will be waiting until the TCP worker they have been assigned to has finished handling its current query, while other TCP workers might be available. This experimental `setTCPUseSinglePipe(true)` directive can be used so that all the incoming TCP connections are put into a single queue and handled by the first TCP worker available. --- diff --git a/pdns/README-dnsdist.md b/pdns/README-dnsdist.md index 736165075..16cb6efb2 100644 --- a/pdns/README-dnsdist.md +++ b/pdns/README-dnsdist.md @@ -947,6 +947,14 @@ they wait to be picked up. The maximum number of queued connections can be configured with `setMaxTCPQueuedConnections()` and defaults to 1000. Any value larger than 0 will cause new connections to be dropped if there are already too many queued. +By default, every TCP worker thread has its own queue, and the incoming TCP +connections are dispatched to TCP workers on a round-robin basis. This might +cause issues if some connections are taking a very long time, since incoming +ones will be waiting until the TCP worker they have been assigned to has finished +handling its current query, while other TCP workers might be available. +The experimental `setTCPUseSinglePipe(true)` directive can be used so that all the +incoming TCP connections are put into a single queue and handled by the +first TCP worker available. When dispatching UDP queries to backend servers, `dnsdist` keeps track of at most `n` outstanding queries for each backend. This number `n` can be tuned by @@ -1528,6 +1536,7 @@ instantiate a server with additional parameters * `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries * `setCacheCleaningPercentage(n)`: set the percentage of the cache that the cache cleaning algorithm will try to free by removing expired entries. By default (100), all expired entries are removed * `setStaleCacheEntriesTTL(n)`: allows using cache entries expired for at most `n` seconds when no backend available to answer for a query + * `setTCPUseSinglePipe(bool)`: whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false * `setTCPRecvTimeout(n)`: set the read timeout on TCP connections from the client, in seconds * `setTCPSendTimeout(n)`: set the write timeout on TCP connections from the client, in seconds * `setUDPTimeout(n)`: set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds. Defaults to 2 diff --git a/pdns/dnsdist-console.cc b/pdns/dnsdist-console.cc index 91fc7a9ba..f3a9f60ed 100644 --- a/pdns/dnsdist-console.cc +++ b/pdns/dnsdist-console.cc @@ -370,6 +370,7 @@ const std::vector g_consoleKeywords{ { "setServerPolicy", true, "policy", "set server selection policy to that policy" }, { "setServerPolicyLua", true, "name, function", "set server selection policy to one named 'name' and provided by 'function'" }, { "setServFailWhenNoServer", true, "bool", "if set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query" }, + { "setTCPUseSinglePipe", true, "bool", "whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false" }, { "setTCPRecvTimeout", true, "n", "set the read timeout on TCP connections from the client, in seconds" }, { "setTCPSendTimeout", true, "n", "set the write timeout on TCP connections from the client, in seconds" }, { "setUDPTimeout", true, "n", "set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds" }, diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 5215300bf..dae759e7b 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -1603,6 +1603,7 @@ vector> setupLua(bool client, const std::string& confi boost::format fmt("%-10d %-10d %-10d %-10d\n"); g_outputBuffer += (fmt % "Clients" % "MaxClients" % "Queued" % "MaxQueued").str(); g_outputBuffer += (fmt % g_tcpclientthreads->getThreadsCount() % g_maxTCPClientThreads % g_tcpclientthreads->getQueuedCount() % g_maxTCPQueuedConnections).str(); + g_outputBuffer += "Query distribution mode is: " + std::string(g_useTCPSinglePipe ? "single queue" : "per-thread queues") + "\n"; }); g_lua.writeFunction("setCacheCleaningDelay", [](uint32_t delay) { g_cacheCleaningDelay = delay; }); diff --git a/pdns/dnsdist-lua2.cc b/pdns/dnsdist-lua2.cc index 0350806ed..c71743013 100644 --- a/pdns/dnsdist-lua2.cc +++ b/pdns/dnsdist-lua2.cc @@ -1165,4 +1165,13 @@ void moreLua(bool client) g_hashperturb = pertub; }); + g_lua.writeFunction("setTCPUseSinglePipe", [](bool flag) { + if (g_configurationDone) { + g_outputBuffer="setTCPUseSinglePipe() cannot be used at runtime!\n"; + return; + } + setLuaSideEffect(); + g_useTCPSinglePipe = flag; + }); + } diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 6410de8bb..39c03f0e8 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -90,6 +90,7 @@ size_t g_maxTCPConnectionDuration{0}; size_t g_maxTCPConnectionsPerClient{0}; static std::mutex tcpClientsCountMutex; static std::map tcpClientsCount; +bool g_useTCPSinglePipe{false}; void* tcpClientThread(int pipefd); @@ -106,19 +107,26 @@ static void decrementTCPClientCount(const ComboAddress& client) void TCPClientCollection::addTCPClientThread() { + int pipefds[2] = { -1, -1}; + vinfolog("Adding TCP Client thread"); - int pipefds[2] = { -1, -1}; - if (pipe(pipefds) < 0) { - errlog("Error creating the TCP thread communication pipe: %s", strerror(errno)); - return; + if (d_useSinglePipe) { + pipefds[0] = d_singlePipe[0]; + pipefds[1] = d_singlePipe[1]; } + else { + if (pipe(pipefds) < 0) { + errlog("Error creating the TCP thread communication pipe: %s", strerror(errno)); + return; + } - if (!setNonBlocking(pipefds[1])) { - close(pipefds[0]); - close(pipefds[1]); - errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno)); - return; + if (!setNonBlocking(pipefds[1])) { + close(pipefds[0]); + close(pipefds[1]); + errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno)); + return; + } } { @@ -126,8 +134,10 @@ void TCPClientCollection::addTCPClientThread() if (d_numthreads >= d_tcpclientthreads.capacity()) { warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity()); - close(pipefds[0]); - close(pipefds[1]); + if (!d_useSinglePipe) { + close(pipefds[0]); + close(pipefds[1]); + } return; } @@ -138,8 +148,10 @@ void TCPClientCollection::addTCPClientThread() catch(const std::runtime_error& e) { /* the thread creation failed, don't leak */ errlog("Error creating a TCP thread: %s", e.what()); - close(pipefds[0]); - close(pipefds[1]); + if (!d_useSinglePipe) { + close(pipefds[0]); + close(pipefds[1]); + } return; } diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 5627cb6ae..68e1009bd 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -2070,7 +2070,7 @@ try /* this need to be done _after_ dropping privileges */ g_delay = new DelayPipe(); - g_tcpclientthreads = std::make_shared(g_maxTCPClientThreads); + g_tcpclientthreads = std::make_shared(g_maxTCPClientThreads, g_useTCPSinglePipe); for(auto& t : todo) t(); diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 851190939..1be7cf146 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -383,12 +383,30 @@ class TCPClientCollection { std::atomic d_queued{0}; uint64_t d_maxthreads{0}; std::mutex d_mutex; + int d_singlePipe[2]; + bool d_useSinglePipe; public: - TCPClientCollection(size_t maxThreads) + TCPClientCollection(size_t maxThreads, bool useSinglePipe=false): d_maxthreads(maxThreads), d_useSinglePipe(useSinglePipe) { d_maxthreads = maxThreads; d_tcpclientthreads.reserve(maxThreads); + + if (d_useSinglePipe) { + if (pipe(d_singlePipe) < 0) { + throw std::runtime_error("Error creating the TCP single communication pipe: " + string(strerror(errno))); + } + if (!setNonBlocking(d_singlePipe[1])) { + int err = errno; + close(d_singlePipe[0]); + close(d_singlePipe[1]); + throw std::runtime_error("Error setting the TCP single communication pipe non-blocking: " + string(strerror(err))); + } + } + else { + d_singlePipe[0] = -1; + d_singlePipe[1] = -1; + } } int getThread() { @@ -639,6 +657,7 @@ extern bool g_apiReadWrite; extern std::string g_apiConfigDirectory; extern bool g_servFailOnNoPolicy; extern uint32_t g_hashperturb; +extern bool g_useTCPSinglePipe; struct ConsoleKeyword { std::string name;