// Should not be called simultaneously!
void TCPClientCollection::addTCPClientThread()
-{
+{
+ if (d_numthreads >= d_tcpclientthreads.capacity()) {
+ warnlog("Adding a new TCP client thread would exceed the vector capacity, skipping");
+ return;
+ }
+
vinfolog("Adding TCP Client thread");
- int pipefds[2];
+ int pipefds[2] = { -1, -1};
if(pipe(pipefds) < 0)
unixDie("Creating pipe");
if (!setNonBlocking(pipefds[1]))
unixDie("Setting pipe non-blocking");
- d_tcpclientthreads.push_back(pipefds[1]);
+ d_tcpclientthreads.push_back(pipefds[1]);
thread t1(tcpClientThread, pipefds[0]);
t1.detach();
++d_numthreads;
return false;
}
-TCPClientCollection g_tcpclientthreads;
+std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
void* tcpClientThread(int pipefd)
{
ConnectionInfo* citmp, ci;
readn2(pipefd, &citmp, sizeof(citmp));
- --g_tcpclientthreads.d_queued;
+ --g_tcpclientthreads->d_queued;
ci=*citmp;
delete citmp;
ComboAddress remote;
remote.sin4.sin_family = cs->local.sin4.sin_family;
- g_tcpclientthreads.addTCPClientThread();
+ g_tcpclientthreads->addTCPClientThread();
auto acl = g_ACL.getLocal();
for(;;) {
vinfolog("Got TCP connection from %s", remote.toStringWithPort());
ci->remote = remote;
- int pipe = g_tcpclientthreads.getThread();
- writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
+ int pipe = g_tcpclientthreads->getThread();
+ if (pipe >= 0) {
+ writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
+ }
+ else {
+ close(ci->fd);
+ delete ci;
+ }
}
catch(std::exception& e) {
errlog("While reading a TCP question: %s", e.what());
for(;;) {
sleep(interval);
- if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < g_maxTCPClientThreads)
- g_tcpclientthreads.addTCPClientThread();
+ if(g_tcpclientthreads->d_queued > 1 && g_tcpclientthreads->d_numthreads < g_maxTCPClientThreads)
+ g_tcpclientthreads->addTCPClientThread();
for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
if(dss->availability==DownstreamState::Availability::Auto) {
/* this need to be done _after_ dropping privileges */
g_delay = new DelayPipe<DelayedPacket>();
+ g_tcpclientthreads = std::make_shared<TCPClientCollection>(g_maxTCPClientThreads);
+
for(auto& t : todo)
t();
class TCPClientCollection {
std::vector<int> d_tcpclientthreads;
- std::atomic<uint64_t> d_pos;
+ std::atomic<uint64_t> d_pos{0};
public:
- std::atomic<uint64_t> d_queued, d_numthreads;
+ std::atomic<uint64_t> d_queued{0}, d_numthreads{0};
- TCPClientCollection()
+ TCPClientCollection(size_t maxThreads)
{
- d_tcpclientthreads.reserve(1024);
+ d_tcpclientthreads.reserve(maxThreads);
}
- int getThread()
+ int getThread()
{
int pos = d_pos++;
++d_queued;
void addTCPClientThread();
};
-extern TCPClientCollection g_tcpclientthreads;
+extern std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
struct DownstreamState
{