}
}
-// This function is only called by the distributor threads, when pdns-distributes-queries is set
-void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
- if (!isDistributorThread()) {
- g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
- exit(1);
- }
-
- unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
-
const auto& targetInfo = s_threadInfos[target];
if(!targetInfo.isWorker) {
g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
}
const auto& tps = targetInfo.pipes;
- ThreadMSG* tmsg = new ThreadMSG();
- tmsg->func = func;
- tmsg->wantAnswer = false;
ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
if (written > 0) {
}
else {
int error = errno;
- delete tmsg;
if (error == EAGAIN || error == EWOULDBLOCK) {
- g_stats.queryPipeFullDrops++;
+ return false;
} else {
+ delete tmsg;
unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
}
}
+
+ return true;
+}
+
+// This function is only called by the distributor threads, when pdns-distributes-queries is set
+void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+{
+ if (!isDistributorThread()) {
+ g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
+ unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
+ unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+
+ ThreadMSG* tmsg = new ThreadMSG();
+ tmsg->func = func;
+ tmsg->wantAnswer = false;
+
+ if (!trySendingQueryToWorker(target, tmsg)) {
+ /* if this function failed but did not raise an exception, it means that the pipe
+ was full, let's try another one */
+ unsigned int newTarget = 0;
+ do {
+ newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
+ } while (newTarget == target);
+
+ if (!trySendingQueryToWorker(newTarget, tmsg)) {
+ g_stats.queryPipeFullDrops++;
+ delete tmsg;
+ }
+ }
}
static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)