int readQueriesToThread{-1};
};
+ /* FD corresponding to TCP sockets this thread is listening
+ on if reuseport is set. Otherwise g_tcpSockets is used instead.
+ These FDs are also in deferredAdds when we have one
+ socket per listener, and in g_deferredAdds instead. */
+ std::set<int> tcpSockets;
/* FD corresponding to listening sockets if we have one socket per
listener (with reuseport), otherwise all listeners share the
same FD and g_deferredAdds is then used instead */
static std::vector<RecThreadInfo> s_threadInfos;
/* without reuseport, all listeners share the same sockets */
static deferredAdd_t g_deferredAdds;
+static std::set<int> g_tcpSockets;
typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
-static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
static set<int> g_fromtosockets; // listen sockets that use 'sendfromto()' mechanism
static vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6;
}
}
-static void makeTCPServerSockets(deferredAdd_t& deferredAdds)
+static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
{
int fd;
vector<string>locals;
setSocketSendBuffer(fd, 65000);
listen(fd, 128);
deferredAdds.push_back(make_pair(fd, handleNewTCPQuestion));
- g_tcpListenSockets.push_back(fd);
+ tcpSockets.insert(fd);
+
// we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
// - fd is not that which we know here, but returned from accept()
if(sin.sin4.sin_family == AF_INET)
/* first thread is the handler, then distributors */
for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
+ auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
makeUDPServerSockets(deferredAdds);
- makeTCPServerSockets(deferredAdds);
+ makeTCPServerSockets(deferredAdds, tcpSockets);
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
auto& deferredAdds = s_threadInfos.at(threadId).deferredAdds;
+ auto& tcpSockets = s_threadInfos.at(threadId).tcpSockets;
makeUDPServerSockets(deferredAdds);
- makeTCPServerSockets(deferredAdds);
+ makeTCPServerSockets(deferredAdds, tcpSockets);
}
}
}
/* we don't have reuseport so we can only open one socket per
listening addr:port and everyone will listen on it */
makeUDPServerSockets(g_deferredAdds);
- makeTCPServerSockets(g_deferredAdds);
+ makeTCPServerSockets(g_deferredAdds, g_tcpSockets);
}
#ifdef NOD_ENABLED
if(threadInfo.isListener) {
if(listenOnTCP) {
if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
- for(const auto fd : g_tcpListenSockets) {
- t_fdm->removeReadFD(fd);
+
+ if (g_reusePort) {
+ for(const auto fd : threadInfo.tcpSockets) {
+ t_fdm->removeReadFD(fd);
+ }
+ }
+ else {
+ for(const auto fd : g_tcpSockets) {
+ t_fdm->removeReadFD(fd);
+ }
}
+
listenOnTCP=false;
}
}
else {
if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- for(const auto fd : g_tcpListenSockets) {
- t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ if (g_reusePort) {
+ for(const auto fd : threadInfo.tcpSockets) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ }
+ }
+ else {
+ for(const auto fd : g_tcpSockets) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
+ }
}
+
listenOnTCP=true;
}
}