]> granicus.if.org Git - pdns/commitdiff
rec: Use a separate protobuf exporter thread per worker thread
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 4 Apr 2018 14:49:03 +0000 (16:49 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 11 Apr 2018 11:53:45 +0000 (13:53 +0200)
Use a dedicated protobuf queue for each worker thread, instead of
sharing a single one for all worker threads, leading to some
contention under heavy load.

pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/remote_logger.cc
pdns/remote_logger.hh
pdns/syncres.cc
pdns/syncres.hh

index 93c566e6bbded4cc6860f5fae367d1d6bb513ec0..a868ed4b5816191b0da59b78fe4547ff18d6efb0 100644 (file)
@@ -104,6 +104,10 @@ static thread_local std::shared_ptr<RecursorLua4> t_pdl;
 static thread_local unsigned int t_id;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+#ifdef HAVE_PROTOBUF
+static thread_local std::shared_ptr<RemoteLogger> t_protobufServer{nullptr};
+static thread_local std::shared_ptr<RemoteLogger> t_outgoingProtobufServer{nullptr};
+#endif /* HAVE_PROTOBUF */
 
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
 thread_local std::unique_ptr<MemRecursorCache> t_RC;
@@ -776,6 +780,77 @@ static bool addRecordToPacket(DNSPacketWriter& pw, const DNSRecord& rec, uint32_
   return true;
 }
 
+#ifdef HAVE_PROTOBUF
+static std::shared_ptr<RemoteLogger> startProtobufServer(const ProtobufExportConfig& config, uint64_t generation)
+{
+  std::shared_ptr<RemoteLogger> result = nullptr;
+  try {
+    result = std::make_shared<RemoteLogger>(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
+    result->setGeneration(generation);
+  }
+  catch(const std::exception& e) {
+    g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.what()<<endl;
+  }
+  catch(const PDNSException& e) {
+    g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.reason<<endl;
+  }
+
+  return result;
+}
+
+static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+  if (!luaconfsLocal->protobufExportConfig.enabled) {
+    if (t_protobufServer != nullptr) {
+      t_protobufServer->stop();
+      t_protobufServer = nullptr;
+    }
+
+    return false;
+  }
+
+  /* if the server was not running, or if it was running according to a
+     previous configuration */
+  if (t_protobufServer == nullptr ||
+      t_protobufServer->getGeneration() < luaconfsLocal->generation) {
+
+    if (t_protobufServer) {
+      t_protobufServer->stop();
+    }
+
+    t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation);
+  }
+
+  return true;
+}
+
+static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+  if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
+    if (t_outgoingProtobufServer != nullptr) {
+      t_outgoingProtobufServer->stop();
+      t_outgoingProtobufServer = nullptr;
+    }
+
+    return false;
+  }
+
+  /* if the server was not running, or if it was running according to a
+     previous configuration */
+  if (t_outgoingProtobufServer == nullptr ||
+      t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) {
+
+    if (t_outgoingProtobufServer) {
+      t_outgoingProtobufServer->stop();
+    }
+
+    t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation);
+  }
+
+  return true;
+}
+#endif /* HAVE_PROTOBUF */
+
 static void startDoResolve(void *p)
 {
   DNSComboWriter* dc=(DNSComboWriter *)p;
@@ -815,7 +890,7 @@ static void startDoResolve(void *p)
     bool wantsRPZ(true);
     boost::optional<RecProtoBufMessage> pbMessage(boost::none);
 #ifdef HAVE_PROTOBUF
-    if (luaconfsLocal->protobufServer) {
+    if (checkProtobufExport(luaconfsLocal)) {
       Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
       const ComboAddress& requestor = requestorNM.getMaskedNetwork();
       pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response);
@@ -863,6 +938,7 @@ static void startDoResolve(void *p)
 
 #ifdef HAVE_PROTOBUF
     sr.setInitialRequestId(dc->d_uuid);
+    sr.setOutgoingProtobufServer(t_outgoingProtobufServer);
 #endif
 
     sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
@@ -1191,7 +1267,7 @@ static void startDoResolve(void *p)
         needCommit = true;
 
 #ifdef HAVE_PROTOBUF
-        if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
+        if(t_protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
           pbMessage->addRR(*i);
         }
 #endif
@@ -1216,7 +1292,7 @@ static void startDoResolve(void *p)
     g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
     updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
 #ifdef HAVE_PROTOBUF
-    if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
+    if (t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
       pbMessage->setBytes(packet.size());
       pbMessage->setResponseCode(pw.getHeader()->rcode);
       if (appliedPolicy.d_name) {
@@ -1227,7 +1303,7 @@ static void startDoResolve(void *p)
       pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
       pbMessage->setRequestorId(dq.requestorId);
       pbMessage->setDeviceId(dq.deviceId);
-      protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage);
+      protobufLogResponse(t_protobufServer, *pbMessage);
     }
 #endif
     if(!dc->d_tcp) {
@@ -1561,7 +1637,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       string deviceId;
 #ifdef HAVE_PROTOBUF
       auto luaconfsLocal = g_luaconfs.getLocal();
-      if (luaconfsLocal->protobufServer) {
+      if (checkProtobufExport(luaconfsLocal)) {
         needECS = true;
       }
 #endif
@@ -1599,18 +1675,18 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         }
       }
 #ifdef HAVE_PROTOBUF
-      if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
+      if(t_protobufServer || t_outgoingProtobufServer) {
         dc->d_requestorId = requestorId;
         dc->d_deviceId = deviceId;
         dc->d_uuid = (*t_uuidGenerator)();
       }
 
-      if(luaconfsLocal->protobufServer) {
+      if(t_protobufServer) {
         try {
           const struct dnsheader* dh = (const struct dnsheader*) conn->data;
 
           if (!luaconfsLocal->protobufTaggedOnly) {
-            protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
+            protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
           }
         }
         catch(std::exception& e) {
@@ -1727,10 +1803,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
 #ifdef HAVE_PROTOBUF
   boost::uuids::uuid uniqueId;
   auto luaconfsLocal = g_luaconfs.getLocal();
-  if (luaconfsLocal->protobufServer) {
+  if (checkProtobufExport(luaconfsLocal)) {
     uniqueId = (*t_uuidGenerator)();
     needECS = true;
-  } else if (luaconfsLocal->outgoingProtobufServer) {
+  } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
     uniqueId = (*t_uuidGenerator)();
   }
 #endif
@@ -1796,10 +1872,10 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
     bool cacheHit = false;
     boost::optional<RecProtoBufMessage> pbMessage(boost::none);
 #ifdef HAVE_PROTOBUF
-    if(luaconfsLocal->protobufServer) {
+    if(t_protobufServer) {
       pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
       if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
-        protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
+        protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
       }
     }
 #endif /* HAVE_PROTOBUF */
@@ -1816,7 +1892,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
 
     if (cacheHit) {
 #ifdef HAVE_PROTOBUF
-      if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
+      if(t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
         Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
         const ComboAddress& requestor = requestorNM.getMaskedNetwork();
         pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
@@ -1824,7 +1900,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
         pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
         pbMessage->setRequestorId(requestorId);
         pbMessage->setDeviceId(deviceId);
-        protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage);
+        protobufLogResponse(t_protobufServer, *pbMessage);
       }
 #endif /* HAVE_PROTOBUF */
       if(!g_quiet)
@@ -1895,7 +1971,7 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   dc->d_ttlCap = ttlCap;
   dc->d_variable = variable;
 #ifdef HAVE_PROTOBUF
-  if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
+  if (t_protobufServer || t_outgoingProtobufServer) {
     dc->d_uuid = uniqueId;
   }
   dc->d_requestorId = requestorId;
@@ -3338,6 +3414,13 @@ try
 
   MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
 
+#ifdef HAVE_PROTOBUF
+  /* start protobuf export threads if needed */
+  auto luaconfsLocal = g_luaconfs.getLocal();
+  checkProtobufExport(luaconfsLocal);
+  checkOutgoingProtobufExport(luaconfsLocal);
+#endif /* HAVE_PROTOBUF */
+
   PacketID pident;
 
   t_fdm=getMultiplexer();
index 04b082bc65315c688a52bb1691f5f77409eb232d..2098fbfa0318d067b5983edd506db430d36beec0 100644 (file)
@@ -92,6 +92,9 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly)
   if(!ifs)
     throw PDNSException("Cannot open file '"+fname+"': "+strerror(errno));
 
+  auto luaconfsLocal = g_luaconfs.getLocal();
+  lci.generation = luaconfsLocal->generation + 1;
+
   Lua.writeFunction("clearSortlist", [&lci]() { lci.sortlist.clear(); });
   
   /* we can get: "1.2.3.4"
@@ -277,9 +280,28 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly)
   Lua.writeFunction("protobufServer", [&lci, checkOnly](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, const boost::optional<uint8_t> maskV4, boost::optional<uint8_t> maskV6, boost::optional<bool> asyncConnect, boost::optional<bool> taggedOnly) {
       try {
        ComboAddress server(server_);
-        if (!lci.protobufServer) {
+        if (!lci.protobufExportConfig.enabled) {
+
+          lci.protobufExportConfig.enabled = true;
+
           if (!checkOnly) {
-            lci.protobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
+            lci.protobufExportConfig.server = server;
+
+            if (timeout) {
+              lci.protobufExportConfig.timeout = *timeout;
+            }
+
+            if (maxQueuedEntries) {
+              lci.protobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
+            }
+
+            if (reconnectWaitTime) {
+              lci.protobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
+            }
+
+            if (asyncConnect) {
+              lci.protobufExportConfig.asyncConnect = *asyncConnect;
+            }
           }
 
           if (maskV4) {
@@ -293,7 +315,7 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly)
           }
         }
         else {
-          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
+          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufExportConfig.server.toString()<<endl;
         }
       }
       catch(std::exception& e) {
@@ -307,13 +329,32 @@ void loadRecursorLuaConfig(const std::string& fname, bool checkOnly)
   Lua.writeFunction("outgoingProtobufServer", [&lci, checkOnly](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, boost::optional<bool> asyncConnect) {
       try {
        ComboAddress server(server_);
-        if (!lci.outgoingProtobufServer) {
+        if (!lci.outgoingProtobufExportConfig.enabled) {
+
+          lci.outgoingProtobufExportConfig.enabled = true;
+
           if (!checkOnly) {
-            lci.outgoingProtobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
+            lci.outgoingProtobufExportConfig.server = server;
+
+            if (timeout) {
+              lci.outgoingProtobufExportConfig.timeout = *timeout;
+            }
+
+            if (maxQueuedEntries) {
+              lci.outgoingProtobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
+            }
+
+            if (reconnectWaitTime) {
+              lci.outgoingProtobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
+            }
+
+            if (asyncConnect) {
+              lci.outgoingProtobufExportConfig.asyncConnect = *asyncConnect;
+            }
           }
         }
         else {
-          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
+          g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.outgoingProtobufExportConfig.server.toString()<<endl;
         }
       }
       catch(std::exception& e) {
index 62d83b0bf698fd87fe469247db64f9a05e796a45..2d3ef4185cf10265e6c4ea385763a4ebbac2d6c9 100644 (file)
 #include "sholder.hh"
 #include "sortlist.hh"
 #include "filterpo.hh"
-#include "remote_logger.hh"
 #include "validate.hh"
 
+struct ProtobufExportConfig
+{
+  ComboAddress server;
+  uint64_t maxQueuedEntries{100};
+  uint16_t timeout{2};
+  uint16_t reconnectWaitTime{1};
+  bool asyncConnect{false};
+  bool enabled{false};
+};
+
 class LuaConfigItems 
 {
 public:
@@ -34,8 +43,12 @@ public:
   DNSFilterEngine dfe;
   map<DNSName,dsmap_t> dsAnchors;
   map<DNSName,std::string> negAnchors;
-  std::shared_ptr<RemoteLogger> protobufServer{nullptr};
-  std::shared_ptr<RemoteLogger> outgoingProtobufServer{nullptr};
+  /* we need to increment this every time the configuration
+     is reloaded, so we know if we need to reload the protobuf
+     remote loggers */
+  ProtobufExportConfig protobufExportConfig;
+  ProtobufExportConfig outgoingProtobufExportConfig;
+  uint64_t generation{0};
   uint8_t protobufMaskV4{32};
   uint8_t protobufMaskV6{128};
   bool protobufTaggedOnly{false};
index 9cda5925e2d15d81fab5bda3d5b2149704f41cc9..70b29f701bdee3f6f35793225f018b86d3270dba 100644 (file)
@@ -75,7 +75,7 @@ void RemoteLogger::worker()
 void RemoteLogger::queueData(const std::string& data)
 {
   {
-    std::unique_lock<std::mutex> lock(d_writeMutex);
+    std::lock_guard<std::mutex> lock(d_writeMutex);
     if (d_writeQueue.size() >= d_maxQueuedEntries) {
       d_writeQueue.pop();
     }
index 6f72840c610f343b3d41840fad43aac0a916a501..a0fec492e1eedab0e7d034f1dcee9050cd187ae9 100644 (file)
@@ -49,6 +49,18 @@ public:
   {
     return "RemoteLogger to " + d_remote.toStringWithPort();
   }
+  void stop()
+  {
+    d_exiting = true;
+  }
+  uint64_t getGeneration() const
+  {
+    return d_generation;
+  }
+  void setGeneration(uint64_t newGeneration)
+  {
+    d_generation = newGeneration;
+  }
 private:
   void busyReconnectLoop();
   bool reconnect();
@@ -59,6 +71,7 @@ private:
   std::condition_variable d_queueCond;
   ComboAddress d_remote;
   uint64_t d_maxQueuedEntries;
+  uint64_t d_generation{0};
   int d_socket{-1};
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
index b765958a4f943664c97f9b9d53e7a04fbffd5fe6..b4d9824158e0aa2952890bc77ea72145779528b2 100644 (file)
@@ -464,10 +464,10 @@ int SyncRes::asyncresolveWrapper(const ComboAddress& ip, bool ednsMANDATORY, con
       sendQname.makeUsLowerCase();
 
     if (d_asyncResolve) {
-      ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, luaconfsLocal->outgoingProtobufServer, res, chained);
+      ret = d_asyncResolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, res, chained);
     }
     else {
-      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, luaconfsLocal->outgoingProtobufServer, res, chained);
+      ret=asyncresolve(ip, sendQname, type, doTCP, sendRDQuery, EDNSLevel, now, srcmask, ctx, d_outgoingProtobufServer, res, chained);
     }
     if(ret < 0) {
       return ret; // transport error, nothing to learn here
index e953d6cfcbf4e96ed124c2b67303fff849a83942..b10b440761a73177ba733b89821f9f0f116c0a7c 100644 (file)
@@ -649,6 +649,11 @@ public:
   {
     d_initialRequestId = initialRequestId;
   }
+
+  void setOutgoingProtobufServer(std::shared_ptr<RemoteLogger>& server)
+  {
+    d_outgoingProtobufServer = server;
+  }
 #endif
 
   void setAsyncCallback(asyncresolve_t func)
@@ -790,6 +795,7 @@ private:
   ostringstream d_trace;
   shared_ptr<RecursorLua4> d_pdl;
   boost::optional<Netmask> d_outgoingECSNetwork;
+  std::shared_ptr<RemoteLogger> d_outgoingProtobufServer{nullptr};
 #ifdef HAVE_PROTOBUF
   boost::optional<const boost::uuids::uuid&> d_initialRequestId;
 #endif