]> granicus.if.org Git - pdns/commitdiff
rec: Try another worker before failing if the first pipe was full
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 17 Jan 2019 09:41:33 +0000 (10:41 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 17 Jan 2019 09:41:33 +0000 (10:41 +0100)
When the recursor is configured to distribute incoming queries to
its worker threads itself, and in the unlikely case that the pipe
to the selected thread is full, randomly select another worker to
handle the query. If this one fails too, we give up.

pdns/pdns_recursor.cc

index ca40f1bb9248eb3ec18dd95ec1c2845327eaa684..643b218832388f3b145f6ad09f9c61692572cdaa 100644 (file)
@@ -2806,17 +2806,8 @@ void broadcastFunction(const pipefunc_t& func)
   }
 }
 
-// This function is only called by the distributor threads, when pdns-distributes-queries is set
-void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
-  if (!isDistributorThread()) {
-    g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
-    exit(1);
-  }
-
-  unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
-
   const auto& targetInfo = s_threadInfos[target];
   if(!targetInfo.isWorker) {
     g_log<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to a non-worker thread"<<endl;
@@ -2824,9 +2815,6 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
 
   const auto& tps = targetInfo.pipes;
-  ThreadMSG* tmsg = new ThreadMSG();
-  tmsg->func = func;
-  tmsg->wantAnswer = false;
 
   ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
   if (written > 0) {
@@ -2837,13 +2825,45 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
   else {
     int error = errno;
-    delete tmsg;
     if (error == EAGAIN || error == EWOULDBLOCK) {
-      g_stats.queryPipeFullDrops++;
+      return false;
     } else {
+      delete tmsg;
       unixDie("write to thread pipe returned wrong size or error:" + std::to_string(error));
     }
   }
+
+  return true;
+}
+
+// This function is only called by the distributor threads, when pdns-distributes-queries is set
+void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
+{
+  if (!isDistributorThread()) {
+    g_log<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
+  unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
+  unsigned int target = /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+
+  ThreadMSG* tmsg = new ThreadMSG();
+  tmsg->func = func;
+  tmsg->wantAnswer = false;
+
+  if (!trySendingQueryToWorker(target, tmsg)) {
+    /* if this function failed but did not raise an exception, it means that the pipe
+       was full, let's try another one */
+    unsigned int newTarget = 0;
+    do {
+      newTarget = /* skip handler */ 1 + g_numDistributorThreads + dns_random(g_numWorkerThreads);
+    } while (newTarget == target);
+
+    if (!trySendingQueryToWorker(newTarget, tmsg)) {
+      g_stats.queryPipeFullDrops++;
+      delete tmsg;
+    }
+  }
 }
 
 static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)