typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id;
+static thread_local int t_id;
static thread_local std::shared_ptr<Regex> t_traceRegex;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
unsigned int getRecursorThreadId()
{
- return t_id;
+ return static_cast<unsigned int>(t_id);
}
int getMTaskerTID()
last_rootupdate=now.tv_sec;
}
- if(!t_id) {
+ if (t_id == -1) {
if(g_statisticsInterval > 0 && now.tv_sec - last_stat >= g_statisticsInterval) {
doStats();
last_stat=time(0);
}
+ }
+ else if(!t_id) {
if(now.tv_sec - last_secpoll >= 3600) {
try {
void broadcastFunction(const pipefunc_t& func, bool skipSelf)
{
- unsigned int n = 0;
+ /* This function might be called by the worker with t_id 0 during startup */
+ if (t_id != -1 && t_id != 0) {
+ L<<Logger::Error<<"broadcastFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
+ int n = 0;
for(ThreadPipeSet& tps : g_pipes)
{
if(n++ == t_id) {
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
+ if (t_id != 0) {
+ L<<Logger::Error<<"distributeAsyncFunction() has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+ }
+
unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
unsigned int target = 1 + (hash % (g_pipes.size()-1));
- if(target == t_id) {
- func();
- return;
+ if(target == 0) {
+ L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
+ exit(1);
}
+
ThreadPipeSet& tps = g_pipes[target];
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = func;
template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
{
- unsigned int n = 0;
+ if (t_id != -1) {
+ L<<Logger::Error<<"broadcastFunction has been called by a worker ("<<t_id<<")"<<endl;
+ exit(1);
+
+ }
+
T ret=T();
for(ThreadPipeSet& tps : g_pipes)
{
- if(n++ == t_id) {
- if(!skipSelf) {
- T* resp = (T*)func(); // don't write to ourselves!
- if(resp) {
- //~ cerr <<"got direct: " << *resp << endl;
- ret += *resp;
- delete resp;
- }
- }
- continue;
- }
-
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = boost::bind(voider<T>, func);
tmsg->wantAnswer = true;
}
}
-static void* recursorThread(void*);
+static void* recursorThread(int tid, bool worker);
static void* pleaseSupplantACLs(std::shared_ptr<NetmaskGroup> ng)
{
g_snmpAgent->run();
}
+ /* This thread handles the web server, carbon, statistics and the control channel */
+ std::thread handlerThread(recursorThread, -1, false);
+
const auto cpusMap = parseCPUMap();
+
+ std::vector<std::thread> workers(g_numThreads);
if(g_numThreads == 1) {
L<<Logger::Warning<<"Operating unthreaded"<<endl;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
setCPUMap(cpusMap, 0, pthread_self());
- recursorThread(0);
+ recursorThread(0, true);
}
else {
- pthread_t tid;
L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
for(unsigned int n=0; n < g_numThreads; ++n) {
- pthread_create(&tid, 0, recursorThread, (void*)(long)n);
+ workers[n] = std::thread(recursorThread, n, true);
- setCPUMap(cpusMap, n, tid);
+ setCPUMap(cpusMap, n, workers[n].native_handle());
}
- void* res;
#ifdef HAVE_SYSTEMD
sd_notify(0, "READY=1");
#endif
- pthread_join(tid, &res);
+ workers.back().join();
}
return 0;
}
-static void* recursorThread(void* ptr)
+static void* recursorThread(int n, bool worker)
try
{
- t_id=(int) (long) ptr;
+ t_id=n;
SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
SyncRes::setDomainMap(g_initialDomainMap);
t_allowFrom = g_initialAllowFrom;
PacketID pident;
t_fdm=getMultiplexer();
- if(!t_id) {
+
+ if(!worker) {
if(::arg().mustDo("webserver")) {
L<<Logger::Warning << "Enabling web server" << endl;
try {
}
L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
}
-
- t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
-
- if(g_useOneSocketPerThread) {
- for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
- t_fdm->addReadFD(i->first, i->second);
- }
- }
else {
- if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
- for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+ t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
+
+ if(g_useOneSocketPerThread) {
+ for(deferredAdd_t::const_iterator i = deferredAdds[t_id].cbegin(); i != deferredAdds[t_id].cend(); ++i) {
t_fdm->addReadFD(i->first, i->second);
}
}
+ else {
+ if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
+ for(deferredAdd_t::const_iterator i = deferredAdds[0].cbegin(); i != deferredAdds[0].cend(); ++i) {
+ t_fdm->addReadFD(i->first, i->second);
+ }
+ }
+ }
}
registerAllStats();
- if(!t_id) {
+
+ if(!worker) {
t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
}
counter++;
- if(!t_id && statsWanted) {
+ if(!worker && statsWanted) {
doStats();
}
Utility::gettimeofday(&g_now, 0);
- if(!t_id && (g_now.tv_sec - last_carbon >= carbonInterval)) {
+ if(!worker && (g_now.tv_sec - last_carbon >= carbonInterval)) {
MT->makeThread(doCarbonDump, 0);
last_carbon = g_now.tv_sec;
}
t_fdm->run(&g_now);
// 'run' updates g_now for us
- if(!g_weDistributeQueries || !t_id) { // if pdns distributes queries, only tid 0 should do this
+ if(worker && (!g_weDistributeQueries || !t_id)) { // if pdns distributes queries, only tid 0 should do this
if(listenOnTCP) {
if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)