]> granicus.if.org Git - pdns/commitdiff
- Fix multiplexer accounting in the write error case
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 9 Oct 2019 08:35:00 +0000 (10:35 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 9 Oct 2019 08:35:00 +0000 (10:35 +0200)
- Use proper type for in-flight accounting

pdns/pdns_recursor.cc
pdns/syncres.hh

index e5a781134114efa8c49b8b6bae68a728e6059542..99cc7273a8d875649925bad933e3c542fb1ede9b 100644 (file)
@@ -746,7 +746,7 @@ static void writePid(void)
   }
 }
 
-TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_requestsInFlight(0), d_fd(fd)
+TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
 {
   ++s_currentConnections;
   (*t_tcpClientCounts)[d_remote]++;
@@ -767,7 +767,7 @@ TCPConnection::~TCPConnection()
   --s_currentConnections;
 }
 
-int TCPConnection::s_maxInFlight = 10;
+uint16_t TCPConnection::s_maxInFlight = 10;
 
 AtomicCounter TCPConnection::s_currentConnections;
 
@@ -1750,27 +1750,41 @@ static void startDoResolve(void *p)
       else
         hadError=false;
 
-      // update tcp connection status, either by closing or moving to 'BYTE0'
+      // update tcp connection status, closing if needed and doing the fd multiplexer accounting
 
+      dc->d_tcpConnection->d_requestsInFlight--;
+
+      // In the code below, we try to remove the fd from the set, but
+      // we don't know if another mthread already did the remove, so we can get a
+      // "Tried to remove unlisted fd" exception.  Not that an inflight < limit test
+      // will not work since we do not know if the other mthread got an error or not.
       if(hadError) {
-        // no need to remove us from FDM, we weren't there
+        try {
+          t_fdm->removeReadFD(dc->d_socket);
+        }
+        catch (FDMultiplexerException &) {
+        }
         dc->d_socket = -1;
       }
       else {
         dc->d_tcpConnection->queriesCount++;
         if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
+          try {
+            t_fdm->removeReadFD(dc->d_socket);
+          }
+          catch (FDMultiplexerException &) {
+          }
           dc->d_socket = -1;
         }
         else {
           Utility::gettimeofday(&g_now, 0); // needs to be updated
           struct timeval ttd = g_now;
-          dc->d_tcpConnection->d_requestsInFlight--;
           if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
-            //cerr << "Reenabling... " << dc->d_tcpConnection->d_requestsInFlight << ' ' << dc->d_socket << endl;
+            //cerr << "Reenabling " << dc->d_socket << ' ' << dc->d_tcpConnection->d_requestsInFlight << endl;
             ttd.tv_sec += g_tcpTimeout;
             t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
           } else {
-            t_fdm->setReadTTD(dc->d_socket, ttd,  g_tcpTimeout);
+            t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
           }
         }
       }
@@ -2010,8 +2024,12 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
     if(conn->bytesread==conn->qlen) {
       conn->d_requestsInFlight++;
       if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
-        //cerr << "Disabling... " << conn->d_requestsInFlight << ' ' << fd << endl;
+        //cerr << "Disabling " << fd << ' ' << conn->d_requestsInFlight << endl;
         t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
+      } else {
+        Utility::gettimeofday(&g_now, 0); // needed?
+        struct timeval ttd = g_now;
+        t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
       }
 
       std::unique_ptr<DNSComboWriter> dc;
index c4e0a605d0e5db045751901b9c226a3c5b26c3ba..67689a058aa296e2017db8cbc41aec679fab8b20 100644 (file)
@@ -1012,10 +1012,10 @@ public:
   enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state{BYTE0};
   uint16_t qlen{0};
   uint16_t bytesread{0};
-  std::atomic<int> d_requestsInFlight;
+  uint16_t d_requestsInFlight{0}; // number of mthreads spawned for this connection
   static unsigned int getCurrentConnections() { return s_currentConnections; }
   // The max number of concurent TCP queries we're willing to process
-  static int s_maxInFlight;
+  static uint16_t s_maxInFlight;
 private:
   const int d_fd;
   static AtomicCounter s_currentConnections; //!< total number of current TCP connections