From 1bc3c142a26ab71aad4bf7508da4ccc0bd8a55a6 Mon Sep 17 00:00:00 2001 From: Bert Hubert Date: Sun, 8 Aug 2010 19:59:19 +0000 Subject: [PATCH] implement pdns-distributes-queries to make powerdns distribute queries itself implement 'processes', bringing back the old-school '--fork' option git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1684 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- pdns/pdns_recursor.cc | 110 ++++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 41 deletions(-) diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 489d4d841..a728ed7d6 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -99,6 +99,8 @@ __thread RecursorPacketCache* t_packetCache; RecursorStats g_stats; bool g_quiet; +bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets + static __thread NetmaskGroup* t_allowFrom; static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this @@ -810,6 +812,47 @@ void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& ) } } +string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd) +{ + ++g_stats.qcounter; + + string response; + try { + uint32_t age; + if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) { + if(!g_quiet) + L<= sizeof(struct dnsheader)) + updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); + g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec + return 0; + } + } + catch(std::exception& e) { + L<numProcesses() > g_maxMThreads) { + g_stats.overCapacityDrops++; + return 0; + } + + DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now); + dc->setSocket(fd); + dc->setRemote(&fromaddr); + + dc->d_tcp=false; + MT->makeThread(startDoResolve, (void*) dc); // deletes dc + return 0; +} + void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) { int len; @@ -835,40 +878,11 @@ void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) L<getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) { - if(!g_quiet) - L<= sizeof(struct dnsheader)) - updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); - g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec - return; - } - } - catch(std::exception& e) { - throw MOADNSException(e.what()); // translate - } - if(MT->numProcesses() > g_maxMThreads) { - g_stats.overCapacityDrops++; - return; - } - - DNSComboWriter* dc = new DNSComboWriter(data, len, g_now); - dc->setSocket(fd); - dc->setRemote(&fromaddr); - - dc->d_tcp=false; - - MT->makeThread(startDoResolve, (void*) dc); // deletes dc + string question(data, len); + if(g_weDistributeQueries) + distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd)); + else + doProcessUDPQuestion(question, fromaddr, fd); } } catch(MOADNSException& mde) { @@ -884,6 +898,7 @@ void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) } } + typedef vector > > deferredAdd_t; deferredAdd_t deferredAdd; @@ -1198,7 +1213,7 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf) void distributeAsyncFunction(const pipefunc_t& func) { static unsigned int counter; - unsigned int target = ++counter % g_pipes.size(); + unsigned int target = 1 + (++counter % (g_pipes.size()-1)); // cerr<<"Sending to: "<addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); - for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) - t_fdm->addReadFD(i->first, i->second); + if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens + for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) + t_fdm->addReadFD(i->first, i->second); if(!t_id) { - t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel } - + unsigned int maxTcpClients=::arg().asNum("max-tcp-clients"); bool listenOnTCP(true); @@ -1927,6 +1952,7 @@ int main(int argc, char **argv) ::arg().set("setuid","If set, change user id to this uid for more security")=""; ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500"; ::arg().set("threads", "Launch this number of threads")="2"; + ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; #ifdef WIN32 ::arg().set("quiet","Suppress logging of questions and answers")="off"; ::arg().setSwitch( "register-service", "Register the service" )= "no"; @@ -1982,7 +2008,9 @@ int main(int argc, char **argv) ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off"; ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; - ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no"; + ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no"; + ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no"; + ::arg().setCmd("help","Provide a helpful message"); ::arg().setCmd("version","Print version string ("VERSION")"); -- 2.40.0