]> granicus.if.org Git - pdns/commitdiff
rec: Use a separate, non-blocking pipe to distribute queries
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 3 May 2018 11:17:50 +0000 (12:17 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 3 May 2018 11:17:50 +0000 (12:17 +0100)
This allows us to drop queries when a pipe goes full, thus still
distributing queries to other threads instead of blocking. It also
adds a new metric to keep track of queries dropped because the pipe
was full.

pdns/pdns_recursor.cc
pdns/rec-snmp.cc
pdns/rec_channel_rec.cc
pdns/recursordist/RECURSOR-MIB.txt
pdns/recursordist/docs/metrics.rst
pdns/syncres.hh

index 78c85174ec8335c9dae7809db1c9e37e32f40888..996c4c059355dde44170b067300de9c2d0390969 100644 (file)
@@ -128,6 +128,8 @@ struct ThreadPipeSet
   int readToThread;
   int writeFromThread;
   int readFromThread;
+  int writeQueriesToThread; // this one is non-blocking
+  int readQueriesToThread;
 };
 
 typedef vector<int> tcpListenSockets_t;
@@ -2409,6 +2411,12 @@ static void makeThreadPipes()
     tps.readFromThread = fd[0];
     tps.writeFromThread = fd[1];
 
+    if(pipe(fd) < 0)
+      unixDie("Creating pipe for inter-thread communications");
+    tps.readQueriesToThread = fd[0];
+    tps.writeQueriesToThread = fd[1];
+    setNonBlocking(tps.writeQueriesToThread);
+
     g_pipes.push_back(tps);
   }
 }
@@ -2463,9 +2471,21 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   tmsg->func = func;
   tmsg->wantAnswer = false;
 
-  if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
+  ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
+  if (written > 0) {
+    if (static_cast<size_t>(written) != sizeof(tmsg)) {
+      delete tmsg;
+      unixDie("write to thread pipe returned wrong size or error");
+    }
+  }
+  else {
+    int error = errno;
     delete tmsg;
-    unixDie("write to thread pipe returned wrong size or error");
+    if (error == EAGAIN || error == EWOULDBLOCK) {
+      g_stats.queryPipeFullDrops++;
+    } else {
+      unixDie("write to thread pipe returned wrong size or error:" + error);
+    }
   }
 }
 
@@ -2473,7 +2493,7 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
 {
   ThreadMSG* tmsg = nullptr;
 
-  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
+  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
     unixDie("read from thread pipe returned wrong size or error");
   }
 
@@ -3448,6 +3468,7 @@ try
   }
 
   t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+  t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
 
   if(g_useOneSocketPerThread) {
     for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
index 537e1852b316baf8a9a66cb9ebfa9eb44f5af21d..f240550f0d1e12bf435a49cea23cdb7fac9d11ea 100644 (file)
@@ -107,6 +107,7 @@ static const oid policyResultNxdomainOID[] = { RECURSOR_STATS_OID, 88 };
 static const oid policyResultNodataOID[] = { RECURSOR_STATS_OID, 89 };
 static const oid policyResultTruncateOID[] = { RECURSOR_STATS_OID, 90 };
 static const oid policyResultCustomOID[] = { RECURSOR_STATS_OID, 91 };
+static const oid queryPipeFullDropsOID[] = { RECURSOR_STATS_OID, 92 };
 
 static std::unordered_map<oid, std::string> s_statsMap;
 
@@ -218,6 +219,7 @@ RecursorSNMPAgent::RecursorSNMPAgent(const std::string& name, const std::string&
   registerCounter64Stat("client-parse-errors", clientParseErrorsOID, OID_LENGTH(clientParseErrorsOID));
   registerCounter64Stat("server-parse-errors", serverParseErrorsOID, OID_LENGTH(serverParseErrorsOID));
   registerCounter64Stat("too-old-drops", tooOldDropsOID, OID_LENGTH(tooOldDropsOID));
+  registerCounter64Stat("query-pipe-full-drops", queryPipeFullDropsOID, OID_LENGTH(queryPipeFullDropsOID));
   registerCounter64Stat("answers0-1", answers01OID, OID_LENGTH(answers01OID));
   registerCounter64Stat("answers1-10", answers110OID, OID_LENGTH(answers110OID));
   registerCounter64Stat("answers10-100", answers10100OID, OID_LENGTH(answers10100OID));
index 40e287e68af525e762adc74a538f574ac77feaea..3215c497bb16d424ffa35b5c81d4c4cf172b742e 100644 (file)
@@ -870,6 +870,7 @@ void registerAllStats()
   addGetStat("client-parse-errors", &g_stats.clientParseError);
   addGetStat("server-parse-errors", &g_stats.serverParseError);
   addGetStat("too-old-drops", &g_stats.tooOldDrops);
+  addGetStat("query-pipe-full-drops", &g_stats.queryPipeFullDrops);
 
   addGetStat("answers0-1", &g_stats.answers0_1);
   addGetStat("answers1-10", &g_stats.answers1_10);
index ccbacf889d077939dbe759bb12672182de79d1c2..d190126deaf37e3c4c403c4069d4834716cbcb73 100644 (file)
@@ -758,6 +758,14 @@ policyResultCustom OBJECT-TYPE
         "Number of policy-mandated custom results"
     ::= { stats 91 }
 
+queryPipeFullDrops OBJECT-TYPE
+    SYNTAX Counter64
+    MAX-ACCESS read-only
+    STATUS current
+    DESCRIPTION
+        "Number of responses dropped because the query distribution pipe was full"
+    ::= { stats 92 }
+
 ---
 --- Traps / Notifications
 ---
@@ -891,6 +899,7 @@ recGroup OBJECT-GROUP
         policyResultNodata,
         policyResultTruncate,
         policyResultCustom,
+        queryPipeFullDrops,
         trapReason
     }
     STATUS current
index 71b90c4b2388ec5dd4a7c65f226d6ac9415e8998..f10cec03888669aa31574e83372140ceb04e6a80 100644 (file)
@@ -360,6 +360,12 @@ qa-latency
 ^^^^^^^^^^
 shows the current latency average, in microseconds,   exponentially weighted over past 'latency-statistic-size' packets
 
+query-pipe-full-drops
+^^^^^^^^^^^^^^^^^^^^^
+.. versionadded:: 4.2
+
+questions dropped because the query distribution pipe was full
+
 questions
 ^^^^^^^^^
 counts all end-user initiated queries with the RD bit   set
index b10b440761a73177ba733b89821f9f0f116c0a7c..3abc485b82b1ebd7e7e4a9398a884283b9f6f9ed 100644 (file)
@@ -905,6 +905,7 @@ struct RecursorStats
   std::atomic<uint64_t> clientParseError;
   std::atomic<uint64_t> serverParseError;
   std::atomic<uint64_t> tooOldDrops;
+  std::atomic<uint64_t> queryPipeFullDrops;
   std::atomic<uint64_t> unexpectedCount;
   std::atomic<uint64_t> caseMismatchCount;
   std::atomic<uint64_t> spoofCount;