From: Bert Hubert Date: Fri, 6 Aug 2010 19:12:20 +0000 (+0000) Subject: add infrastructure for manual query distribution over threads (distributeAsyncFunction) X-Git-Tag: rec-3.3~41 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=00c9b8c1fe585505c702c70476b868b6f12a93f4;p=pdns add infrastructure for manual query distribution over threads (distributeAsyncFunction) git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1678 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 5f8c2ad33..489d4d841 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -124,7 +124,7 @@ struct DNSComboWriter { d_tcp(false), d_socket(-1) {} MOADNSParser d_mdp; - void setRemote(ComboAddress* sa) + void setRemote(const ComboAddress* sa) { d_remote=*sa; } @@ -1162,6 +1162,12 @@ void makeThreadPipes() } } +struct ThreadMSG +{ + pipefunc_t func; + bool wantAnswer; +}; + void broadcastFunction(const pipefunc_t& func, bool skipSelf) { unsigned int n = 0; @@ -1172,9 +1178,11 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf) func(); // don't write to ourselves! continue; } - - pipefunc_t *funcptr = new pipefunc_t(func); - if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr)) + + ThreadMSG* tmsg = new ThreadMSG(); + tmsg->func = func; + tmsg->wantAnswer = true; + if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) unixDie("write to thread pipe returned wrong size or error"); string* resp; @@ -1187,22 +1195,39 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf) } } } - - +void distributeAsyncFunction(const pipefunc_t& func) +{ + static unsigned int counter; + unsigned int target = ++counter % g_pipes.size(); + // cerr<<"Sending to: "<func = func; + tmsg->wantAnswer = false; + + if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) + unixDie("write to thread pipe returned wrong size or error"); + +} void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var) { - pipefunc_t* func; - if(read(fd, &func, sizeof(func)) != sizeof(func)) { // fd == readToThread + ThreadMSG* tmsg; + + if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread unixDie("read from thread pipe returned wrong size or error"); } - void *resp = (*func)(); - - if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) - unixDie("write to thread pipe returned wrong size or error"); + void *resp = tmsg->func(); + if(tmsg->wantAnswer) + if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) + unixDie("write to thread pipe returned wrong size or error"); - delete func; + delete tmsg; } template void *voider(const boost::function& func) @@ -1216,8 +1241,6 @@ vector& operator+=(vector&a, const vector T broadcastAccFunction(const boost::function& func, bool skipSelf) { unsigned int n = 0; @@ -1236,9 +1259,13 @@ template T broadcastAccFunction(const boost::function& func, bool continue; } - pipefunc_t *funcptr = new pipefunc_t(boost::bind(voider, func)); - if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr)) + ThreadMSG* tmsg = new ThreadMSG(); + tmsg->func = boost::bind(voider, func); + tmsg->wantAnswer = true; + + if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) unixDie("write to thread pipe returned wrong size or error"); + T* resp; if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) @@ -1694,7 +1721,6 @@ int serviceMain(int argc, char*argv[]) if(!::arg()["setuid"].empty()) newuid=Utility::makeUidNumeric(::arg()["setuid"]); -#ifndef WIN32 if (!::arg()["chroot"].empty()) { if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) { L<addReadFD(s_rcc.d_fd, handleRCC); // control channel } -#endif unsigned int maxTcpClients=::arg().asNum("max-tcp-clients"); diff --git a/pdns/syncres.hh b/pdns/syncres.hh index 9d12650ed..0e26229b8 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -534,6 +534,7 @@ ComboAddress parseIPAndPort(const std::string& input, uint16_t port); ComboAddress getQueryLocalAddress(int family, uint16_t port); typedef boost::function pipefunc_t; void broadcastFunction(const pipefunc_t& func, bool skipSelf = false); +void distributeAsyncFunction(const pipefunc_t& func); template T broadcastAccFunction(const boost::function& func, bool skipSelf=false);