From: Remi Gacogne Date: Thu, 3 May 2018 12:27:18 +0000 (+0100) Subject: rec: Move carbon/webserver/control/stats handling to a separate thread X-Git-Tag: dnsdist-1.3.1~89^2~7 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d77abca123d8e96ad0855770d63cae4d083440ac;p=pdns rec: Move carbon/webserver/control/stats handling to a separate thread This makes sure that no worker or distributor thread will get blocked while waiting for a response from another thread, for example while gathering stats or executing a command coming from the control channel. --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index c5569a1d3..d26b08ff9 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -101,7 +101,7 @@ typedef map tcpClientCounts_t; static thread_local std::shared_ptr t_pdl; -static thread_local unsigned int t_id; +static thread_local int t_id; static thread_local std::shared_ptr t_traceRegex; static thread_local std::unique_ptr t_tcpClientCounts; #ifdef HAVE_PROTOBUF @@ -283,7 +283,7 @@ ArgvMap &arg() unsigned int getRecursorThreadId() { - return t_id; + return static_cast(t_id); } int getMTaskerTID() @@ -2356,11 +2356,13 @@ static void houseKeeping(void *) last_rootupdate=now.tv_sec; } - if(!t_id) { + if (t_id == -1) { if(g_statisticsInterval > 0 && now.tv_sec - last_stat >= g_statisticsInterval) { doStats(); last_stat=time(0); } + } + else if(!t_id) { if(now.tv_sec - last_secpoll >= 3600) { try { @@ -2432,7 +2434,13 @@ struct ThreadMSG void broadcastFunction(const pipefunc_t& func, bool skipSelf) { - unsigned int n = 0; + /* This function might be called by the worker with t_id 0 during startup */ + if (t_id != -1 && t_id != 0) { + g_log<func = func; @@ -2548,22 +2562,15 @@ vector >& operator+=(vector >&a, template T broadcastAccFunction(const boost::function& func, bool skipSelf) { - unsigned int n = 0; + if (t_id != -1) { + g_log<func = boost::bind(voider, func); tmsg->wantAnswer = true; @@ -2886,7 +2893,7 @@ static void checkOrFixFDS() } } -static void* recursorThread(void*); +static void* recursorThread(int tid, bool worker); static void* pleaseSupplantACLs(std::shared_ptr ng) { @@ -3370,36 +3377,39 @@ static int serviceMain(int argc, char*argv[]) s_avoidUdpSourcePorts.insert(port); } + /* This thread handles the web server, carbon, statistics and the control channel */ + std::thread handlerThread(recursorThread, -1, false); + const auto cpusMap = parseCPUMap(); + + std::vector workers(g_numThreads); if(g_numThreads == 1) { g_log<getName() << "' multiplexer"<addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); - t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest); - - if(g_useOneSocketPerThread) { - for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) { - t_fdm->addReadFD(i->first, i->second); - } - } else { - if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens - for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) { + t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); + t_fdm->addReadFD(g_pipes[t_id].readQueriesToThread, handlePipeRequest); + + if(g_useOneSocketPerThread) { + for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) { t_fdm->addReadFD(i->first, i->second); } } + else { + if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens + for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) { + t_fdm->addReadFD(i->first, i->second); + } + } + } } registerAllStats(); - if(!t_id) { + + if(!worker) { t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel } @@ -3519,13 +3532,13 @@ try counter++; - if(!t_id && statsWanted) { + if(!worker && statsWanted) { doStats(); } Utility::gettimeofday(&g_now, 0); - if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) { + if(!worker && (g_now.tv_sec - last_carbon >= carbonInterval)) { MT->makeThread(doCarbonDump, 0); last_carbon = g_now.tv_sec; } @@ -3533,7 +3546,7 @@ try t_fdm->run(&g_now); // 'run' updates g_now for us - if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this + if(worker && (!g_weDistributeQueries || !t_id)) { // if pdns distributes queries, only tid 0 should do this if(listenOnTCP) { if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)