From 63341e8d89ecb90faf3afd7bbb9286ac8249e1b3 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Wed, 4 Apr 2018 16:49:03 +0200 Subject: [PATCH] rec: Use a separate protobuf exporter thread per worker thread Use a dedicated protobuf queue for each worker thread, instead of sharing a single one for all worker threads, leading to some contention under heavy load. --- pdns/pdns_recursor.cc | 113 ++++++++++++++++++++++++++++++++++++------ pdns/rec-lua-conf.cc | 53 +++++++++++++++++--- pdns/rec-lua-conf.hh | 19 +++++-- pdns/remote_logger.cc | 2 +- pdns/remote_logger.hh | 13 +++++ pdns/syncres.cc | 4 +- pdns/syncres.hh | 6 +++ 7 files changed, 183 insertions(+), 27 deletions(-) diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 93c566e6b..a868ed4b5 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -104,6 +104,10 @@ static thread_local std::shared_ptr t_pdl; static thread_local unsigned int t_id; static thread_local std::shared_ptr t_traceRegex; static thread_local std::unique_ptr t_tcpClientCounts; +#ifdef HAVE_PROTOBUF +static thread_local std::shared_ptr t_protobufServer{nullptr}; +static thread_local std::shared_ptr t_outgoingProtobufServer{nullptr}; +#endif /* HAVE_PROTOBUF */ thread_local std::unique_ptr MT; // the big MTasker thread_local std::unique_ptr t_RC; @@ -776,6 +780,77 @@ static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_ return true; } +#ifdef HAVE_PROTOBUF +static std::shared_ptr startProtobufServer(const ProtobufExportConfig& config, uint64_t generation) +{ + std::shared_ptr result = nullptr; + try { + result = std::make_shared(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect); + result->setGeneration(generation); + } + catch(const std::exception& e) { + g_log<& luaconfsLocal) +{ + if (!luaconfsLocal->protobufExportConfig.enabled) { + if (t_protobufServer != nullptr) { + t_protobufServer->stop(); + t_protobufServer = nullptr; + } + + return false; + } + + /* if the server was not running, or if it was running according to a + previous configuration */ + if (t_protobufServer == nullptr || + t_protobufServer->getGeneration() < luaconfsLocal->generation) { + + if (t_protobufServer) { + t_protobufServer->stop(); + } + + t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation); + } + + return true; +} + +static bool checkOutgoingProtobufExport(LocalStateHolder& luaconfsLocal) +{ + if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) { + if (t_outgoingProtobufServer != nullptr) { + t_outgoingProtobufServer->stop(); + t_outgoingProtobufServer = nullptr; + } + + return false; + } + + /* if the server was not running, or if it was running according to a + previous configuration */ + if (t_outgoingProtobufServer == nullptr || + t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) { + + if (t_outgoingProtobufServer) { + t_outgoingProtobufServer->stop(); + } + + t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation); + } + + return true; +} +#endif /* HAVE_PROTOBUF */ + static void startDoResolve(void *p) { DNSComboWriter* dc=(DNSComboWriter *)p; @@ -815,7 +890,7 @@ static void startDoResolve(void *p) bool wantsRPZ(true); boost::optional pbMessage(boost::none); #ifdef HAVE_PROTOBUF - if (luaconfsLocal->protobufServer) { + if (checkProtobufExport(luaconfsLocal)) { Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response); @@ -863,6 +938,7 @@ static void startDoResolve(void *p) #ifdef HAVE_PROTOBUF sr.setInitialRequestId(dc->d_uuid); + sr.setOutgoingProtobufServer(t_outgoingProtobufServer); #endif sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional(dc->d_ednssubnet) : boost::none); @@ -1191,7 +1267,7 @@ static void startDoResolve(void *p) needCommit = true; #ifdef HAVE_PROTOBUF - if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) { + if(t_protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) { pbMessage->addRR(*i); } #endif @@ -1216,7 +1292,7 @@ static void startDoResolve(void *p) g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp); updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype); #ifdef HAVE_PROTOBUF - if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) { + if (t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) { pbMessage->setBytes(packet.size()); pbMessage->setResponseCode(pw.getHeader()->rcode); if (appliedPolicy.d_name) { @@ -1227,7 +1303,7 @@ static void startDoResolve(void *p) pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec); pbMessage->setRequestorId(dq.requestorId); pbMessage->setDeviceId(dq.deviceId); - protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage); + protobufLogResponse(t_protobufServer, *pbMessage); } #endif if(!dc->d_tcp) { @@ -1561,7 +1637,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) string deviceId; #ifdef HAVE_PROTOBUF auto luaconfsLocal = g_luaconfs.getLocal(); - if (luaconfsLocal->protobufServer) { + if (checkProtobufExport(luaconfsLocal)) { needECS = true; } #endif @@ -1599,18 +1675,18 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) } } #ifdef HAVE_PROTOBUF - if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) { + if(t_protobufServer || t_outgoingProtobufServer) { dc->d_requestorId = requestorId; dc->d_deviceId = deviceId; dc->d_uuid = (*t_uuidGenerator)(); } - if(luaconfsLocal->protobufServer) { + if(t_protobufServer) { try { const struct dnsheader* dh = (const struct dnsheader*) conn->data; if (!luaconfsLocal->protobufTaggedOnly) { - protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId); + protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId); } } catch(std::exception& e) { @@ -1727,10 +1803,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr #ifdef HAVE_PROTOBUF boost::uuids::uuid uniqueId; auto luaconfsLocal = g_luaconfs.getLocal(); - if (luaconfsLocal->protobufServer) { + if (checkProtobufExport(luaconfsLocal)) { uniqueId = (*t_uuidGenerator)(); needECS = true; - } else if (luaconfsLocal->outgoingProtobufServer) { + } else if (checkOutgoingProtobufExport(luaconfsLocal)) { uniqueId = (*t_uuidGenerator)(); } #endif @@ -1796,10 +1872,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr bool cacheHit = false; boost::optional pbMessage(boost::none); #ifdef HAVE_PROTOBUF - if(luaconfsLocal->protobufServer) { + if(t_protobufServer) { pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response); if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) { - protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId); + protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId); } } #endif /* HAVE_PROTOBUF */ @@ -1816,7 +1892,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr if (cacheHit) { #ifdef HAVE_PROTOBUF - if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) { + if(t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) { Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6); const ComboAddress& requestor = requestorNM.getMaskedNetwork(); pbMessage->update(uniqueId, &requestor, &destination, false, dh->id); @@ -1824,7 +1900,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec); pbMessage->setRequestorId(requestorId); pbMessage->setDeviceId(deviceId); - protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage); + protobufLogResponse(t_protobufServer, *pbMessage); } #endif /* HAVE_PROTOBUF */ if(!g_quiet) @@ -1895,7 +1971,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr dc->d_ttlCap = ttlCap; dc->d_variable = variable; #ifdef HAVE_PROTOBUF - if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) { + if (t_protobufServer || t_outgoingProtobufServer) { dc->d_uuid = uniqueId; } dc->d_requestorId = requestorId; @@ -3338,6 +3414,13 @@ try MT=std::unique_ptr >(new MTasker(::arg().asNum("stack-size"))); +#ifdef HAVE_PROTOBUF + /* start protobuf export threads if needed */ + auto luaconfsLocal = g_luaconfs.getLocal(); + checkProtobufExport(luaconfsLocal); + checkOutgoingProtobufExport(luaconfsLocal); +#endif /* HAVE_PROTOBUF */ + PacketID pident; t_fdm=getMultiplexer(); diff --git a/pdns/rec-lua-conf.cc b/pdns/rec-lua-conf.cc index 04b082bc6..2098fbfa0 100644 --- a/pdns/rec-lua-conf.cc +++ b/pdns/rec-lua-conf.cc @@ -92,6 +92,9 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly) if(!ifs) throw PDNSException("Cannot open file '"+fname+"': "+strerror(errno)); + auto luaconfsLocal = g_luaconfs.getLocal(); + lci.generation = luaconfsLocal->generation + 1; + Lua.writeFunction("clearSortlist", [&lci]() { lci.sortlist.clear(); }); /* we can get: "1.2.3.4" @@ -277,9 +280,28 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly) Lua.writeFunction("protobufServer", [&lci, checkOnly](const string& server_, const boost::optional timeout, const boost::optional maxQueuedEntries, const boost::optional reconnectWaitTime, const boost::optional maskV4, boost::optional maskV6, boost::optional asyncConnect, boost::optional taggedOnly) { try { ComboAddress server(server_); - if (!lci.protobufServer) { + if (!lci.protobufExportConfig.enabled) { + + lci.protobufExportConfig.enabled = true; + if (!checkOnly) { - lci.protobufServer = std::make_shared(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false); + lci.protobufExportConfig.server = server; + + if (timeout) { + lci.protobufExportConfig.timeout = *timeout; + } + + if (maxQueuedEntries) { + lci.protobufExportConfig.maxQueuedEntries = *maxQueuedEntries; + } + + if (reconnectWaitTime) { + lci.protobufExportConfig.reconnectWaitTime = *reconnectWaitTime; + } + + if (asyncConnect) { + lci.protobufExportConfig.asyncConnect = *asyncConnect; + } } if (maskV4) { @@ -293,7 +315,7 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly) } } else { - g_log<toString()< timeout, const boost::optional maxQueuedEntries, const boost::optional reconnectWaitTime, boost::optional asyncConnect) { try { ComboAddress server(server_); - if (!lci.outgoingProtobufServer) { + if (!lci.outgoingProtobufExportConfig.enabled) { + + lci.outgoingProtobufExportConfig.enabled = true; + if (!checkOnly) { - lci.outgoingProtobufServer = std::make_shared(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false); + lci.outgoingProtobufExportConfig.server = server; + + if (timeout) { + lci.outgoingProtobufExportConfig.timeout = *timeout; + } + + if (maxQueuedEntries) { + lci.outgoingProtobufExportConfig.maxQueuedEntries = *maxQueuedEntries; + } + + if (reconnectWaitTime) { + lci.outgoingProtobufExportConfig.reconnectWaitTime = *reconnectWaitTime; + } + + if (asyncConnect) { + lci.outgoingProtobufExportConfig.asyncConnect = *asyncConnect; + } } } else { - g_log<toString()< dsAnchors; map negAnchors; - std::shared_ptr protobufServer{nullptr}; - std::shared_ptr outgoingProtobufServer{nullptr}; + /* we need to increment this every time the configuration + is reloaded, so we know if we need to reload the protobuf + remote loggers */ + ProtobufExportConfig protobufExportConfig; + ProtobufExportConfig outgoingProtobufExportConfig; + uint64_t generation{0}; uint8_t protobufMaskV4{32}; uint8_t protobufMaskV6{128}; bool protobufTaggedOnly{false}; diff --git a/pdns/remote_logger.cc b/pdns/remote_logger.cc index 9cda5925e..70b29f701 100644 --- a/pdns/remote_logger.cc +++ b/pdns/remote_logger.cc @@ -75,7 +75,7 @@ void RemoteLogger::worker() void RemoteLogger::queueData(const std::string& data) { { - std::unique_lock lock(d_writeMutex); + std::lock_guard lock(d_writeMutex); if (d_writeQueue.size() >= d_maxQueuedEntries) { d_writeQueue.pop(); } diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh index 6f72840c6..a0fec492e 100644 --- a/pdns/remote_logger.hh +++ b/pdns/remote_logger.hh @@ -49,6 +49,18 @@ public: { return "RemoteLogger to " + d_remote.toStringWithPort(); } + void stop() + { + d_exiting = true; + } + uint64_t getGeneration() const + { + return d_generation; + } + void setGeneration(uint64_t newGeneration) + { + d_generation = newGeneration; + } private: void busyReconnectLoop(); bool reconnect(); @@ -59,6 +71,7 @@ private: std::condition_variable d_queueCond; ComboAddress d_remote; uint64_t d_maxQueuedEntries; + uint64_t d_generation{0}; int d_socket{-1}; uint16_t d_timeout; uint8_t d_reconnectWaitTime; diff --git a/pdns/syncres.cc b/pdns/syncres.cc index b765958a4..b4d982415 100644 --- a/pdns/syncres.cc +++ b/pdns/syncres.cc @@ -464,10 +464,10 @@ int SyncRes::asyncresolveWrapper(const ComboAddress& ip, bool ednsMANDATORY, con sendQname.makeUsLowerCase(); if (d_asyncResolve) { - ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, luaconfsLocal->outgoingProtobufServer, res, chained); + ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, res, chained); } else { - ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, luaconfsLocal->outgoingProtobufServer, res, chained); + ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, res, chained); } if(ret < 0) { return ret; // transport error, nothing to learn here diff --git a/pdns/syncres.hh b/pdns/syncres.hh index e953d6cfc..b10b44076 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -649,6 +649,11 @@ public: { d_initialRequestId = initialRequestId; } + + void setOutgoingProtobufServer(std::shared_ptr& server) + { + d_outgoingProtobufServer = server; + } #endif void setAsyncCallback(asyncresolve_t func) @@ -790,6 +795,7 @@ private: ostringstream d_trace; shared_ptr d_pdl; boost::optional d_outgoingECSNetwork; + std::shared_ptr d_outgoingProtobufServer{nullptr}; #ifdef HAVE_PROTOBUF boost::optional d_initialRequestId; #endif -- 2.40.0