From: Remi Gacogne Date: Thu, 17 Jan 2019 09:41:33 +0000 (+0100) Subject: rec: Try another worker before failing if the first pipe was full X-Git-Tag: rec-4.2.0-alpha1~24^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=592d7ade3824a43b20f3b6ad39692415edcbd508;p=pdns rec: Try another worker before failing if the first pipe was full When the recursor is configured to distribute incoming queries to its worker threads itself, and in the unlikely case that the pipe to the selected thread is full, randomly select another worker to handle the query. If this one fails too, we give up. --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index ca40f1bb9..643b21883 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -2806,17 +2806,8 @@ void broadcastFunction(const pipefunc_t& func) } } -// This function is only called by the distributor threads, when pdns-distributes-queries is set -void distributeAsyncFunction(const string& packet, const pipefunc_t& func) +static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg) { - if (!isDistributorThread()) { - g_log<func = func; - tmsg->wantAnswer = false; ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); if (written > 0) { @@ -2837,13 +2825,45 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) } else { int error = errno; - delete tmsg; if (error == EAGAIN || error == EWOULDBLOCK) { - g_stats.queryPipeFullDrops++; + return false; } else { + delete tmsg; unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error)); } } + + return true; +} + +// This function is only called by the distributor threads, when pdns-distributes-queries is set +void distributeAsyncFunction(const string& packet, const pipefunc_t& func) +{ + if (!isDistributorThread()) { + g_log<func = func; + tmsg->wantAnswer = false; + + if (!trySendingQueryToWorker(target, tmsg)) { + /* if this function failed but did not raise an exception, it means that the pipe + was full, let's try another one */ + unsigned int newTarget = 0; + do { + newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads); + } while (newTarget == target); + + if (!trySendingQueryToWorker(newTarget, tmsg)) { + g_stats.queryPipeFullDrops++; + delete tmsg; + } + } } static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)