]> granicus.if.org Git - pdns/commitdiff
dnsdist: Fix TCP clients threads vector and counters initialization
authorRemi Gacogne <rgacogne-github@coredump.fr>
Tue, 26 Jan 2016 16:16:12 +0000 (17:16 +0100)
committerRemi Gacogne <rgacogne-github@coredump.fr>
Tue, 26 Jan 2016 16:16:12 +0000 (17:16 +0100)
By tracking the FD leak reported in #3300, I observed that:
* we could create up to g_maxTCPClientThreads TCP threads,
but the corresponding vector size was hardcoded at 1024
(which the default for g_maxTCPClientThreads)
* the counters were not explicitely initialized

This commit fixes that and adds some additional checks to make
sure we don't add more TCP client threads, as that could lead to
a race if the vector is resized.

pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index 78fa765b5f2ba63e8569248c721f2a4b72b70b8e..61c8f0cb0892c3b51680f10729a5c71d7bf024d2 100644 (file)
@@ -69,17 +69,22 @@ void* tcpClientThread(int pipefd);
 
 // Should not be called simultaneously!
 void TCPClientCollection::addTCPClientThread()
-{  
+{
+  if (d_numthreads >= d_tcpclientthreads.capacity()) {
+    warnlog("Adding a new TCP client thread would exceed the vector capacity, skipping");
+    return;
+  }
+
   vinfolog("Adding TCP Client thread");
 
-  int pipefds[2];
+  int pipefds[2] = { -1, -1};
   if(pipe(pipefds) < 0)
     unixDie("Creating pipe");
 
   if (!setNonBlocking(pipefds[1]))
     unixDie("Setting pipe non-blocking");
 
-  d_tcpclientthreads.push_back(pipefds[1]);    
+  d_tcpclientthreads.push_back(pipefds[1]);
   thread t1(tcpClientThread, pipefds[0]);
   t1.detach();
   ++d_numthreads;
@@ -124,7 +129,7 @@ catch(...) {
   return false;
 }
 
-TCPClientCollection g_tcpclientthreads;
+std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
 
 void* tcpClientThread(int pipefd)
 {
@@ -150,7 +155,7 @@ void* tcpClientThread(int pipefd)
     ConnectionInfo* citmp, ci;
 
     readn2(pipefd, &citmp, sizeof(citmp));
-    --g_tcpclientthreads.d_queued;
+    --g_tcpclientthreads->d_queued;
     ci=*citmp;
     delete citmp;    
 
@@ -497,7 +502,7 @@ void* tcpAcceptorThread(void* p)
   ComboAddress remote;
   remote.sin4.sin_family = cs->local.sin4.sin_family;
   
-  g_tcpclientthreads.addTCPClientThread();
+  g_tcpclientthreads->addTCPClientThread();
 
   auto acl = g_ACL.getLocal();
   for(;;) {
@@ -521,8 +526,14 @@ void* tcpAcceptorThread(void* p)
       vinfolog("Got TCP connection from %s", remote.toStringWithPort());
       
       ci->remote = remote;
-      int pipe = g_tcpclientthreads.getThread();
-      writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
+      int pipe = g_tcpclientthreads->getThread();
+      if (pipe >= 0) {
+        writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
+      }
+      else {
+        close(ci->fd);
+        delete ci;
+      }
     }
     catch(std::exception& e) {
       errlog("While reading a TCP question: %s", e.what());
index 378bf82dec297139b470ee5de70eafd1af50a939..215a8fa7c84a3d6eb21a070882483f26abc859cf 100644 (file)
@@ -853,8 +853,8 @@ void* maintThread()
   for(;;) {
     sleep(interval);
 
-    if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < g_maxTCPClientThreads)
-      g_tcpclientthreads.addTCPClientThread();
+    if(g_tcpclientthreads->d_queued > 1 && g_tcpclientthreads->d_numthreads < g_maxTCPClientThreads)
+      g_tcpclientthreads->addTCPClientThread();
 
     for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
       if(dss->availability==DownstreamState::Availability::Auto) {
@@ -1265,6 +1265,8 @@ try
   /* this need to be done _after_ dropping privileges */
   g_delay = new DelayPipe<DelayedPacket>();
 
+  g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads);
+
   for(auto& t : todo)
     t();
 
index ec0ceb38cf25787978c495ea97e27fa3c008b718..4da00c8b7c15aabac3dec615afeffb1c387c16de 100644 (file)
@@ -258,16 +258,16 @@ struct ClientState
 
 class TCPClientCollection {
   std::vector<int> d_tcpclientthreads;
-  std::atomic<uint64_t> d_pos;
+  std::atomic<uint64_t> d_pos{0};
 public:
-  std::atomic<uint64_t> d_queued, d_numthreads;
+  std::atomic<uint64_t> d_queued{0}, d_numthreads{0};
 
-  TCPClientCollection()
+  TCPClientCollection(size_t maxThreads)
   {
-    d_tcpclientthreads.reserve(1024);
+    d_tcpclientthreads.reserve(maxThreads);
   }
 
-  int getThread() 
+  int getThread()
   {
     int pos = d_pos++;
     ++d_queued;
@@ -276,7 +276,7 @@ public:
   void addTCPClientThread();
 };
 
-extern TCPClientCollection g_tcpclientthreads;
+extern std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
 
 struct DownstreamState
 {