]> granicus.if.org Git - pdns/commitdiff
implement pdns-distributes-queries to make powerdns distribute queries itself
authorBert Hubert <bert.hubert@netherlabs.nl>
Sun, 8 Aug 2010 19:59:19 +0000 (19:59 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Sun, 8 Aug 2010 19:59:19 +0000 (19:59 +0000)
implement 'processes', bringing back the old-school '--fork' option

git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1684 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/pdns_recursor.cc

index 489d4d841e48250a3bbc3d8a9ff4e142a9ba8fd6..a728ed7d66a98ff9d8b0959cd860cffb81b7d38e 100644 (file)
@@ -99,6 +99,8 @@ __thread RecursorPacketCache* t_packetCache;
 RecursorStats g_stats;
 bool g_quiet;
 
+bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
+
 static __thread NetmaskGroup* t_allowFrom;
 static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this
 
@@ -810,6 +812,47 @@ void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
   }
 }
  
+string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
+{
+  ++g_stats.qcounter;
+
+  string response;
+  try {
+    uint32_t age;
+    if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
+      if(!g_quiet)
+       L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
+
+      g_stats.packetCacheHits++;
+      SyncRes::s_queries++;
+      ageDNSPacket(response, age);
+      sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
+      if(response.length() >= sizeof(struct dnsheader))
+       updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
+      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
+      return 0;
+    }
+  } 
+  catch(std::exception& e) {
+    L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
+    return 0;
+  }
+  
+  
+  if(MT->numProcesses() > g_maxMThreads) {
+    g_stats.overCapacityDrops++;
+    return 0;
+  }
+  
+  DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
+  dc->setSocket(fd);
+  dc->setRemote(&fromaddr);
+
+  dc->d_tcp=false;
+  MT->makeThread(startDoResolve, (void*) dc); // deletes dc
+  return 0;
+} 
 void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
 {
   int len;
@@ -835,40 +878,11 @@ void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
           L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
       }
       else {
-        ++g_stats.qcounter;
-
-        string response;
-        try {
-         uint32_t age;
-          if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) {
-            if(!g_quiet)
-              L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
-  
-            g_stats.packetCacheHits++;
-            SyncRes::s_queries++;
-           ageDNSPacket(response, age);
-            sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
-            if(response.length() >= sizeof(struct dnsheader))
-              updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
-            g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
-            return;
-          }
-        } 
-        catch(std::exception& e) {
-          throw MOADNSException(e.what()); // translate
-        }
-        if(MT->numProcesses() > g_maxMThreads) {
-          g_stats.overCapacityDrops++;
-          return;
-        }
-  
-        DNSComboWriter* dc = new DNSComboWriter(data, len, g_now);
-        dc->setSocket(fd);
-        dc->setRemote(&fromaddr);
-
-        dc->d_tcp=false;
-
-        MT->makeThread(startDoResolve, (void*) dc); // deletes dc
+       string question(data, len);
+       if(g_weDistributeQueries)
+         distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
+       else
+         doProcessUDPQuestion(question, fromaddr, fd);
       }
     }
     catch(MOADNSException& mde) {
@@ -884,6 +898,7 @@ void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
+
 typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
 deferredAdd_t deferredAdd;
 
@@ -1198,7 +1213,7 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
 void distributeAsyncFunction(const pipefunc_t& func)
 {
   static unsigned int counter;
-  unsigned int target = ++counter % g_pipes.size();
+  unsigned int target = 1 + (++counter % (g_pipes.size()-1));
   // cerr<<"Sending to: "<<target<<endl;
   if(target == t_id) {
     func();
@@ -1643,6 +1658,11 @@ int serviceMain(int argc, char*argv[])
   }
 
   g_quiet=::arg().mustDo("quiet");
+  g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
+  if(g_weDistributeQueries) {
+      L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
+  }
+  
   if(::arg().mustDo("trace")) {
     SyncRes::setLog(true);
     ::arg().set("quiet")="no";
@@ -1697,6 +1717,11 @@ int serviceMain(int argc, char*argv[])
   makeUDPServerSockets();
   makeTCPServerSockets();
 
+  for(int forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
+    if(!fork()) // we are child
+      break;
+  }
+  
   s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
   if(!s_pidfname.empty())
     unlink(s_pidfname.c_str()); // remove possible old pid file 
@@ -1731,7 +1756,7 @@ int serviceMain(int argc, char*argv[])
   Utility::dropPrivs(newuid, newgid);
   
   
-  g_numThreads = ::arg().asNum("threads");
+  g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
   
   makeThreadPipes();
   
@@ -1804,14 +1829,14 @@ try
 
   t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
 
-  for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
-    t_fdm->addReadFD(i->first, i->second);
+  if(!g_weDistributeQueries || !t_id)  // if we distribute queries, only t_id = 0 listens
+    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
+      t_fdm->addReadFD(i->first, i->second);
   
   if(!t_id) {
-    
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
   }
-  
+
   unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
   
   bool listenOnTCP(true);
@@ -1927,6 +1952,7 @@ int main(int argc, char **argv)
     ::arg().set("setuid","If set, change user id to this uid for more security")="";
     ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
     ::arg().set("threads", "Launch this number of threads")="2";
+    ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
 #ifdef WIN32
     ::arg().set("quiet","Suppress logging of questions and answers")="off";
     ::arg().setSwitch( "register-service", "Register the service" )= "no";
@@ -1982,7 +2008,9 @@ int main(int argc, char **argv)
     ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off"; 
     ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; 
     ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; 
-    ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no"; 
+    ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no"; 
+    ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
+    
 
     ::arg().setCmd("help","Provide a helpful message");
     ::arg().setCmd("version","Print version string ("VERSION")");