RecursorStats g_stats;
bool g_quiet;
+bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
+
static __thread NetmaskGroup* t_allowFrom;
static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this
}
}
+string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
+{
+ ++g_stats.qcounter;
+
+ string response;
+ try {
+ uint32_t age;
+ if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
+ if(!g_quiet)
+ L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
+
+ g_stats.packetCacheHits++;
+ SyncRes::s_queries++;
+ ageDNSPacket(response, age);
+ sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
+ if(response.length() >= sizeof(struct dnsheader))
+ updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
+ g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
+ return 0;
+ }
+ }
+ catch(std::exception& e) {
+ L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
+ return 0;
+ }
+
+
+ if(MT->numProcesses() > g_maxMThreads) {
+ g_stats.overCapacityDrops++;
+ return 0;
+ }
+
+ DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
+ dc->setSocket(fd);
+ dc->setRemote(&fromaddr);
+
+ dc->d_tcp=false;
+ MT->makeThread(startDoResolve, (void*) dc); // deletes dc
+ return 0;
+}
+
void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
{
int len;
L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
}
else {
- ++g_stats.qcounter;
-
- string response;
- try {
- uint32_t age;
- if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) {
- if(!g_quiet)
- L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
-
- g_stats.packetCacheHits++;
- SyncRes::s_queries++;
- ageDNSPacket(response, age);
- sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
- if(response.length() >= sizeof(struct dnsheader))
- updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
- g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
- return;
- }
- }
- catch(std::exception& e) {
- throw MOADNSException(e.what()); // translate
- }
- if(MT->numProcesses() > g_maxMThreads) {
- g_stats.overCapacityDrops++;
- return;
- }
-
- DNSComboWriter* dc = new DNSComboWriter(data, len, g_now);
- dc->setSocket(fd);
- dc->setRemote(&fromaddr);
-
- dc->d_tcp=false;
-
- MT->makeThread(startDoResolve, (void*) dc); // deletes dc
+ string question(data, len);
+ if(g_weDistributeQueries)
+ distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
+ else
+ doProcessUDPQuestion(question, fromaddr, fd);
}
}
catch(MOADNSException& mde) {
}
}
+
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
deferredAdd_t deferredAdd;
void distributeAsyncFunction(const pipefunc_t& func)
{
static unsigned int counter;
- unsigned int target = ++counter % g_pipes.size();
+ unsigned int target = 1 + (++counter % (g_pipes.size()-1));
// cerr<<"Sending to: "<<target<<endl;
if(target == t_id) {
func();
}
g_quiet=::arg().mustDo("quiet");
+ g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
+ if(g_weDistributeQueries) {
+ L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
+ }
+
if(::arg().mustDo("trace")) {
SyncRes::setLog(true);
::arg().set("quiet")="no";
makeUDPServerSockets();
makeTCPServerSockets();
+ for(int forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
+ if(!fork()) // we are child
+ break;
+ }
+
s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
if(!s_pidfname.empty())
unlink(s_pidfname.c_str()); // remove possible old pid file
Utility::dropPrivs(newuid, newgid);
- g_numThreads = ::arg().asNum("threads");
+ g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
makeThreadPipes();
t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
- for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
- t_fdm->addReadFD(i->first, i->second);
+ if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens
+ for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
+ t_fdm->addReadFD(i->first, i->second);
if(!t_id) {
-
t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
}
-
+
unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
bool listenOnTCP(true);
::arg().set("setuid","If set, change user id to this uid for more security")="";
::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
::arg().set("threads", "Launch this number of threads")="2";
+ ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
#ifdef WIN32
::arg().set("quiet","Suppress logging of questions and answers")="off";
::arg().setSwitch( "register-service", "Register the service" )= "no";
::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off";
::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no";
::arg().setSwitch( "disable-edns", "Disable EDNS" )= "";
- ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no";
+ ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";
+ ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
+
::arg().setCmd("help","Provide a helpful message");
::arg().setCmd("version","Print version string ("VERSION")");