int writeFromThread;
int readFromThread;
};
+
typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
-static deferredAdd_t deferredAdd;
+static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
static AtomicCounter counter;
static bool g_anyToTcp;
static bool g_lowercaseOutgoing;
static bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
+static bool g_reusePort{false};
+static bool g_useOneSocketPerThread;
std::unordered_set<DNSName> g_delegationOnly;
RecursorControlChannel s_rcc; // only active in thread 0
}
}
-static void makeTCPServerSockets()
+static void makeTCPServerSockets(unsigned int threadId)
{
int fd;
vector<string>locals;
setCloseOnExec(fd);
int tmp=1;
- if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
+ if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
exit(1);
}
}
#ifdef TCP_DEFER_ACCEPT
- if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
+ if(setsockopt(fd, SOL_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
if(i==locals.begin())
L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
}
Utility::setBindAny(AF_INET, fd);
#ifdef SO_REUSEPORT
- if(::arg().mustDo("reuseport")) {
- int one=1;
- if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
+ if(g_reusePort) {
+ if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tmp, sizeof(tmp)) < 0)
throw PDNSException("SO_REUSEPORT: "+stringerror());
}
#endif
setNonBlocking(fd);
setSocketSendBuffer(fd, 65000);
listen(fd, 128);
- deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
+ deferredAdds[threadId].push_back(make_pair(fd, handleNewTCPQuestion));
g_tcpListenSockets.push_back(fd);
// we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
// - fd is not that which we know here, but returned from accept()
}
}
-static void makeUDPServerSockets()
+static void makeUDPServerSockets(unsigned int threadId)
{
int one=1;
vector<string>locals;
#ifdef SO_REUSEPORT
- if(::arg().mustDo("reuseport")) {
+ if(g_reusePort) {
if(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
throw PDNSException("SO_REUSEPORT: "+stringerror());
}
setNonBlocking(fd);
- deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
+ deferredAdds[threadId].push_back(make_pair(fd, handleNewUDPQuestion));
g_listenSocketsAddresses[fd]=sin; // this is written to only from the startup thread, not from the workers
if(sin.sin4.sin_family == AF_INET)
L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
g_lowercaseOutgoing = ::arg().mustDo("lowercase-outgoing");
- makeUDPServerSockets();
- makeTCPServerSockets();
+ g_numWorkerThreads = ::arg().asNum("threads");
+ g_numThreads = g_numWorkerThreads + g_weDistributeQueries;
+ g_maxMThreads = ::arg().asNum("max-mthreads");
+
+#ifdef SO_REUSEPORT
+ g_reusePort = ::arg().mustDo("reuseport");
+#endif
+
+ g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
+
+ if (g_useOneSocketPerThread) {
+ for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
+ makeUDPServerSockets(threadId);
+ makeTCPServerSockets(threadId);
+ }
+ }
+ else {
+ makeUDPServerSockets(0);
+ makeTCPServerSockets(0);
+ }
parseEDNSSubnetWhitelist(::arg()["edns-subnet-whitelist"]);
g_useIncomingECS = ::arg().mustDo("use-incoming-edns-subnet");
signal(SIGUSR1,usr1Handler);
signal(SIGUSR2,usr2Handler);
signal(SIGPIPE,SIG_IGN);
- g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
- g_numWorkerThreads = ::arg().asNum("threads");
- g_maxMThreads = ::arg().asNum("max-mthreads");
+
checkOrFixFDS();
#ifdef HAVE_LIBSODIUM
t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
- if(!g_weDistributeQueries || !t_id) // if we distribute queries, only t_id = 0 listens
- for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)
- t_fdm->addReadFD(i->first, i->second);
+ if(g_useOneSocketPerThread) {
+ for (unsigned int threadId = 0; threadId < g_numWorkerThreads; threadId++) {
+ for(deferredAdd_t::const_iterator i = deferredAdds[threadId].begin(); i != deferredAdds[threadId].end(); ++i) {
+ t_fdm->addReadFD(i->first, i->second);
+ }
+ }
+ }
+ else {
+ if(!g_weDistributeQueries || !t_id) { // if we distribute queries, only t_id = 0 listens
+ for(deferredAdd_t::const_iterator i = deferredAdds[0].begin(); i != deferredAdds[0].end(); ++i) {
+ t_fdm->addReadFD(i->first, i->second);
+ }
+ }
+ }
if(!t_id) {
t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel