From 5d7e6765196d98fef5edb3b830e425fbd4353085 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Tue, 22 May 2018 16:06:34 +0200 Subject: [PATCH] dnsdist: Fix reconnection handling --- pdns/dnsdist-lua.cc | 2 ++ pdns/dnsdist.cc | 41 +++++++++++++++++++++-------------------- pdns/dnsdist.hh | 5 ++++- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index c6c97c3a5..e35f0c0f3 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -345,6 +345,8 @@ void setupLuaConfig(bool client) g_pools.setState(localPools); if (ret->connected) { + ret->threadStarted.test_and_set(); + if(g_launchWork) { g_launchWork->push_back([ret,cpus]() { ret->tid = thread(responderThread, ret); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 44ec64f42..aeec6f39b 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -564,12 +564,18 @@ catch(...) return 0; } -void DownstreamState::reconnect() +bool DownstreamState::reconnect() { + std::unique_lock tl(connectLock, std::try_to_lock); + if (!tl.owns_lock()) { + /* we are already reconnecting */ + return false; + } + connected = false; for (auto& fd : sockets) { if (fd != -1) { - { + if (sockets.size() > 1) { std::lock_guard lock(socketsLock); mplexer->removeReadFD(fd); } @@ -586,7 +592,7 @@ void DownstreamState::reconnect() } try { SConnect(fd, remote); - { + if (sockets.size() > 1) { std::lock_guard lock(socketsLock); mplexer->addReadFD(fd, [](int, boost::any) {}); } @@ -604,6 +610,10 @@ void DownstreamState::reconnect() if (!connected) { for (auto& fd : sockets) { if (fd != -1) { + if (sockets.size() > 1) { + std::lock_guard lock(socketsLock); + mplexer->removeReadFD(fd); + } /* shutdown() is needed to wake up recv() in the responderThread */ shutdown(fd, SHUT_RDWR); close(fd); @@ -611,10 +621,14 @@ void DownstreamState::reconnect() } } } + + return connected; } DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { + threadStarted.clear(); + mplexer = std::unique_ptr(FDMultiplexer::getMultiplexerSilent()); sockets.resize(numberOfSockets); @@ -1827,22 +1841,9 @@ void* healthChecksThread() warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); if (newState && !dss->connected) { - for (auto& fd : dss->sockets) { - try { - SConnect(fd, dss->remote); - { - std::lock_guard lock(dss->socketsLock); - dss->mplexer->addReadFD(fd, [](int, boost::any) {}); - } - dss->connected = true; - } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); - newState = false; - dss->connected = false; - } - } - if (dss->connected) { + newState = dss->reconnect(); + + if (dss->connected && !dss->threadStarted.test_and_set()) { dss->tid = thread(responderThread, dss); } } @@ -2587,7 +2588,7 @@ try for(const auto& address : g_cmdLine.remotes) { auto ret=std::make_shared(ComboAddress(address, 53)); addServerToPool(localPools, "", ret); - if (ret->connected) { + if (ret->connected && !ret->threadStarted.test_and_set()) { ret->tid = thread(responderThread, ret); } g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); }); diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 96a050b8d..bd89ace6e 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -501,6 +501,7 @@ struct DownstreamState std::vector sockets; std::mutex socketsLock; + std::mutex connectLock; std::unique_ptr mplexer{nullptr}; std::thread tid; ComboAddress remote; @@ -544,8 +545,10 @@ struct DownstreamState bool useECS{false}; bool setCD{false}; std::atomic connected{false}; + std::atomic_flag threadStarted; bool tcpFastOpen{false}; bool ipBindAddrNoPort{true}; + bool isUp() const { if(availability == Availability::Down) @@ -580,7 +583,7 @@ struct DownstreamState status = (upStatus ? "up" : "down"); return status; } - void reconnect(); + bool reconnect(); }; using servers_t =vector>; -- 2.50.1