can be configured with `setMaxTCPQueuedConnections()` and defaults to 1000.
Any value larger than 0 will cause new connections to be dropped if that
many connections are already queued.
+By default, every TCP worker thread has its own queue, and incoming TCP
+connections are dispatched to the workers on a round-robin basis. This can
+be an issue when some connections take a very long time to handle: a new
+connection stays queued behind the worker it was assigned to, even though
+other workers might be idle.
+The experimental `setTCPUseSinglePipe(true)` directive can be used to put
+all incoming TCP connections into a single queue, handled by the first
+TCP worker available.
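+
+The idea behind the single queue can be pictured with a short standalone
+sketch (illustrative only, not dnsdist code; the worker count and connection
+numbers are made up): every worker thread blocks on `read()` on the read end
+of the same pipe, so the kernel hands each queued item to whichever worker
+becomes available first.
+
+```
+// Toy example: N workers consuming connection ids from one shared pipe.
+#include <unistd.h>
+#include <cstdio>
+#include <thread>
+#include <vector>
+
+static void worker(int id, int readfd)
+{
+  int item = 0;
+  /* a blocking read() on the shared descriptor wakes exactly one worker */
+  while (read(readfd, &item, sizeof(item)) == sizeof(item)) {
+    printf("worker %d handling connection %d\n", id, item);
+  }
+}
+
+int main()
+{
+  int fds[2];
+  if (pipe(fds) < 0) {
+    return 1;
+  }
+  std::vector<std::thread> workers;
+  for (int id = 0; id < 4; ++id) {
+    workers.emplace_back(worker, id, fds[0]);
+  }
+  /* the acceptor only writes to the single queue, it never picks a worker */
+  for (int conn = 0; conn < 16; ++conn) {
+    if (write(fds[1], &conn, sizeof(conn)) != sizeof(conn)) {
+      break;
+    }
+  }
+  close(fds[1]); /* read() then returns 0 and the workers exit */
+  for (auto& t : workers) {
+    t.join();
+  }
+  close(fds[0]);
+  return 0;
+}
+```
+
+Compile with `g++ -pthread`. With the default per-thread queues, each worker
+would instead have its own pipe and the acceptor would pick one in round-robin
+order, which is where the head-of-line blocking described above comes from.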
When dispatching UDP queries to backend servers, `dnsdist` keeps track of at
most `n` outstanding queries for each backend. This number `n` can be tuned by
* `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries
* `setCacheCleaningPercentage(n)`: set the percentage of the cache that the cache cleaning algorithm will try to free by removing expired entries. By default (100), all expired entries are removed
* `setStaleCacheEntriesTTL(n)`: allows using cache entries that have been expired for at most `n` seconds when no backend is available to answer a query
+ * `setTCPUseSinglePipe(bool)`: whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false
* `setTCPRecvTimeout(n)`: set the read timeout on TCP connections from the client, in seconds
* `setTCPSendTimeout(n)`: set the write timeout on TCP connections from the client, in seconds
* `setUDPTimeout(n)`: set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds. Defaults to 2
{ "setServerPolicy", true, "policy", "set server selection policy to that policy" },
{ "setServerPolicyLua", true, "name, function", "set server selection policy to one named 'name' and provided by 'function'" },
{ "setServFailWhenNoServer", true, "bool", "if set, return a ServFail when no servers are available, instead of the default behaviour of dropping the query" },
+ { "setTCPUseSinglePipe", true, "bool", "whether the incoming TCP connections should be put into a single queue instead of using per-thread queues. Defaults to false" },
{ "setTCPRecvTimeout", true, "n", "set the read timeout on TCP connections from the client, in seconds" },
{ "setTCPSendTimeout", true, "n", "set the write timeout on TCP connections from the client, in seconds" },
{ "setUDPTimeout", true, "n", "set the maximum time dnsdist will wait for a response from a backend over UDP, in seconds" },
boost::format fmt("%-10d %-10d %-10d %-10d\n");
g_outputBuffer += (fmt % "Clients" % "MaxClients" % "Queued" % "MaxQueued").str();
g_outputBuffer += (fmt % g_tcpclientthreads->getThreadsCount() % g_maxTCPClientThreads % g_tcpclientthreads->getQueuedCount() % g_maxTCPQueuedConnections).str();
+ g_outputBuffer += "Query distribution mode is: " + std::string(g_useTCPSinglePipe ? "single queue" : "per-thread queues") + "\n";
});
g_lua.writeFunction("setCacheCleaningDelay", [](uint32_t delay) { g_cacheCleaningDelay = delay; });
g_hashperturb = pertub;
});
+ g_lua.writeFunction("setTCPUseSinglePipe", [](bool flag) {
+ if (g_configurationDone) {
+ g_outputBuffer="setTCPUseSinglePipe() cannot be used at runtime!\n";
+ return;
+ }
+ setLuaSideEffect();
+ g_useTCPSinglePipe = flag;
+ });
+
}
size_t g_maxTCPConnectionsPerClient{0};
static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
+bool g_useTCPSinglePipe{false};
void* tcpClientThread(int pipefd);
void TCPClientCollection::addTCPClientThread()
{
+ int pipefds[2] = { -1, -1};
+
vinfolog("Adding TCP Client thread");
- int pipefds[2] = { -1, -1};
- if (pipe(pipefds) < 0) {
- errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
- return;
+ if (d_useSinglePipe) {
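+    /* reuse the single pipe created in the TCPClientCollection constructor and shared by every worker */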
+ pipefds[0] = d_singlePipe[0];
+ pipefds[1] = d_singlePipe[1];
}
+ else {
+ if (pipe(pipefds) < 0) {
+ errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
+ return;
+ }
- if (!setNonBlocking(pipefds[1])) {
- close(pipefds[0]);
- close(pipefds[1]);
- errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
- return;
+ if (!setNonBlocking(pipefds[1])) {
+ close(pipefds[0]);
+ close(pipefds[1]);
+ errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
+ return;
+ }
}
{
if (d_numthreads >= d_tcpclientthreads.capacity()) {
warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
- close(pipefds[0]);
- close(pipefds[1]);
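+      /* the shared pipe is still used by the other workers, so it must stay open */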
+ if (!d_useSinglePipe) {
+ close(pipefds[0]);
+ close(pipefds[1]);
+ }
return;
}
catch(const std::runtime_error& e) {
/* the thread creation failed, don't leak */
errlog("Error creating a TCP thread: %s", e.what());
- close(pipefds[0]);
- close(pipefds[1]);
+ if (!d_useSinglePipe) {
+ close(pipefds[0]);
+ close(pipefds[1]);
+ }
return;
}
/* this needs to be done _after_ dropping privileges */
g_delay = new DelayPipe<DelayedPacket>();
- g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads);
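+  /* g_useTCPSinglePipe can only be set at configuration time, so its value is final here */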
+ g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads, g_useTCPSinglePipe);
for(auto& t : todo)
t();
std::atomic<uint64_t> d_queued{0};
uint64_t d_maxthreads{0};
std::mutex d_mutex;
+ int d_singlePipe[2];
+ bool d_useSinglePipe;
public:
- TCPClientCollection(size_t maxThreads)
+ TCPClientCollection(size_t maxThreads, bool useSinglePipe=false): d_maxthreads(maxThreads), d_useSinglePipe(useSinglePipe)
{
-    d_maxthreads = maxThreads;
d_tcpclientthreads.reserve(maxThreads);
+
+ if (d_useSinglePipe) {
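+      /* create the one pipe that every TCP worker thread will read from */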
+ if (pipe(d_singlePipe) < 0) {
+ throw std::runtime_error("Error creating the TCP single communication pipe: " + string(strerror(errno)));
+ }
+ if (!setNonBlocking(d_singlePipe[1])) {
+ int err = errno;
+ close(d_singlePipe[0]);
+ close(d_singlePipe[1]);
+ throw std::runtime_error("Error setting the TCP single communication pipe non-blocking: " + string(strerror(err)));
+ }
+ }
+ else {
+ d_singlePipe[0] = -1;
+ d_singlePipe[1] = -1;
+ }
}
int getThread()
{
extern std::string g_apiConfigDirectory;
extern bool g_servFailOnNoPolicy;
extern uint32_t g_hashperturb;
+extern bool g_useTCPSinglePipe;
struct ConsoleKeyword {
std::string name;