]> granicus.if.org Git - pdns/commitdiff
ixfrdist: Speedup and optimize memory usage when writing answers
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 29 Aug 2018 12:29:55 +0000 (14:29 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 29 Aug 2018 12:46:03 +0000 (14:46 +0200)
We now group as many records as possible inside one DNS message,
and send a message as soon as it is ready, so we can reduce the
memory usage by not keeping all outgoing messages in memory,
and send a lower number of messages.

pdns/ixfrdist.cc

index a3e0a5db72fab0dd7303fbc9a5f09b8fb7e1a9c1..27047007c3b751c6e56e60fdb7190e34d17e865a 100644 (file)
@@ -135,7 +135,7 @@ bool g_exiting = false;
 
 NetmaskGroup g_acl;
 
-void handleSignal(int signum) {
+static void handleSignal(int signum) {
   g_log<<Logger::Notice<<"Got "<<strsignal(signum)<<" signal";
   if (g_exiting) {
     g_log<<Logger::Notice<<", this is the second time we were asked to stop, forcefully exiting"<<endl;
@@ -145,17 +145,17 @@ void handleSignal(int signum) {
   g_exiting = true;
 }
 
-void usage(po::options_description &desc) {
+static void usage(po::options_description &desc) {
   cerr << "Usage: ixfrdist [OPTION]..."<<endl;
   cerr << desc << "\n";
 }
 
 // The compiler does not like using rfc1982LessThan in std::sort directly
-bool sortSOA(uint32_t i, uint32_t j) {
+static bool sortSOA(uint32_t i, uint32_t j) {
   return rfc1982LessThan(i, j);
 }
 
-void cleanUpDomain(const DNSName& domain, const uint16_t& keep, const string& workdir) {
+static void cleanUpDomain(const DNSName& domain, const uint16_t& keep, const string& workdir) {
   string dir = workdir + "/" + domain.toString();
   DIR *dp;
   dp = opendir(dir.c_str());
@@ -405,7 +405,7 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
   } /* while (true) */
 } /* updateThread */
 
-bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool udp = true, const string& logPrefix="") {
+static bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool udp = true, const string& logPrefix="") {
   vector<string> info_msg;
 
   g_log<<Logger::Debug<<logPrefix<<"Had "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" query from "<<saddr.toStringWithPort()<<endl;
@@ -451,7 +451,7 @@ bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool u
  * Returns a vector<uint8_t> that represents the full response to a SOA
  * query. QNAME is read from mdp.
  */
-bool makeSOAPacket(const MOADNSParser& mdp, vector<uint8_t>& packet) {
+static bool makeSOAPacket(const MOADNSParser& mdp, vector<uint8_t>& packet) {
 
   auto zoneInfo = getCurrentZoneInfo(mdp.d_qname);
   if (zoneInfo == nullptr) {
@@ -470,7 +470,7 @@ bool makeSOAPacket(const MOADNSParser& mdp, vector<uint8_t>& packet) {
   return true;
 }
 
-vector<uint8_t> getSOAPacket(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>& soa) {
+static vector<uint8_t> getSOAPacket(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>& soa) {
   vector<uint8_t> packet;
   DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
   pw.getHeader()->id = mdp.d_header.id;
@@ -484,7 +484,75 @@ vector<uint8_t> getSOAPacket(const MOADNSParser& mdp, const shared_ptr<SOARecord
   return packet;
 }
 
-bool makeAXFRPackets(const MOADNSParser& mdp, vector<vector<uint8_t>>& packets) {
+static bool sendPacketOverTCP(int fd, const std::vector<uint8_t>& packet)
+{
+  char sendBuf[2];
+  sendBuf[0]=packet.size()/256;
+  sendBuf[1]=packet.size()%256;
+
+  ssize_t send = writen2(fd, sendBuf, 2);
+  send += writen2(fd, &packet[0], packet.size());
+  return true;
+}
+
+static bool addRecordToWriter(DNSPacketWriter& pw, const DNSName& zoneName, const DNSRecord& record)
+{
+  pw.startRecord(record.d_name + zoneName, record.d_type);
+  record.d_content->toPacket(pw);
+  if (pw.size() > 65535) {
+    pw.rollback();
+    return false;
+  }
+  return true;
+}
+
+template <typename T> static bool sendRecordsOverTCP(int fd, const MOADNSParser& mdp, const T& records)
+{
+  vector<uint8_t> packet;
+
+  for (auto it = records.cbegin(); it != records.cend();) {
+    bool recordsAdded = false;
+    packet.clear();
+    DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
+    pw.getHeader()->id = mdp.d_header.id;
+    pw.getHeader()->rd = mdp.d_header.rd;
+    pw.getHeader()->qr = 1;
+
+    while (it != records.cend()) {
+      if (it->d_type == QType::SOA) {
+        it++;
+        continue;
+      }
+
+      if (addRecordToWriter(pw, mdp.d_qname, *it)) {
+        recordsAdded = true;
+        it++;
+      }
+      else {
+        if (recordsAdded) {
+          pw.commit();
+          sendPacketOverTCP(fd, packet);
+        }
+        if (it == records.cbegin()) {
+          /* something is wrong */
+          return false;
+        }
+
+        break;
+      }
+    }
+
+    if (it == records.cend() && recordsAdded) {
+      pw.commit();
+      sendPacketOverTCP(fd, packet);
+    }
+  }
+
+  return true;
+}
+
+
+static bool handleAXFR(int fd, const MOADNSParser& mdp) {
   /* we get a shared pointer of the zone info that we can't modify, ever.
      A newer one may arise in the meantime, but this one will stay valid
      until we release it.
@@ -496,56 +564,29 @@ bool makeAXFRPackets(const MOADNSParser& mdp, vector<vector<uint8_t>>& packets)
 
   shared_ptr<SOARecordContent> soa = zoneInfo->soa;
   const records_t& records = zoneInfo->latestAXFR;
-  packets.reserve(packets.size() + /* SOAs */ 2 + records.size());
 
   // Initial SOA
   const auto soaPacket = getSOAPacket(mdp, soa);
-  packets.push_back(soaPacket);
+  if (!sendPacketOverTCP(fd, soaPacket)) {
+    return false;
+  }
 
-  for (auto const &record : records) {
-    if (record.d_type == QType::SOA) {
-      continue;
-    }
-    vector<uint8_t> packet;
-    DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
-    pw.getHeader()->id = mdp.d_header.id;
-    pw.getHeader()->rd = mdp.d_header.rd;
-    pw.getHeader()->qr = 1;
-    pw.startRecord(record.d_name + mdp.d_qname, record.d_type);
-    record.d_content->toPacket(pw);
-    pw.commit();
-    packets.push_back(packet);
+  if (!sendRecordsOverTCP(fd, mdp, records)) {
+    return false;
   }
 
   // Final SOA
-  packets.push_back(soaPacket);
+  if (!sendPacketOverTCP(fd, soaPacket)) {
+    return false;
+  }
 
   return true;
 }
 
-void makeXFRPacketsFromDNSRecords(const MOADNSParser& mdp, const vector<DNSRecord>& records, vector<vector<uint8_t>>& packets) {
-
-  for(const auto& r : records) {
-    if (r.d_type == QType::SOA) {
-      continue;
-    }
-
-    vector<uint8_t> packet;
-    DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
-    pw.getHeader()->id = mdp.d_header.id;
-    pw.getHeader()->rd = mdp.d_header.rd;
-    pw.getHeader()->qr = 1;
-    pw.startRecord(r.d_name + mdp.d_qname, r.d_type);
-    r.d_content->toPacket(pw);
-    pw.commit();
-    packets.push_back(packet);
-  }
-}
-
 /* Produces an IXFR if one can be made according to the rules in RFC 1995 and
  * creates a SOA or AXFR packet when required by the RFC.
  */
-bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>& clientSOA, vector<vector<uint8_t>>& packets) {
+static bool handleIXFR(int fd, const ComboAddress& destination, const MOADNSParser& mdp, const shared_ptr<SOARecordContent>& clientSOA) {
   vector<std::shared_ptr<ixfrdiff_t>> toSend;
 
   /* we get a shared pointer of the zone info that we can't modify, ever.
@@ -568,7 +609,7 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>
     vector<uint8_t> packet;
     bool ret = makeSOAPacket(mdp, packet);
     if (ret) {
-      packets.push_back(packet);
+      sendPacketOverTCP(fd, packet);
     }
     return ret;
   }
@@ -590,9 +631,10 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>
 
   if (toSend.empty()) {
     g_log<<Logger::Warning<<"No IXFR available from serial "<<clientSOA->d_st.serial<<" for zone "<<mdp.d_qname<<", attempting to send AXFR"<<endl;
-    return makeAXFRPackets(mdp, packets);
+    return handleAXFR(fd, mdp);
   }
 
+  std::vector<std::vector<uint8_t>> packets;
   for (const auto& diff : toSend) {
     /* An IXFR packet's ANSWER section looks as follows:
      * SOA new_serial
@@ -602,24 +644,43 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>
      * ... added records ...
      * SOA new_serial
      */
-    packets.reserve(packets.size() + /* SOAs */ 4 + diff->removals.size() + diff->additions.size());
 
-    packets.push_back(getSOAPacket(mdp, diff->newSOA));
-    packets.push_back(getSOAPacket(mdp, diff->oldSOA));
-    makeXFRPacketsFromDNSRecords(mdp, diff->removals, packets);
-    packets.push_back(getSOAPacket(mdp, diff->newSOA));
-    makeXFRPacketsFromDNSRecords(mdp, diff->additions, packets);
-    packets.push_back(getSOAPacket(mdp, diff->newSOA));
+    const auto newSOAPacket = getSOAPacket(mdp, diff->newSOA);
+    const auto oldSOAPacket = getSOAPacket(mdp, diff->oldSOA);
+
+    if (!sendPacketOverTCP(fd, newSOAPacket)) {
+      return false;
+    }
+
+    if (!sendPacketOverTCP(fd, oldSOAPacket)) {
+      return false;
+    }
+
+    if (!sendRecordsOverTCP(fd, mdp, diff->removals)) {
+      return false;
+    }
+
+    if (!sendPacketOverTCP(fd, newSOAPacket)) {
+      return false;
+    }
+
+    if (!sendRecordsOverTCP(fd, mdp, diff->additions)) {
+      return false;
+    }
+
+    if (!sendPacketOverTCP(fd, newSOAPacket)) {
+      return false;
+    }
   }
 
   return true;
 }
 
-bool allowedByACL(const ComboAddress& addr) {
+static bool allowedByACL(const ComboAddress& addr) {
   return g_acl.match(addr);
 }
 
-void handleUDPRequest(int fd, boost::any&) {
+static void handleUDPRequest(int fd, boost::any&) {
   // TODO make the buffer-size configurable
   char buf[4096];
   ComboAddress saddr;
@@ -672,7 +733,7 @@ void handleUDPRequest(int fd, boost::any&) {
   return;
 }
 
-void handleTCPRequest(int fd, boost::any&) {
+static void handleTCPRequest(int fd, boost::any&) {
   ComboAddress saddr;
   int cfd = 0;
 
@@ -705,7 +766,7 @@ void handleTCPRequest(int fd, boost::any&) {
 
 /* Thread to handle TCP traffic
  */
-void tcpWorker(int tid) {
+static void tcpWorker(int tid) {
   string prefix = "TCP Worker " + std::to_string(tid) + ": ";
 
   while(true) {
@@ -746,7 +807,6 @@ void tcpWorker(int tid) {
         continue;
       }
 
-      vector<vector<uint8_t>> packets;
       if (mdp.d_qtype == QType::SOA) {
         vector<uint8_t> packet;
         bool ret = makeSOAPacket(mdp, packet);
@@ -754,17 +814,15 @@ void tcpWorker(int tid) {
           close(cfd);
           continue;
         }
-        packets.push_back(packet);
+        sendPacketOverTCP(cfd, packet);
       }
-
-      if (mdp.d_qtype == QType::AXFR) {
-        if (!makeAXFRPackets(mdp, packets)) {
+      else if (mdp.d_qtype == QType::AXFR) {
+        if (!handleAXFR(cfd, mdp)) {
           close(cfd);
           continue;
         }
       }
-
-      if (mdp.d_qtype == QType::IXFR) {
+      else if (mdp.d_qtype == QType::IXFR) {
         /* RFC 1995 section 3:
          *  The IXFR query packet format is the same as that of a normal DNS
          *  query, but with the query type being IXFR and the authority section
@@ -788,21 +846,12 @@ void tcpWorker(int tid) {
           continue;
         }
 
-        if (!makeIXFRPackets(mdp, clientSOA, packets)) {
+        if (!handleIXFR(cfd, saddr, mdp, clientSOA)) {
           close(cfd);
           continue;
         }
       } /* if (mdp.d_qtype == QType::IXFR) */
 
-      g_log<<Logger::Debug<<prefix<<"Sending "<<packets.size()<<" packets to "<<saddr.toStringWithPort()<<endl;
-      for (const auto& packet : packets) {
-        char sendBuf[2];
-        sendBuf[0]=packet.size()/256;
-        sendBuf[1]=packet.size()%256;
-
-        ssize_t send = writen2(cfd, sendBuf, 2);
-        send += writen2(cfd, &packet[0], packet.size());
-      }
       shutdown(cfd, 2);
     } catch (MOADNSException &e) {
       g_log<<Logger::Warning<<prefix<<"Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
@@ -822,7 +871,7 @@ void tcpWorker(int tid) {
  * missing parameters (if applicable), returning true if the config file was
  * good, false otherwise. Will log all issues with the config
  */
-bool parseAndCheckConfig(const string& configpath, YAML::Node& config) {
+static bool parseAndCheckConfig(const string& configpath, YAML::Node& config) {
   g_log<<Logger::Info<<"Loading configuration file from "<<configpath<<endl;
   try {
     config = YAML::LoadFile(configpath);