]> granicus.if.org Git - pdns/commitdiff
rec: Use one listening socket per thread when reuseport is enabled
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 1 Mar 2017 09:36:33 +0000 (10:36 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 7 Mar 2017 08:41:08 +0000 (09:41 +0100)
Except if `pdns-distributes-queries` is true, of course.
We used to shared the same listening socket between all threads
when `pdns-distributes-queries` is set to false, even with `reuseport`
set to true and `SO_REUSEPORT` support available. After this commit:
* if `pdns-distributes-queries` is true, the distributor thread
is still the only one listening to incoming queries
* if `pdns-distributes-queries` is false and either `reuseport` is
false or `SO_REUSEPORT` support is not available, all threads share
the same listening socket as it was before
* if `pdns-distributes-queries` is false, `SO_REUSEPORT` support is
available and `reuseport` is true, we open a separate listening
socket per thread to let the kernel distribute the incoming queries
for us, avoiding any thundering herd issue as well as the distributor
thread being a bottleneck.

docs/markdown/recursor/settings.md
pdns/pdns_recursor.cc

index cb71da62afc05ce56edcbe9c5ac6d9bb159a1087..09bc266a291c64bef0aeb9fbea067b75cc02a46b 100644 (file)
@@ -748,6 +748,16 @@ which also disables outgoing IPv6 support.
 
 Don't log queries.
 
+## `reuseport`
+* Boolean
+* Default: no
+
+If `SO_REUSEPORT` support is available, allows multiple processes to open a
+listening socket on the same port. Since 4.1.0, when `pdns-distributes-queries` is set to
+false and `reuseport` is enabled, every thread will open a separate listening socket to let
+the kernel distribute the incoming queries, avoiding any thundering herd issue as well as
+the distributor thread being a bottleneck, thus leading to much higher performance on multi-core boxes.
+
 ## `root-nx-trust`
 * Boolean
 * Default: no (<= 4.0.0), yes
index 5323043f4a18b5bead9dfc139a4075e81e01f106..b05b94021a38e3d2dd9bd8322be352ce3b80e447 100644 (file)
@@ -122,6 +122,7 @@ struct ThreadPipeSet
   int writeFromThread;
   int readFromThread;
 };
+
 typedef vector<int> tcpListenSockets_t;
 typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
@@ -130,7 +131,7 @@ static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
 static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
 static tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
-static deferredAdd_t deferredAdd;
+static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
 static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
 static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
 static AtomicCounter counter;
@@ -151,6 +152,8 @@ static bool g_logCommonErrors;
 static bool g_anyToTcp;
 static bool g_lowercaseOutgoing;
 static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
+static bool g_reusePort{false};
+static bool g_useOneSocketPerThread;
 
 std::unordered_set<DNSName> g_delegationOnly;
 RecursorControlChannel s_rcc; // only active in thread 0
@@ -1787,7 +1790,7 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
-static void makeTCPServerSockets()
+static void makeTCPServerSockets(unsigned int threadId)
 {
   int fd;
   vector<string>locals;
@@ -1818,7 +1821,7 @@ static void makeTCPServerSockets()
     setCloseOnExec(fd);
 
     int tmp=1;
-    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
+    if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
       L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
       exit(1);
     }
@@ -1827,7 +1830,7 @@ static void makeTCPServerSockets()
     }
 
 #ifdef TCP_DEFER_ACCEPT
-    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
+    if(setsockopt(fd, SOL_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
       if(i==locals.begin())
         L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
     }
@@ -1837,9 +1840,8 @@ static void makeTCPServerSockets()
        Utility::setBindAny(AF_INET, fd);
 
 #ifdef SO_REUSEPORT
-    if(::arg().mustDo("reuseport")) {
-      int one=1;
-      if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
+    if(g_reusePort) {
+      if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
         throw PDNSException("SO_REUSEPORT: "+stringerror());
     }
 #endif
@@ -1852,7 +1854,7 @@ static void makeTCPServerSockets()
     setNonBlocking(fd);
     setSocketSendBuffer(fd, 65000);
     listen(fd, 128);
-    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
+    deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
     g_tcpListenSockets.push_back(fd);
     // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
     //  - fd is not that which we know here, but returned from accept()
@@ -1863,7 +1865,7 @@ static void makeTCPServerSockets()
   }
 }
 
-static void makeUDPServerSockets()
+static void makeUDPServerSockets(unsigned int threadId)
 {
   int one=1;
   vector<string>locals;
@@ -1917,7 +1919,7 @@ static void makeUDPServerSockets()
 
   
 #ifdef SO_REUSEPORT  
-    if(::arg().mustDo("reuseport")) {
+    if(g_reusePort) {
       if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
         throw PDNSException("SO_REUSEPORT: "+stringerror());
     }
@@ -1928,7 +1930,7 @@ static void makeUDPServerSockets()
 
     setNonBlocking(fd);
 
-    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
+    deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
     g_listenSocketsAddresses[fd]=sin;  // this is written to only from the startup thread, not from the workers
     if(sin.sin4.sin_family == AF_INET)
       L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
@@ -2784,8 +2786,26 @@ static int serviceMain(int argc, char*argv[])
 
   g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
 
-  makeUDPServerSockets();
-  makeTCPServerSockets();
+  g_numWorkerThreads = ::arg().asNum("threads");
+  g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
+  g_maxMThreads = ::arg().asNum("max-mthreads");
+
+#ifdef SO_REUSEPORT
+  g_reusePort = ::arg().mustDo("reuseport");
+#endif
+
+  g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
+
+  if (g_useOneSocketPerThread) {
+    for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
+      makeUDPServerSockets(threadId);
+      makeTCPServerSockets(threadId);
+    }
+  }
+  else {
+    makeUDPServerSockets(0);
+    makeTCPServerSockets(0);
+  }
 
   parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
   g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
@@ -2805,9 +2825,7 @@ static int serviceMain(int argc, char*argv[])
   signal(SIGUSR1,usr1Handler);
   signal(SIGUSR2,usr2Handler);
   signal(SIGPIPE,SIG_IGN);
-  g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
-  g_numWorkerThreads = ::arg().asNum("threads");
-  g_maxMThreads = ::arg().asNum("max-mthreads");
+
   checkOrFixFDS();
 
 #ifdef HAVE_LIBSODIUM
@@ -2959,9 +2977,20 @@ try
 
   t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
 
-  if(!g_weDistributeQueries || !t_id)  // if we distribute queries, only t_id = 0 listens
-    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
-      t_fdm->addReadFD(i->first, i->second);
+  if(g_useOneSocketPerThread) {
+    for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
+      for(deferredAdd_t::const_iterator i = deferredAdds[threadId].begin(); i != deferredAdds[threadId].end(); ++i) {
+        t_fdm->addReadFD(i->first, i->second);
+      }
+    }
+  }
+  else {
+    if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
+      for(deferredAdd_t::const_iterator i = deferredAdds[0].begin(); i != deferredAdds[0].end(); ++i) {
+        t_fdm->addReadFD(i->first, i->second);
+      }
+    }
+  }
 
   if(!t_id) {
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel