From 5d6c7a46ec22bbacc2a0dc5e8c74beaa84cfe224 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Fri, 28 Jun 2019 11:47:39 +0200 Subject: [PATCH] rec: Fix the export of only outgoing queries or incoming responses --- pdns/fstrm_logger.hh | 7 ----- pdns/lwres.cc | 67 ++++++++++++++++++++++++++++++++++++------- pdns/pdns_recursor.cc | 5 +++- pdns/remote_logger.hh | 14 +++++++-- 4 files changed, 72 insertions(+), 21 deletions(-) diff --git a/pdns/fstrm_logger.hh b/pdns/fstrm_logger.hh index 57ee93aed..c6844a515 100644 --- a/pdns/fstrm_logger.hh +++ b/pdns/fstrm_logger.hh @@ -43,10 +43,6 @@ public: { return "FrameStreamLogger to " + d_address; } - bool logQueries(void) const { return d_logQueries; } - bool logResponses(void) const { return d_logResponses; } - void setLogQueries(bool flag) { d_logQueries = flag; } - void setLogResponses(bool flag) { d_logResponses = flag; } private: @@ -63,9 +59,6 @@ private: struct fstrm_iothr *d_iothr{nullptr}; void cleanup(); - - bool d_logQueries{true}; - bool d_logResponses{true}; }; #else diff --git a/pdns/lwres.cc b/pdns/lwres.cc index f53909008..a1fb7400a 100644 --- a/pdns/lwres.cc +++ b/pdns/lwres.cc @@ -122,8 +122,21 @@ static void logFstreamResponse(const std::shared_ptr>>& outgoingLoggers, boost::optional& message, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, size_t bytes, boost::optional& srcmask) { - if(!outgoingLoggers) + if (!outgoingLoggers) { return; + } + + bool log = false; + for (auto& logger : *outgoingLoggers) { + if (logger->logQueries()) { + log = true; + break; + } + } + + if (!log) { + return; + } message = RecProtoBufMessage(DNSProtoBufMessage::OutgoingQuery, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes); message->setServerIdentity(SyncRes::s_serverID); @@ -141,18 +154,48 @@ static void logOutgoingQuery(const std::shared_ptrserialize(str); for (auto& logger : *outgoingLoggers) { - logger->queueData(str); + if (logger->logQueries()) { + logger->queueData(str); + } } } -static void logIncomingResponse(const std::shared_ptr>>& outgoingLoggers, boost::optional& message, size_t bytes, int rcode, const std::vector& records, const struct timeval& queryTime, const std::set& exportTypes) +static void logIncomingResponse(const std::shared_ptr>>& outgoingLoggers, boost::optional& message, boost::optional initialRequestId, const boost::uuids::uuid& uuid, const ComboAddress& ip, const DNSName& domain, int type, uint16_t qid, bool doTCP, boost::optional& srcmask, size_t bytes, int rcode, const std::vector& records, const struct timeval& queryTime, const std::set& exportTypes) { - if(!outgoingLoggers || !message) + if (!outgoingLoggers) { + return; + } + + bool log = false; + for (auto& logger : *outgoingLoggers) { + if (logger->logResponses()) { + log = true; + break; + } + } + + if (!log) { return; + } + + if (!message) { + message = RecProtoBufMessage(DNSProtoBufMessage::IncomingResponse, uuid, nullptr, &ip, domain, type, QClass::IN, qid, doTCP, bytes); + message->setServerIdentity(SyncRes::s_serverID); + + if (initialRequestId) { + message->setInitialRequestID(*initialRequestId); + } + + if (srcmask) { + message->setEDNSSubnet(*srcmask); + } + } + else { + message->updateTime(); + message->setType(DNSProtoBufMessage::IncomingResponse); + message->setBytes(bytes); + } - message->updateTime(); - message->setType(DNSProtoBufMessage::IncomingResponse); - message->setBytes(bytes); message->setQueryTime(queryTime.tv_sec, queryTime.tv_usec); message->setResponseCode(rcode); message->addRRs(records, exportTypes); @@ -162,7 +205,9 @@ static void logIncomingResponse(const std::shared_ptrserialize(str); for (auto& logger : *outgoingLoggers) { - logger->queueData(str); + if (logger->logResponses()) { + logger->queueData(str); + } } } #endif /* HAVE_PROTOBUF */ @@ -328,7 +373,7 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d if(mdp.d_header.rcode == RCode::FormErr && mdp.d_qname.empty() && mdp.d_qtype == 0 && mdp.d_qclass == 0) { #ifdef HAVE_PROTOBUF if(outgoingLoggers) { - logIncomingResponse(outgoingLoggers, pbMessage, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + logIncomingResponse(outgoingLoggers, pbMessage, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=true; @@ -374,7 +419,7 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d #ifdef HAVE_PROTOBUF if(outgoingLoggers) { - logIncomingResponse(outgoingLoggers, pbMessage, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + logIncomingResponse(outgoingLoggers, pbMessage, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=true; @@ -387,7 +432,7 @@ int asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool d g_stats.serverParseError++; #ifdef HAVE_PROTOBUF if(outgoingLoggers) { - logIncomingResponse(outgoingLoggers, pbMessage, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); + logIncomingResponse(outgoingLoggers, pbMessage, context ? context->d_initialRequestId : boost::none, uuid, ip, domain, type, qid, doTCP, srcmask, len, lwr->d_rcode, lwr->d_records, queryTime, exportTypes); } #endif lwr->d_validpacket=false; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index a61602d9b..347c2803e 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -891,7 +891,10 @@ static std::shared_ptr>> startProtobuf for (const auto& server : config.servers) { try { - result->emplace_back(new RemoteLogger(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect)); + auto logger = make_unique(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect); + logger->setLogQueries(config.logQueries); + logger->setLogResponses(config.logResponses); + result->emplace_back(std::move(logger)); } catch(const std::exception& e) { g_log< d_drops{0}; + private: bool reconnect(); void maintenanceThread(); @@ -98,8 +108,8 @@ private: uint16_t d_timeout; uint8_t d_reconnectWaitTime; std::atomic d_exiting{false}; - bool d_asyncConnect{false}; - std::thread d_thread; + std::mutex d_mutex; + std::thread d_thread; }; -- 2.40.0