return 0;
}
-void DownstreamState::reconnect()
+bool DownstreamState::reconnect()
{
+ std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
+ if (!tl.owns_lock()) {
+ /* we are already reconnecting */
+ return false;
+ }
+
connected = false;
for (auto& fd : sockets) {
if (fd != -1) {
- {
+ if (sockets.size() > 1) {
std::lock_guard<std::mutex> lock(socketsLock);
mplexer->removeReadFD(fd);
}
}
try {
SConnect(fd, remote);
- {
+ if (sockets.size() > 1) {
std::lock_guard<std::mutex> lock(socketsLock);
mplexer->addReadFD(fd, [](int, boost::any) {});
}
if (!connected) {
for (auto& fd : sockets) {
if (fd != -1) {
+ if (sockets.size() > 1) {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->removeReadFD(fd);
+ }
/* shutdown() is needed to wake up recv() in the responderThread */
shutdown(fd, SHUT_RDWR);
close(fd);
}
}
}
+
+ return connected;
}
DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
{
+ threadStarted.clear();
+
mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
sockets.resize(numberOfSockets);
warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
if (newState && !dss->connected) {
- for (auto& fd : dss->sockets) {
- try {
- SConnect(fd, dss->remote);
- {
- std::lock_guard<std::mutex> lock(dss->socketsLock);
- dss->mplexer->addReadFD(fd, [](int, boost::any) {});
- }
- dss->connected = true;
- }
- catch(const std::runtime_error& error) {
- infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
- newState = false;
- dss->connected = false;
- }
- }
- if (dss->connected) {
+ newState = dss->reconnect();
+
+ if (dss->connected && !dss->threadStarted.test_and_set()) {
dss->tid = thread(responderThread, dss);
}
}
for(const auto& address : g_cmdLine.remotes) {
auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
addServerToPool(localPools, "", ret);
- if (ret->connected) {
+ if (ret->connected && !ret->threadStarted.test_and_set()) {
ret->tid = thread(responderThread, ret);
}
g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
std::vector<int> sockets;
std::mutex socketsLock;
+ std::mutex connectLock;
std::unique_ptr<FDMultiplexer> mplexer{nullptr};
std::thread tid;
ComboAddress remote;
bool useECS{false};
bool setCD{false};
std::atomic<bool> connected{false};
+ std::atomic_flag threadStarted;
bool tcpFastOpen{false};
bool ipBindAddrNoPort{true};
+
bool isUp() const
{
if(availability == Availability::Down)
status = (upStatus ? "up" : "down");
return status;
}
- void reconnect();
+ bool reconnect();
};
using servers_t =vector<std::shared_ptr<DownstreamState>>;