granicus.if.org Git - pdns/commitdiff
dnsdist: Fix reconnection handling
author Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 22 May 2018 14:06:34 +0000 (16:06 +0200)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Fri, 25 May 2018 10:21:19 +0000 (12:21 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index c6c97c3a58e06e39931620380958458b503d73ae..e35f0c0f3e2b0f9dc8ce7bcf922e4a148cfabee1 100644 (file)
@@ -345,6 +345,8 @@ void setupLuaConfig(bool client)
                         g_pools.setState(localPools);
 
                        if (ret->connected) {
+                         ret->threadStarted.test_and_set();
+
                          if(g_launchWork) {
                            g_launchWork->push_back([ret,cpus]() {
                               ret->tid = thread(responderThread, ret);
index 44ec64f42c3b0ca4d3b64c41c99c6a93c5deb46d..aeec6f39b8b53d49ed9f51907d1236c738085bc2 100644 (file)
@@ -564,12 +564,18 @@ catch(...)
   return 0;
 }
 
-void DownstreamState::reconnect()
+bool DownstreamState::reconnect()
 {
+  std::unique_lock<std::mutex> tl(connectLock, std::try_to_lock);
+  if (!tl.owns_lock()) {
+    /* we are already reconnecting */
+    return false;
+  }
+
   connected = false;
   for (auto& fd : sockets) {
     if (fd != -1) {
-      {
+      if (sockets.size() > 1) {
         std::lock_guard<std::mutex> lock(socketsLock);
         mplexer->removeReadFD(fd);
       }
@@ -586,7 +592,7 @@ void DownstreamState::reconnect()
       }
       try {
         SConnect(fd, remote);
-        {
+        if (sockets.size() > 1) {
           std::lock_guard<std::mutex> lock(socketsLock);
           mplexer->addReadFD(fd, [](int, boost::any) {});
         }
@@ -604,6 +610,10 @@ void DownstreamState::reconnect()
   if (!connected) {
     for (auto& fd : sockets) {
       if (fd != -1) {
+        if (sockets.size() > 1) {
+          std::lock_guard<std::mutex> lock(socketsLock);
+          mplexer->removeReadFD(fd);
+        }
         /* shutdown() is needed to wake up recv() in the responderThread */
         shutdown(fd, SHUT_RDWR);
         close(fd);
@@ -611,10 +621,14 @@ void DownstreamState::reconnect()
       }
     }
   }
+
+  return connected;
 }
 
 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
+  threadStarted.clear();
+
   mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
 
   sockets.resize(numberOfSockets);
@@ -1827,22 +1841,9 @@ void* healthChecksThread()
           warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
 
           if (newState && !dss->connected) {
-            for (auto& fd : dss->sockets) {
-              try {
-                SConnect(fd, dss->remote);
-                {
-                  std::lock_guard<std::mutex> lock(dss->socketsLock);
-                  dss->mplexer->addReadFD(fd, [](int, boost::any) {});
-                }
-                dss->connected = true;
-              }
-              catch(const std::runtime_error& error) {
-                infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
-                newState = false;
-                dss->connected = false;
-              }
-            }
-            if (dss->connected) {
+            newState = dss->reconnect();
+
+            if (dss->connected && !dss->threadStarted.test_and_set()) {
               dss->tid = thread(responderThread, dss);
             }
           }
@@ -2587,7 +2588,7 @@ try
     for(const auto& address : g_cmdLine.remotes) {
       auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
       addServerToPool(localPools, "", ret);
-      if (ret->connected) {
+      if (ret->connected && !ret->threadStarted.test_and_set()) {
         ret->tid = thread(responderThread, ret);
       }
       g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
index 96a050b8dea1db310a1f031d9bf58fceda5f0c8d..bd89ace6e68a538ac6b8e7d4d86213592e7e782a 100644 (file)
@@ -501,6 +501,7 @@ struct DownstreamState
 
   std::vector<int> sockets;
   std::mutex socketsLock;
+  std::mutex connectLock;
   std::unique_ptr<FDMultiplexer> mplexer{nullptr};
   std::thread tid;
   ComboAddress remote;
@@ -544,8 +545,10 @@ struct DownstreamState
   bool useECS{false};
   bool setCD{false};
   std::atomic<bool> connected{false};
+  std::atomic_flag threadStarted;
   bool tcpFastOpen{false};
   bool ipBindAddrNoPort{true};
+
   bool isUp() const
   {
     if(availability == Availability::Down)
@@ -580,7 +583,7 @@ struct DownstreamState
       status = (upStatus ? "up" : "down");
     return status;
   }
-  void reconnect();
+  bool reconnect();
 };
 using servers_t =vector<std::shared_ptr<DownstreamState>>;