]> granicus.if.org Git - pdns/commitdiff
add infrastructure for manual query distribution over threads (distributeAsyncFunction)
authorBert Hubert <bert.hubert@netherlabs.nl>
Fri, 6 Aug 2010 19:12:20 +0000 (19:12 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Fri, 6 Aug 2010 19:12:20 +0000 (19:12 +0000)
git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1678 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/pdns_recursor.cc
pdns/syncres.hh

index 5f8c2ad339395fd436be3afd1d21fe4e7d50ead8..489d4d841e48250a3bbc3d8a9ff4e142a9ba8fd6 100644 (file)
@@ -124,7 +124,7 @@ struct DNSComboWriter {
                                                                                                        d_tcp(false), d_socket(-1)
   {}
   MOADNSParser d_mdp;
-  void setRemote(ComboAddress* sa)
+  void setRemote(const ComboAddress* sa)
   {
     d_remote=*sa;
   }
@@ -1162,6 +1162,12 @@ void makeThreadPipes()
   }
 }
 
+struct ThreadMSG
+{
+  pipefunc_t func;
+  bool wantAnswer;
+};
+
 void broadcastFunction(const pipefunc_t& func, bool skipSelf)
 {
   unsigned int n = 0;
@@ -1172,9 +1178,11 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
         func(); // don't write to ourselves!
       continue;
     }
-      
-    pipefunc_t *funcptr = new pipefunc_t(func);
-    if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr))
+  
+    ThreadMSG* tmsg = new ThreadMSG();
+    tmsg->func = func;
+    tmsg->wantAnswer = true;
+    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
       unixDie("write to thread pipe returned wrong size or error");
     
     string* resp;
@@ -1187,22 +1195,39 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
     }
   }
 }
-
-
+void distributeAsyncFunction(const pipefunc_t& func)
+{
+  static unsigned int counter;
+  unsigned int target = ++counter % g_pipes.size();
+  // cerr<<"Sending to: "<<target<<endl;
+  if(target == t_id) {
+    func();
+    return;
+  }
+  ThreadPipeSet& tps = g_pipes[target];    
+  ThreadMSG* tmsg = new ThreadMSG();
+  tmsg->func = func;
+  tmsg->wantAnswer = false;
+  
+  if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
+    unixDie("write to thread pipe returned wrong size or error");
+    
+}
 
 void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
 {
-  pipefunc_t* func;
-  if(read(fd, &func, sizeof(func)) != sizeof(func)) { // fd == readToThread 
+  ThreadMSG* tmsg;
+  
+  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread 
     unixDie("read from thread pipe returned wrong size or error");
   }
   
-  void *resp = (*func)();
-  
-  if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
-    unixDie("write to thread pipe returned wrong size or error");
+  void *resp = tmsg->func();
+  if(tmsg->wantAnswer)
+    if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
+      unixDie("write to thread pipe returned wrong size or error");
   
-  delete func;
+  delete tmsg;
 }
 
 template<class T> void *voider(const boost::function<T*()>& func)
@@ -1216,8 +1241,6 @@ vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddre
   return a;
 }
 
-
-
 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
 {
   unsigned int n = 0;
@@ -1236,9 +1259,13 @@ template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool
       continue;
     }
       
-    pipefunc_t *funcptr = new pipefunc_t(boost::bind(voider<T>, func));
-    if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr))
+    ThreadMSG* tmsg = new ThreadMSG();
+    tmsg->func = boost::bind(voider<T>, func);
+    tmsg->wantAnswer = true;
+  
+    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
       unixDie("write to thread pipe returned wrong size or error");
+  
     
     T* resp;
     if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
@@ -1694,7 +1721,6 @@ int serviceMain(int argc, char*argv[])
   if(!::arg()["setuid"].empty())
     newuid=Utility::makeUidNumeric(::arg()["setuid"]);
 
-#ifndef WIN32
   if (!::arg()["chroot"].empty()) {
     if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
       L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
@@ -1785,7 +1811,6 @@ try
     
     t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
   }
-#endif 
   
   unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
   
index 9d12650ed13a18f4d10e4e3a2a6720989ddd7949..0e26229b82fb5f22bb4b2bea74ac6f4d724b0766 100644 (file)
@@ -534,6 +534,7 @@ ComboAddress parseIPAndPort(const std::string& input, uint16_t port);
 ComboAddress getQueryLocalAddress(int family, uint16_t port);
 typedef boost::function<void*(void)> pipefunc_t;
 void broadcastFunction(const pipefunc_t& func, bool skipSelf = false);
+void distributeAsyncFunction(const pipefunc_t& func);
 
 
 template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf=false);