]> granicus.if.org Git - pdns/commitdiff
Proper in-flight maintenance; settable setting with doc.
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 11 Oct 2019 09:22:39 +0000 (11:22 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Fri, 11 Oct 2019 09:22:39 +0000 (11:22 +0200)
pdns/pdns_recursor.cc
pdns/recursordist/docs/settings.rst
pdns/sdig.cc
pdns/syncres.hh

index 636550cf361de4042f3bae5c517bf4b07a98dc8f..adcbee595c375c2ca715f7ec13831ac6a5c92ff0 100644 (file)
@@ -748,12 +748,14 @@ static void writePid(void)
 
 TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
 {
+  d_maxInFlight = ::arg().asNum("max-concurrent-requests-per-tcp-connection");
   ++s_currentConnections;
   (*t_tcpClientCounts)[d_remote]++;
 }
 
 TCPConnection::~TCPConnection()
 {
+  g_log<<Logger::Warning<<"closing socket for TCPConnection " <<d_fd <<endl;
   try {
     if(closesocket(d_fd) < 0)
       g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
@@ -767,8 +769,6 @@ TCPConnection::~TCPConnection()
   --s_currentConnections;
 }
 
-uint16_t TCPConnection::s_maxInFlight = 10;
-
 AtomicCounter TCPConnection::s_currentConnections;
 
 static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
@@ -1779,11 +1779,11 @@ static void startDoResolve(void *p)
         else {
           Utility::gettimeofday(&g_now, 0); // needs to be updated
           struct timeval ttd = g_now;
-          if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
+          if (dc->d_tcpConnection->d_requestsInFlight == dc->d_tcpConnection->d_maxInFlight - 1) {
             // A read error might have happened. If we add the fd back, it will most likely error again.
             // This is not a big issue, the next handleTCPClientReadable() will see another read error
             // and take action.
-            //cerr << "Reenabling " << dc->d_socket << ' ' << dc->d_tcpConnection->d_requestsInFlight << endl;
+            cerr << "Reenabling " << dc->d_socket << ' ' << dc->d_tcpConnection->d_requestsInFlight << endl;
             ttd.tv_sec += g_tcpTimeout;
             t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
           } else {
@@ -2030,16 +2030,7 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
     }
     conn->bytesread+=(uint16_t)bytes;
     if(conn->bytesread==conn->qlen) {
-      conn->d_requestsInFlight++;
-      if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
-        //cerr << "Disabling " << fd << ' ' << conn->d_requestsInFlight << endl;
-        t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
-      } else {
-        Utility::gettimeofday(&g_now, 0); // needed?
-        struct timeval ttd = g_now;
-        t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
-      }
-
+      conn->state = TCPConnection::BYTE0;
       std::unique_ptr<DNSComboWriter> dc;
       try {
         dc=std::unique_ptr<DNSComboWriter>(new DNSComboWriter(conn->data, g_now));
@@ -2173,8 +2164,16 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       else {
         ++g_stats.qcounter;
         ++g_stats.tcpqcounter;
+        ++conn->d_requestsInFlight;
+        if (conn->d_requestsInFlight >= conn->d_maxInFlight) {
+          cerr << "Disabling " << fd << ' ' << conn->d_requestsInFlight << endl;
+          t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
+        } else {
+          Utility::gettimeofday(&g_now, 0); // needed?
+          struct timeval ttd = g_now;
+          t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
+        }
         MT->makeThread(startDoResolve, dc.release()); // deletes dc
-        conn->state = TCPConnection::BYTE0;
         return;
       }
     }
@@ -4584,6 +4583,7 @@ int main(int argc, char **argv)
     ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
     ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
     ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
+    ::arg().set("max-concurrent-requests-per-tcp-connection", "Maximum number of requests handled concurrently per TCP connection") = "10";
     ::arg().set("server-down-max-fails","Maximum number of consecutive timeouts (and unreachables) to mark a server as down ( 0 => disabled )")="64";
     ::arg().set("server-down-throttle-time","Number of seconds to throttle all queries to a server after being marked as down")="60";
     ::arg().set("dont-throttle-names", "Do not throttle nameservers with this name or suffix")="";
index d7c9f4ac176ed0ce32eeb3211e65d8f4aaa5a4ee..a76958ffe3e6583282d389c248d212943d14b9fe 100644 (file)
@@ -861,6 +861,15 @@ Maximum number of seconds to cache an item in the DNS cache, no matter what the
 
     The minimum value of this setting is 15. i.e. setting this to lower than 15 will make this value 15.
 
+.. _setting max-concurrent-requests-per-tcp-connection:
+
+``max-concurrent-requests-per-tcp-connection``
+------------------------------------------
+-  Integer
+-  Default: 10
+
+Maximum number of requests handled concurrently per tcp connection.
+
 .. _setting-max-mthreads:
 
 ``max-mthreads``
index fc0d36074d7fcb850a9aced45dbbf9e6175407ab..c4bba29a4b6ec432040a4a960b856d913179ee33 100644 (file)
@@ -302,7 +302,7 @@ try {
       string question(packet.begin(), packet.end());
       sock.writen(question);
     }
-    for (const auto& it : questions) {
+    for (size_t i = 0; i < questions.size(); i++) {
       uint16_t len;
       if (sock.read((char *)&len, 2) != 2)
         throw PDNSException("tcp read failed");
index 67689a058aa296e2017db8cbc41aec679fab8b20..ee95715120f63ed4751f97d1dafb3dd0b5ee9db3 100644 (file)
@@ -1013,9 +1013,9 @@ public:
   uint16_t qlen{0};
   uint16_t bytesread{0};
   uint16_t d_requestsInFlight{0}; // number of mthreads spawned for this connection
+  // The max number of concurrent TCP requests we're willing to process
+  uint16_t d_maxInFlight;
   static unsigned int getCurrentConnections() { return s_currentConnections; }
-  // The max number of concurent TCP queries we're willing to process
-  static uint16_t s_maxInFlight;
 private:
   const int d_fd;
   static AtomicCounter s_currentConnections; //!< total number of current TCP connections