int readToThread;
int writeFromThread;
int readFromThread;
+ int writeQueriesToThread; // this one is non-blocking
+ int readQueriesToThread;
};
typedef vector<int> tcpListenSockets_t;
tps.readFromThread = fd[0];
tps.writeFromThread = fd[1];
+ if(pipe(fd) < 0)
+ unixDie("Creating pipe for inter-thread communications");
+ tps.readQueriesToThread = fd[0];
+ tps.writeQueriesToThread = fd[1];
+ setNonBlocking(tps.writeQueriesToThread);
+
g_pipes.push_back(tps);
}
}
tmsg->func = func;
tmsg->wantAnswer = false;
- if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) {
+ ssize_t written = write(tps.writeQueriesToThread, &tmsg, sizeof(tmsg));
+ if (written > 0) {
+ if (static_cast<size_t>(written) != sizeof(tmsg)) {
+ delete tmsg;
+ unixDie("write to thread pipe returned wrong size or error");
+ }
+ }
+ else {
+ int error = errno;
delete tmsg;
- unixDie("write to thread pipe returned wrong size or error");
+ if (error == EAGAIN || error == EWOULDBLOCK) {
+ g_stats.queryPipeFullDrops++;
+ } else {
+ unixDie("write to thread pipe returned wrong size or error:" + error);
+ }
}
}
{
ThreadMSG* tmsg = nullptr;
- if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
+ if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread || fd == readQueriesToThread
unixDie("read from thread pipe returned wrong size or error");
}
}
t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+ t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest);
if(g_useOneSocketPerThread) {
for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
static const oid policyResultNodataOID[] = { RECURSOR_STATS_OID, 89 };
static const oid policyResultTruncateOID[] = { RECURSOR_STATS_OID, 90 };
static const oid policyResultCustomOID[] = { RECURSOR_STATS_OID, 91 };
+static const oid queryPipeFullDropsOID[] = { RECURSOR_STATS_OID, 92 };
static std::unordered_map<oid, std::string> s_statsMap;
registerCounter64Stat("client-parse-errors", clientParseErrorsOID, OID_LENGTH(clientParseErrorsOID));
registerCounter64Stat("server-parse-errors", serverParseErrorsOID, OID_LENGTH(serverParseErrorsOID));
registerCounter64Stat("too-old-drops", tooOldDropsOID, OID_LENGTH(tooOldDropsOID));
+ registerCounter64Stat("query-pipe-full-drops", queryPipeFullDropsOID, OID_LENGTH(queryPipeFullDropsOID));
registerCounter64Stat("answers0-1", answers01OID, OID_LENGTH(answers01OID));
registerCounter64Stat("answers1-10", answers110OID, OID_LENGTH(answers110OID));
registerCounter64Stat("answers10-100", answers10100OID, OID_LENGTH(answers10100OID));
addGetStat("client-parse-errors", &g_stats.clientParseError);
addGetStat("server-parse-errors", &g_stats.serverParseError);
addGetStat("too-old-drops", &g_stats.tooOldDrops);
+ addGetStat("query-pipe-full-drops", &g_stats.queryPipeFullDrops);
addGetStat("answers0-1", &g_stats.answers0_1);
addGetStat("answers1-10", &g_stats.answers1_10);
"Number of policy-mandated custom results"
::= { stats 91 }
+queryPipeFullDrops OBJECT-TYPE
+ SYNTAX Counter64
+ MAX-ACCESS read-only
+ STATUS current
+ DESCRIPTION
+ "Number of responses dropped because the query distribution pipe was full"
+ ::= { stats 92 }
+
---
--- Traps / Notifications
---
policyResultNodata,
policyResultTruncate,
policyResultCustom,
+ queryPipeFullDrops,
trapReason
}
STATUS current
^^^^^^^^^^
shows the current latency average, in microseconds, exponentially weighted over past 'latency-statistic-size' packets
+query-pipe-full-drops
+^^^^^^^^^^^^^^^^^^^^^
+.. versionadded:: 4.2
+
+questions dropped because the query distribution pipe was full
+
questions
^^^^^^^^^
counts all end-user initiated queries with the RD bit set
std::atomic<uint64_t> clientParseError;
std::atomic<uint64_t> serverParseError;
std::atomic<uint64_t> tooOldDrops;
+ std::atomic<uint64_t> queryPipeFullDrops;
std::atomic<uint64_t> unexpectedCount;
std::atomic<uint64_t> caseMismatchCount;
std::atomic<uint64_t> spoofCount;