]> granicus.if.org Git - pdns/commitdiff
ixfrdist: Use workers to handle TCP connections
authorPieter Lexis <pieter.lexis@powerdns.com>
Thu, 12 Apr 2018 13:26:27 +0000 (15:26 +0200)
committerPieter Lexis <pieter.lexis@powerdns.com>
Thu, 12 Apr 2018 13:31:45 +0000 (15:31 +0200)
pdns/ixfrdist.cc

index d50fc5dfd924b11d7632118a23781ec7209d7bab..6a76b0e68422e0af63c7c0acd72e36ba2b07070b 100644 (file)
@@ -31,6 +31,8 @@
 #include <mutex>
 #include <thread>
 #include <dirent.h>
+#include <queue>
+#include <condition_variable>
 #include "ixfr.hh"
 #include "ixfrutils.hh"
 #include "resolver.hh"
@@ -63,6 +65,11 @@ set<DNSName> g_domains;
 std::map<DNSName, ixfrinfo_t> g_soas;
 std::mutex g_soas_mutex;
 
+// Condition variable for TCP handling
+std::condition_variable g_tcpHandlerCV;
+std::queue<pair<int, ComboAddress>> g_tcpRequestFDs;
+std::mutex g_tcpRequestFDsMutex;
+
 using namespace boost::multi_index;
 
 namespace po = boost::program_options;
@@ -625,91 +632,130 @@ void handleTCPRequest(int fd, boost::any&) {
     return;
   }
 
-  char buf[4096];
-  ssize_t res;
-  try {
-    uint16_t toRead;
-    readn2(cfd, &toRead, sizeof(toRead));
-    toRead = std::min(ntohs(toRead), static_cast<uint16_t>(sizeof(buf)));
-    res = readn2WithTimeout(cfd, &buf, toRead, 2);
-  } catch (runtime_error &e) {
-    cerr<<"[WARNING] Could not read message from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
-    close(cfd);
-    return;
+  {
+    std::lock_guard<std::mutex> lg(g_tcpRequestFDsMutex);
+    g_tcpRequestFDs.push({cfd, saddr});
   }
+  g_tcpHandlerCV.notify_one();
+}
 
-  try {
-    MOADNSParser mdp(true, string(buf, res));
+/* Thread to handle TCP traffic
+ */
+void tcpWorker(int tid) {
+  string prefix = "TCP Worker " + std::to_string(tid) + ": ";
+
+  while(true) {
+    if (g_debug) {
+      cerr<<"[DEBUG] "<<prefix<<"ready for a new request!"<<endl;
+    }
+    std::unique_lock<std::mutex> lk(g_tcpRequestFDsMutex);
+    g_tcpHandlerCV.wait(lk, []{return g_tcpRequestFDs.size() || g_exiting ;});
+    if (g_exiting) {
+      if (g_debug) {
+        cerr<<"[DEBUG] "<<prefix<<"stopping thread"<<endl;
+      }
+      break;
+    }
+    if (g_debug) {
+      cerr<<"[DEBUG] "<<prefix<<"Going to handle a query"<<endl;
+    }
+    auto request = g_tcpRequestFDs.front();
+    g_tcpRequestFDs.pop();
+    lk.unlock();
 
-    if (!checkQuery(mdp, saddr, false)) {
+    int cfd = request.first;
+    ComboAddress saddr = request.second;
+
+    char buf[4096];
+    ssize_t res;
+    try {
+      uint16_t toRead;
+      readn2(cfd, &toRead, sizeof(toRead));
+      toRead = std::min(ntohs(toRead), static_cast<uint16_t>(sizeof(buf)));
+      res = readn2WithTimeout(cfd, &buf, toRead, 2);
+    } catch (runtime_error &e) {
+      cerr<<"[WARNING] "<<prefix<<"Could not read message from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
       close(cfd);
-      return;
+      continue;
     }
 
-    vector<vector<uint8_t>> packets;
-    if (mdp.d_qtype == QType::SOA) {
-    vector<uint8_t> packet;
-      bool ret = makeSOAPacket(mdp, packet);
-      if (!ret) {
+    try {
+      MOADNSParser mdp(true, string(buf, res));
+
+      if (!checkQuery(mdp, saddr, false)) {
         close(cfd);
-        return;
+        continue;
       }
-      packets.push_back(packet);
-    }
 
-    if (mdp.d_qtype == QType::AXFR) {
-      if (!makeAXFRPackets(mdp, packets)) {
-        close(cfd);
-        return;
+      vector<vector<uint8_t>> packets;
+      if (mdp.d_qtype == QType::SOA) {
+      vector<uint8_t> packet;
+        bool ret = makeSOAPacket(mdp, packet);
+        if (!ret) {
+          close(cfd);
+          continue;
+        }
+        packets.push_back(packet);
       }
-    }
 
-    if (mdp.d_qtype == QType::IXFR) {
-      /* RFC 1995 section 3:
-       *  The IXFR query packet format is the same as that of a normal DNS
-       *  query, but with the query type being IXFR and the authority section
-       *  containing the SOA record of client's version of the zone.
-       */
-      shared_ptr<SOARecordContent> clientSOA;
-      for (auto &answer : mdp.d_answers) {
-        // from dnsparser.hh:
-        // typedef vector<pair<DNSRecord, uint16_t > > answers_t;
-        if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) {
-          clientSOA = getRR<SOARecordContent>(answer.first);
-          if (clientSOA != nullptr) {
-            break;
+      if (mdp.d_qtype == QType::AXFR) {
+        if (!makeAXFRPackets(mdp, packets)) {
+          close(cfd);
+          continue;
+        }
+      }
+
+      if (mdp.d_qtype == QType::IXFR) {
+        /* RFC 1995 section 3:
+         *  The IXFR query packet format is the same as that of a normal DNS
+         *  query, but with the query type being IXFR and the authority section
+         *  containing the SOA record of client's version of the zone.
+         */
+        shared_ptr<SOARecordContent> clientSOA;
+        for (auto &answer : mdp.d_answers) {
+          // from dnsparser.hh:
+          // typedef vector<pair<DNSRecord, uint16_t > > answers_t;
+          if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) {
+            clientSOA = getRR<SOARecordContent>(answer.first);
+            if (clientSOA != nullptr) {
+              break;
+            }
           }
+        } /* for (auto const &answer : mdp.d_answers) */
+
+        if (clientSOA == nullptr) {
+          cerr<<"[WARNING] "<<prefix<<"IXFR request packet did not contain a SOA record in the AUTHORITY section"<<endl;
+          close(cfd);
+          continue;
         }
-      } /* for (auto const &answer : mdp.d_answers) */
 
-      if (clientSOA == nullptr) {
-        cerr<<"[WARNING] IXFR request packet did not contain a SOA record in the AUTHORITY section"<<endl;
-        close(cfd);
-        return;
-      }
+        if (!makeIXFRPackets(mdp, clientSOA, packets)) {
+          close(cfd);
+          continue;
+        }
+      } /* if (mdp.d_qtype == QType::IXFR) */
 
-      if (!makeIXFRPackets(mdp, clientSOA, packets)) {
-        close(cfd);
-        return;
-      }
-    } /* if (mdp.d_qtype == QType::IXFR) */
+      for (const auto& packet : packets) {
+        char sendBuf[2];
+        sendBuf[0]=packet.size()/256;
+        sendBuf[1]=packet.size()%256;
 
-    for (const auto& packet : packets) {
-      char sendBuf[2];
-      sendBuf[0]=packet.size()/256;
-      sendBuf[1]=packet.size()%256;
+        ssize_t send = writen2(cfd, sendBuf, 2);
+        send += writen2(cfd, &packet[0], packet.size());
+      }
+      shutdown(cfd, 2);
+    } catch (MOADNSException &e) {
+      cerr<<"[WARNING] "<<prefix<<"Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+    } catch (runtime_error &e) {
+      cerr<<"[WARNING] "<<prefix<<"Could not write reply to "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+    }
+    // bye!
+    close(cfd);
 
-      ssize_t send = writen2(cfd, sendBuf, 2);
-      send += writen2(cfd, &packet[0], packet.size());
+    if (g_exiting) {
+      break;
     }
-    shutdown(cfd, 2);
-  } catch (MOADNSException &e) {
-    cerr<<"[WARNING] Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
-  } catch (runtime_error &e) {
-    cerr<<"[WARNING] Could not write reply to "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
   }
-  // bye!
-  close(cfd);
 }
 
 int main(int argc, char** argv) {
@@ -727,7 +773,8 @@ int main(int argc, char** argv) {
       ("server-address", po::value<string>()->default_value("127.0.0.1:5300"), "server address")
       ("work-dir", po::value<string>()->default_value("."), "Directory for storing AXFR and IXFR data")
       ("keep", po::value<uint16_t>()->default_value(KEEP_DEFAULT), "Number of old zone versions to retain")
-      ("axfr-timeout", po::value<uint16_t>()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an AXFR to complete")
+      ("axfr-timeout", po::value<uint16_t>()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an inbound AXFR to complete")
+      ("tcp-out-threads", po::value<uint16_t>()->default_value(10), "Number of maximum simultaneous outbound TCP connections")
       ;
     po::options_description alloptions;
     po::options_description hidden("hidden options");
@@ -920,6 +967,7 @@ int main(int argc, char** argv) {
   signal(SIGTERM, handleSignal);
   signal(SIGINT, handleSignal);
   signal(SIGSTOP, handleSignal);
+  signal(SIGPIPE, SIG_IGN);
 
   // Init the things we need
   reportAllTypes();
@@ -930,6 +978,10 @@ int main(int argc, char** argv) {
   cout<<"[INFO] IXFR distributor starting up!"<<endl;
 
   std::thread ut(updateThread);
+  std::thread tcpHandlers[g_vm["tcp-out-threads"].as<uint16_t>()];
+  for (int i=0; i<g_vm["tcp-out-threads"].as<uint16_t>(); i++) {
+    tcpHandlers[i] = std::thread(tcpWorker, i);
+  }
 
   struct timeval now;
   for(;;) {
@@ -950,6 +1002,10 @@ int main(int argc, char** argv) {
     }
   }
   ut.join();
+  g_tcpHandlerCV.notify_all();
+  for (int i=0; i<g_vm["tcp-out-threads"].as<uint16_t>(); i++) {
+    tcpHandlers[i].join();
+  }
   if (g_verbose) {
     cerr<<"[INFO] IXFR distributor stopped"<<endl;
   }