From: Remi Gacogne Date: Thu, 3 May 2018 11:17:50 +0000 (+0100) Subject: rec: Use a separate, non-blocking pipe to distribute queries X-Git-Tag: dnsdist-1.3.1~93^2~2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=cf8cda180674c58d7a56b528d379aceaf9641746;p=pdns rec: Use a separate, non-blocking pipe to distribute queries This allows us to drop queries when a pipe goes full, thus still distributing queries to other threads instead of blocking. It also adds a new metric to keep track of queries dropped because the pipe was full. --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 78c85174e..996c4c059 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -128,6 +128,8 @@ struct ThreadPipeSet int readToThread; int writeFromThread; int readFromThread; + int writeQueriesToThread; // this one is non-blocking + int readQueriesToThread; }; typedef vector tcpListenSockets_t; @@ -2409,6 +2411,12 @@ static void makeThreadPipes() tps.readFromThread = fd[0]; tps.writeFromThread = fd[1]; + if(pipe(fd) < 0) + unixDie("Creating pipe for inter-thread communications"); + tps.readQueriesToThread = fd[0]; + tps.writeQueriesToThread = fd[1]; + setNonBlocking(tps.writeQueriesToThread); + g_pipes.push_back(tps); } } @@ -2463,9 +2471,21 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func) tmsg->func = func; tmsg->wantAnswer = false; - if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { + ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg)); + if (written > 0) { + if (static_cast(written) != sizeof(tmsg)) { + delete tmsg; + unixDie("write to thread pipe returned wrong size or error"); + } + } + else { + int error = errno; delete tmsg; - unixDie("write to thread pipe returned wrong size or error"); + if (error == EAGAIN || error == EWOULDBLOCK) { + g_stats.queryPipeFullDrops++; + } else { + unixDie("write to thread pipe returned wrong size or error:" + error); + } } } @@ -2473,7 +2493,7 @@ static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) { ThreadMSG* tmsg = nullptr; - if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread + if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread unixDie("read from thread pipe returned wrong size or error"); } @@ -3448,6 +3468,7 @@ try } t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); + t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest); if(g_useOneSocketPerThread) { for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) { diff --git a/pdns/rec-snmp.cc b/pdns/rec-snmp.cc index 537e1852b..f240550f0 100644 --- a/pdns/rec-snmp.cc +++ b/pdns/rec-snmp.cc @@ -107,6 +107,7 @@ static const oid policyResultNxdomainOID[] = { RECURSOR_STATS_OID, 88 }; static const oid policyResultNodataOID[] = { RECURSOR_STATS_OID, 89 }; static const oid policyResultTruncateOID[] = { RECURSOR_STATS_OID, 90 }; static const oid policyResultCustomOID[] = { RECURSOR_STATS_OID, 91 }; +static const oid queryPipeFullDropsOID[] = { RECURSOR_STATS_OID, 92 }; static std::unordered_map s_statsMap; @@ -218,6 +219,7 @@ RecursorSNMPAgent::RecursorSNMPAgent(const std::string& name, const std::string& registerCounter64Stat("client-parse-errors", clientParseErrorsOID, OID_LENGTH(clientParseErrorsOID)); registerCounter64Stat("server-parse-errors", serverParseErrorsOID, OID_LENGTH(serverParseErrorsOID)); registerCounter64Stat("too-old-drops", tooOldDropsOID, OID_LENGTH(tooOldDropsOID)); + registerCounter64Stat("query-pipe-full-drops", queryPipeFullDropsOID, OID_LENGTH(queryPipeFullDropsOID)); registerCounter64Stat("answers0-1", answers01OID, OID_LENGTH(answers01OID)); registerCounter64Stat("answers1-10", answers110OID, OID_LENGTH(answers110OID)); registerCounter64Stat("answers10-100", answers10100OID, OID_LENGTH(answers10100OID)); diff --git a/pdns/rec_channel_rec.cc b/pdns/rec_channel_rec.cc index 40e287e68..3215c497b 100644 --- a/pdns/rec_channel_rec.cc +++ b/pdns/rec_channel_rec.cc @@ -870,6 +870,7 @@ void registerAllStats() addGetStat("client-parse-errors", &g_stats.clientParseError); addGetStat("server-parse-errors", &g_stats.serverParseError); addGetStat("too-old-drops", &g_stats.tooOldDrops); + addGetStat("query-pipe-full-drops", &g_stats.queryPipeFullDrops); addGetStat("answers0-1", &g_stats.answers0_1); addGetStat("answers1-10", &g_stats.answers1_10); diff --git a/pdns/recursordist/RECURSOR-MIB.txt b/pdns/recursordist/RECURSOR-MIB.txt index ccbacf889..d190126de 100644 --- a/pdns/recursordist/RECURSOR-MIB.txt +++ b/pdns/recursordist/RECURSOR-MIB.txt @@ -758,6 +758,14 @@ policyResultCustom OBJECT-TYPE "Number of policy-mandated custom results" ::= { stats 91 } +queryPipeFullDrops OBJECT-TYPE + SYNTAX Counter64 + MAX-ACCESS read-only + STATUS current + DESCRIPTION + "Number of responses dropped because the query distribution pipe was full" + ::= { stats 92 } + --- --- Traps / Notifications --- @@ -891,6 +899,7 @@ recGroup OBJECT-GROUP policyResultNodata, policyResultTruncate, policyResultCustom, + queryPipeFullDrops, trapReason } STATUS current diff --git a/pdns/recursordist/docs/metrics.rst b/pdns/recursordist/docs/metrics.rst index 71b90c4b2..f10cec038 100644 --- a/pdns/recursordist/docs/metrics.rst +++ b/pdns/recursordist/docs/metrics.rst @@ -360,6 +360,12 @@ qa-latency ^^^^^^^^^^ shows the current latency average, in microseconds, exponentially weighted over past 'latency-statistic-size' packets +query-pipe-full-drops +^^^^^^^^^^^^^^^^^^^^^ +.. versionadded:: 4.2 + +questions dropped because the query distribution pipe was full + questions ^^^^^^^^^ counts all end-user initiated queries with the RD bit set diff --git a/pdns/syncres.hh b/pdns/syncres.hh index b10b44076..3abc485b8 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -905,6 +905,7 @@ struct RecursorStats std::atomic clientParseError; std::atomic serverParseError; std::atomic tooOldDrops; + std::atomic queryPipeFullDrops; std::atomic unexpectedCount; std::atomic caseMismatchCount; std::atomic spoofCount;