]> granicus.if.org Git - pdns/commitdiff
rec: Move carbon/webserver/control/stats handling to a separate thread
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 3 May 2018 12:27:18 +0000 (13:27 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 15 May 2018 13:21:01 +0000 (15:21 +0200)
This makes sure that no worker or distributor thread will get blocked
while waiting for a response from another thread, for example while
gathering stats or executing a command coming from the control
channel.

(cherry picked from commit b8dc631ae02733f35130af7b27ccb9342069e2d8)

pdns/pdns_recursor.cc

index 1371ea605ae8aeaab06d4200f4fe664f19c93510..bcc15d2f716c81d9ca90ca340147eafaf9c8d048 100644 (file)
@@ -97,7 +97,7 @@
 typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
 
 static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id;
+static thread_local int t_id;
 static thread_local std::shared_ptr<Regex> t_traceRegex;
 static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
 
@@ -238,7 +238,7 @@ ArgvMap &arg()
 
 unsigned int getRecursorThreadId()
 {
-  return t_id;
+  return static_cast<unsigned int>(t_id);
 }
 
 int getMTaskerTID()
@@ -2172,11 +2172,13 @@ static void houseKeeping(void *)
         last_rootupdate=now.tv_sec;
     }
 
-    if(!t_id) {
+    if (t_id == -1) {
       if(g_statisticsInterval > 0 && now.tv_sec - last_stat >= g_statisticsInterval) {
        doStats();
        last_stat=time(0);
       }
+    }
+    else if(!t_id) {
 
       if(now.tv_sec - last_secpoll >= 3600) {
        try {
@@ -2239,7 +2241,13 @@ struct ThreadMSG
 
 void broadcastFunction(const pipefunc_t& func, bool skipSelf)
 {
-  unsigned int n = 0;
+  /* This function might be called by the worker with t_id 0 during startup */
+  if (t_id != -1 && t_id != 0) {
+    L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
+  int n = 0;
   for(ThreadPipeSet& tps : g_pipes)
   {
     if(n++ == t_id) {
@@ -2269,13 +2277,19 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
 
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
+  if (t_id != 0) {
+    L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+  }
+
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
   unsigned int target = 1 + (hash % (g_pipes.size()-1));
 
-  if(target == t_id) {
-    func();
-    return;
+  if(target == 0) {
+    L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
+    exit(1);
   }
+
   ThreadPipeSet& tps = g_pipes[target];
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
@@ -2343,22 +2357,15 @@ vector<pair<DNSName, uint16_t> >& operator+=(vector<pair<DNSName, uint16_t> >&a,
 
 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
 {
-  unsigned int n = 0;
+  if (t_id != -1) {
+    L<<Logger::Error<<"broadcastFunction has been called by a worker ("<<t_id<<")"<<endl;
+    exit(1);
+
+  }
+
   T ret=T();
   for(ThreadPipeSet& tps : g_pipes)
   {
-    if(n++ == t_id) {
-      if(!skipSelf) {
-        T* resp = (T*)func(); // don't write to ourselves!
-        if(resp) {
-          //~ cerr <<"got direct: " << *resp << endl;
-          ret += *resp;
-          delete resp;
-        }
-      }
-      continue;
-    }
-
     ThreadMSG* tmsg = new ThreadMSG();
     tmsg->func = boost::bind(voider<T>, func);
     tmsg->wantAnswer = true;
@@ -2680,7 +2687,7 @@ static void checkOrFixFDS()
   }
 }
 
-static void* recursorThread(void*);
+static void* recursorThread(int tid, bool worker);
 
 static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
 {
@@ -3135,36 +3142,39 @@ static int serviceMain(int argc, char*argv[])
     g_snmpAgent->run();
   }
 
+  /* This thread handles the web server, carbon, statistics and the control channel */
+  std::thread handlerThread(recursorThread, -1, false);
+
   const auto cpusMap = parseCPUMap();
+
+  std::vector<std::thread> workers(g_numThreads);
   if(g_numThreads == 1) {
     L<<Logger::Warning<<"Operating unthreaded"<<endl;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
     setCPUMap(cpusMap, 0, pthread_self());
-    recursorThread(0);
+    recursorThread(0, true);
   }
   else {
-    pthread_t tid;
     L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
     for(unsigned int n=0; n < g_numThreads; ++n) {
-      pthread_create(&tid, 0, recursorThread, (void*)(long)n);
+      workers[n] = std::thread(recursorThread, n, true);
 
-      setCPUMap(cpusMap, n, tid);
+      setCPUMap(cpusMap, n, workers[n].native_handle());
     }
-    void* res;
 #ifdef HAVE_SYSTEMD
     sd_notify(0, "READY=1");
 #endif
-    pthread_join(tid, &res);
+    workers.back().join();
   }
   return 0;
 }
 
-static void* recursorThread(void* ptr)
+static void* recursorThread(int n, bool worker)
 try
 {
-  t_id=(int) (long) ptr;
+  t_id=n;
   SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
   SyncRes::setDomainMap(g_initialDomainMap);
   t_allowFrom = g_initialAllowFrom;
@@ -3213,7 +3223,8 @@ try
   PacketID pident;
 
   t_fdm=getMultiplexer();
-  if(!t_id) {
+
+  if(!worker) {
     if(::arg().mustDo("webserver")) {
       L<<Logger::Warning << "Enabling web server" << endl;
       try {
@@ -3226,24 +3237,26 @@ try
     }
     L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
   }
-
-  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
-
-  if(g_useOneSocketPerThread) {
-    for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
-      t_fdm->addReadFD(i->first, i->second);
-    }
-  }
   else {
-    if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
-      for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+    t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+
+    if(g_useOneSocketPerThread) {
+      for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
         t_fdm->addReadFD(i->first, i->second);
       }
     }
+    else {
+      if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
+        for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+          t_fdm->addReadFD(i->first, i->second);
+        }
+      }
+    }
   }
 
   registerAllStats();
-  if(!t_id) {
+
+  if(!worker) {
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
   }
 
@@ -3275,13 +3288,13 @@ try
 
     counter++;
 
-    if(!t_id && statsWanted) {
+    if(!worker && statsWanted) {
       doStats();
     }
 
     Utility::gettimeofday(&g_now, 0);
 
-    if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) {
+    if(!worker && (g_now.tv_sec - last_carbon >= carbonInterval)) {
       MT->makeThread(doCarbonDump, 0);
       last_carbon = g_now.tv_sec;
     }
@@ -3289,7 +3302,7 @@ try
     t_fdm->run(&g_now);
     // 'run' updates g_now for us
 
-    if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this
+    if(worker && (!g_weDistributeQueries || !t_id)) { // if pdns distributes queries, only tid 0 should do this
       if(listenOnTCP) {
        if(TCPConnection::getCurrentConnections() > maxTcpClients) {  // shutdown, too many connections
          for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)