]> granicus.if.org Git - pdns/commitdiff
ixfrdist: working UDP and TCP SOA queries and update thread
authorPieter Lexis <pieter.lexis@powerdns.com>
Wed, 17 Jan 2018 18:36:30 +0000 (19:36 +0100)
committerPieter Lexis <pieter.lexis@powerdns.com>
Mon, 29 Jan 2018 08:20:13 +0000 (09:20 +0100)
pdns/ixfrdist.cc
pdns/ixfrutils.cc
pdns/ixfrutils.hh

index 30e9a12e43d6d8edcdffb062c6c54558693c8d3c..dda8829d19efcfe8bd08e3a6ef59006fc2a9be99 100644 (file)
@@ -24,6 +24,7 @@
 #endif
 #include <boost/program_options.hpp>
 #include <sys/stat.h>
+#include <mutex>
 #include "ixfr.hh"
 #include "ixfrutils.hh"
 #include "resolver.hh"
@@ -31,6 +32,7 @@
 #include "sstuff.hh"
 #include "mplexer.hh"
 
+
 /* BEGIN Needed because of deeper dependencies */
 #include "arguments.hh"
 #include "statbag.hh"
@@ -50,12 +52,17 @@ SelectFDMultiplexer g_fdm;
 // The domains we support
 set<DNSName> g_domains;
 
+// Map domains to SOA Records and have a mutex to update it.
+std::map<DNSName, shared_ptr<SOARecordContent>> g_soas;
+std::mutex g_soas_mutex;
+
 using namespace boost::multi_index;
 
 namespace po = boost::program_options;
 po::variables_map g_vm;
 string g_workdir;
 ComboAddress g_master;
+
 bool g_verbose = false;
 bool g_debug = false;
 
@@ -64,8 +71,7 @@ void usage(po::options_description &desc) {
   cerr << desc << "\n";
 }
 
-void updateThread() {
-  std::map<DNSName, uint32_t> serials;
+void* updateThread(void*) {
   std::map<DNSName, time_t> lastCheck;
 
   // Initialize the serials we have
@@ -73,45 +79,55 @@ void updateThread() {
     lastCheck[domain] = 0;
     string dir = g_workdir + "/" + domain.toString();
     try {
-      serials[domain] = getSerialsFromDir(dir);
+      auto serial = getSerialsFromDir(dir);
+      shared_ptr<SOARecordContent> soa;
+      {
+        loadSOAFromDisk(domain, g_workdir + "/" + domain.toString() + "/" + std::to_string(serial), soa);
+        std::lock_guard<std::mutex> guard(g_soas_mutex);
+        if (soa != nullptr) {
+          g_soas[domain] = soa;
+        }
+      }
     } catch (runtime_error &e) {
       // Most likely, the directory does not exist.
       cerr<<"[INFO] "<<e.what()<<", attempting to create"<<endl;
       // Attempt to create it, if _that_ fails, there is no hope
       if (mkdir(dir.c_str(), 0777) == -1) {
-        cerr<<"[Error] Could not create '"<<dir<<"': "<<strerror(errno)<<endl;
+        cerr<<"[ERROR] Could not create '"<<dir<<"': "<<strerror(errno)<<endl;
         exit(EXIT_FAILURE);
       }
     }
   }
 
+  if (g_verbose) {
+    cerr<<"[INFO] Update Thread started"<<endl;
+  }
+
   while (true) {
     time_t now = time(nullptr);
     for (const auto &domain : g_domains) {
       string dir = g_workdir + "/" + domain.toString();
       if (now - lastCheck[domain] < 30) { // YOLO 30 seconds
-          continue;
+        continue;
       }
-      if (serials.find(domain) != serials.end() && serials[domain] != 0) {
-        if (g_verbose) {
-          cerr<<"[INFO] Attempting to retrieve SOA Serial update for '"<<domain<<"' from '"<<g_master.toStringWithPort()<<"'"<<endl;
+      if (g_verbose) {
+        cerr<<"[INFO] Attempting to retrieve SOA Serial update for '"<<domain<<"' from '"<<g_master.toStringWithPort()<<"'"<<endl;
+      }
+      shared_ptr<SOARecordContent> sr;
+      try {
+        auto newSerial = getSerialFromMaster(g_master, domain, sr); // TODO TSIG
+        if(g_soas.find(domain) != g_soas.end() && g_verbose) {
+          cerr<<"[INFO]   Got SOA Serial: "<< newSerial<<", had Serial: "<<g_soas[domain]->d_st.serial<<endl;
         }
-        shared_ptr<SOARecordContent> sr;
-        try {
-          auto newSerial = getSerialFromMaster(g_master, domain, sr); // TODO TSIG
+        if (g_soas.find(domain) != g_soas.end() && newSerial == g_soas[domain]->d_st.serial) {
           if (g_verbose) {
-            cerr<<"[INFO]   Got SOA Serial: "<< newSerial<<", had Serial: "<<serials[domain]<<endl;
+            cerr<<"[INFO]   Not updating."<<endl;
           }
-          if (newSerial == serials[domain]) {
-            if (g_verbose) {
-              cerr<<"[INFO]   Not updating."<<endl;
-            }
-            continue;
-          }
-        } catch (runtime_error &e) {
-          cerr<<"[WARNING] Unable to get SOA serial update for '"<<domain<<"': "<<e.what()<<endl;
           continue;
         }
+      } catch (runtime_error &e) {
+        cerr<<"[WARNING] Unable to get SOA serial update for '"<<domain<<"': "<<e.what()<<endl;
+        continue;
       }
       // Now get the full zone!
       if (g_verbose) {
@@ -119,6 +135,7 @@ void updateThread() {
       }
       ComboAddress local = g_master.sin4.sin_family == AF_INET ? ComboAddress("0.0.0.0") : ComboAddress("::");
       TSIGTriplet tt;
+      shared_ptr<SOARecordContent> soa;
       try {
         AXFRRetriever axfr(g_master, domain, tt, &local);
         unsigned int nrecords=0;
@@ -132,77 +149,198 @@ void updateThread() {
             dr.d_name.makeUsRelative(domain);
             records.insert(dr);
             nrecords++;
+            if (dr.d_type == QType::SOA) {
+              soa = getRR<SOARecordContent>(dr);
+            }
           }
         }
         if (g_verbose) {
-          cerr<<"[INFO]   Done! Received "<<nrecords<<" records. Attempting to write to disk!"<<endl;
+          cerr<<"[INFO]    Done! Received "<<nrecords<<" records. Attempting to write to disk!"<<endl;
         }
         writeZoneToDisk(records, domain, dir);
         if (g_verbose) {
-          cerr<<"[INFO]   Done!"<<endl;
+          cerr<<"[INFO]    Done!"<<endl;
         }
       } catch (ResolverException &e) {
         cerr<<"[WARNING] Could not retrieve AXFR for '"<<domain<<"': "<<e.reason<<endl;
       } catch (runtime_error &e) {
         cerr<<"[WARNING] Could not save zone '"<<domain<<"' to disk: "<<e.what()<<endl;
       }
-      serials[domain] = getSerialsFromDir(dir);
       lastCheck[domain] = now;
+      {
+        std::lock_guard<std::mutex> guard(g_soas_mutex);
+        if (soa != nullptr) {
+          g_soas[domain] = soa;
+        }
+      }
     } /* for (const auto &domain : domains) */
     sleep(10);
   } /* while (true) */
 } /* updateThread */
 
+bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool udp = true) {
+  vector<string> info_msg;
+
+  if (g_debug) {
+    cerr<<"[DEBUG] Had "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" query from "<<saddr.toStringWithPort()<<endl;
+  }
+
+  if (udp && mdp.d_qtype != QType::SOA && mdp.d_qtype != QType::IXFR) {
+    info_msg.push_back("QType is unsupported (" + QType(mdp.d_qtype).getName() + " is not in {SOA,IXFR}.");
+  }
+
+  if (!udp && mdp.d_qtype != QType::SOA && mdp.d_qtype != QType::IXFR && mdp.d_qtype != QType::AXFR) {
+    info_msg.push_back("QType is unsupported (" + QType(mdp.d_qtype).getName() + " is not in {SOA,IXFR,AXFR}.");
+  }
+
+  if (g_domains.find(mdp.d_qname) == g_domains.end()) {
+    info_msg.push_back("Domain name '" + mdp.d_qname.toLogString() + "' is not configured for distribution");
+  }
+
+  if (g_soas.find(mdp.d_qname) == g_soas.end()) {
+    info_msg.push_back("Domain has not been transferred yet");
+  }
+
+  if (!info_msg.empty()) {
+    cerr<<"[WARNING] Ignoring "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" query from "<<saddr.toStringWithPort();
+    if (g_verbose) {
+      cerr<<":";
+      for (const auto& s : info_msg) {
+        cerr<<endl<<"    "<<s;
+      }
+    }
+    cerr<<endl;
+    return false;
+  }
+
+  return true;
+}
+
+vector<uint8_t> makeSOAPacket(const MOADNSParser& mdp) {
+  vector<uint8_t> packet;
+  DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
+  pw.getHeader()->id = mdp.d_header.id;
+  pw.getHeader()->rd = mdp.d_header.rd;
+  pw.getHeader()->qr = 1;
+
+  pw.startRecord(mdp.d_qname, QType::SOA);
+  g_soas[mdp.d_qname]->toPacket(pw);
+  pw.commit();
+
+  return packet;
+}
+
+
 void handleUDPRequest(int fd, boost::any&) {
   // TODO make the buffer-size configurable
   char buf[4096];
-  struct sockaddr saddr;
+  ComboAddress saddr;
   socklen_t fromlen;
-  int res = recvfrom(fd, buf, sizeof(buf), 0, &saddr, &fromlen);
-  ComboAddress from(&saddr, fromlen);
+  int res = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr*) &saddr, &fromlen);
 
   if (res == 0) {
-    cerr<<"[Warning] Got an empty message from "<<from.toStringWithPort()<<endl;
+    cerr<<"[WARNING] Got an empty message from "<<saddr.toStringWithPort()<<endl;
     return;
   }
 
   // TODO better error handling/logging
   if(res < 0) {
-    cerr<<"[Warning] Could not read message from "<<from.toStringWithPort()<<": "<<strerror(errno)<<endl;
+    cerr<<"[WARNING] Could not read message from "<<saddr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+    return;
+  }
+
+  if (saddr == ComboAddress("0.0.0.0", 0)) {
+    cerr<<"[WARNING] Could not determine source of message"<<endl;
     return;
   }
 
   MOADNSParser mdp(true, string(buf, res));
-  vector<string> info_msg;
+  if (!checkQuery(mdp, saddr)) {
+    return;
+  }
 
-  if (g_debug) {
-    cerr<<"[Debug] Had "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" query from "<<from.toStringWithPort()<<endl;
+  // Let's not complicate this with IXFR over UDP (and looking if we need to truncate etc).
+  // Just send the current SOA and let the client try over TCP
+  auto packet = makeSOAPacket(mdp);
+  if(sendto(fd, &packet[0], packet.size(), 0, (struct sockaddr*) &saddr, fromlen) < 0) {
+    cerr<<"[WARNING] Could not send reply for "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" to "<<saddr.toStringWithPort()<<": "<<strerror(errno)<<endl;
   }
+  return;
+}
 
-  if (mdp.d_qtype != QType::SOA && mdp.d_qtype != QType::AXFR && mdp.d_qtype != QType::IXFR) {
-    info_msg.push_back("QType is unsupported (" + QType(mdp.d_qtype).getName() + " is not in {SOA,AXFR,IXFR}.");
+void handleTCPRequest(int fd, boost::any&) {
+  ComboAddress saddr;
+  socklen_t socklen;
+
+  int cfd = accept(fd, (sockaddr*) &saddr, &socklen);
+
+  if (cfd == -1) {
+    cerr<<"Accepting connection from "<<saddr.toStringWithPort()<<" failed: "<<strerror(errno)<<endl;
+    return;
   }
 
-  if (g_domains.find(mdp.d_qname) == g_domains.end()) {
-    info_msg.push_back("Domain name '" + mdp.d_qname.toLogString() + "' is not configured for distribution");
+  if (saddr == ComboAddress("0.0.0.0", 0)) {
+    cerr<<"[WARNING] Could not determine source of message"<<endl;
+    return;
   }
 
-  if (!info_msg.empty()) {
-    cerr<<"[Warning] Ignoring "<<mdp.d_qname<<"|"<<QType(mdp.d_qtype).getName()<<" query from "<<from.toStringWithPort();
-    if (g_verbose) {
-      cerr<<":";
-      for (const auto& s : info_msg) {
-        cerr<<endl<<"    "<<s;
-      }
+  char buf[4096];
+  // Discard the first 2 bytes (qlen)
+  int res;
+  res = recv(cfd, &buf, 2, 0);
+  if (res != 2) {
+    if (res == 0) { // Connection is closed
+      close(cfd);
+      return;
     }
-    cerr<<endl;
+    if (res == -1) {
+      cerr<<"[WARNING] Could not read message from "<<saddr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+      close(cfd);
+      return;
+    }
+  }
+
+  res = recv(cfd, &buf, sizeof(buf), 0);
+
+  if (res == -1) {
+    cerr<<"[WARNING] Could not read message from "<<saddr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+    close(cfd);
     return;
   }
-}
 
-void handleTCPRequest(int fd, boost::any&) {
-}
+  if (res == 0) { // Connection is closed
+    close(cfd);
+    return;
+  }
+
+  try {
+    MOADNSParser mdp(true, string(buf, res));
 
+    if (!checkQuery(mdp, saddr, false)) {
+      close(cfd);
+      return;
+    }
+
+    vector<uint8_t> packet;
+    if (mdp.d_qtype == QType::SOA) {
+      packet = makeSOAPacket(mdp);
+    }
+
+    char buf[2];
+    buf[0]=packet.size()/256;
+    buf[1]=packet.size()%256;
+
+    int send = writen2(cfd, buf, 2);
+    send += writen2(cfd, &packet[0], packet.size());
+    shutdown(cfd, 2);
+  } catch (MOADNSException &e) {
+    cerr<<"[WARNING] Could not parse DNS packet from "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+  } catch (runtime_error &e) {
+    cerr<<"[WARNING] Could not write reply to "<<saddr.toStringWithPort()<<": "<<e.what()<<endl;
+  }
+  // bye!
+  close(cfd);
+}
 
 int main(int argc, char** argv) {
   po::options_description desc("IXFR distribution tool");
@@ -255,7 +393,7 @@ int main(int argc, char** argv) {
       try {
         listen_addresses.push_back(ComboAddress(addr, 53));
       } catch(PDNSException &e) {
-        cerr<<"[Error] listen-address '"<<addr<<"' is not an IP address: "<<e.reason<<endl;
+        cerr<<"[ERROR] listen-address '"<<addr<<"' is not an IP address: "<<e.reason<<endl;
         had_error = true;
       }
     }
@@ -264,12 +402,12 @@ int main(int argc, char** argv) {
   try {
     g_master = ComboAddress(g_vm["server-address"].as<string>(), 53);
   } catch(PDNSException &e) {
-    cerr<<"[Error] server-address '"<<g_vm["server-address"].as<string>()<<"' is not an IP address: "<<e.reason<<endl;
+    cerr<<"[ERROR] server-address '"<<g_vm["server-address"].as<string>()<<"' is not an IP address: "<<e.reason<<endl;
     had_error = true;
   }
 
   if (!g_vm.count("domains")) {
-    cerr<<"[Error] No domain(s) specified!"<<endl;
+    cerr<<"[ERROR] No domain(s) specified!"<<endl;
     had_error = true;
   }
 
@@ -277,7 +415,7 @@ int main(int argc, char** argv) {
     try {
       g_domains.insert(DNSName(domain));
     } catch (PDNSException &e) {
-      cerr<<"[Error] '"<<domain<<"' is not a valid domain name: "<<e.reason<<endl;
+      cerr<<"[ERROR] '"<<domain<<"' is not a valid domain name: "<<e.reason<<endl;
       had_error = true;
     }
   }
@@ -286,7 +424,7 @@ int main(int argc, char** argv) {
     // Create UDP socket
     int s = socket(addr.sin4.sin_family, SOCK_DGRAM, 0);
     if (s < 0) {
-      cerr<<"[Error] Unable to create socket: "<<strerror(errno)<<endl;
+      cerr<<"[ERROR] Unable to create socket: "<<strerror(errno)<<endl;
       had_error = true;
       continue;
     }
@@ -294,7 +432,7 @@ int main(int argc, char** argv) {
     setNonBlocking(s);
 
     if (bind(s, (sockaddr*) &addr, addr.getSocklen()) < 0) {
-      cerr<<"[Error] Unable to bind to "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+      cerr<<"[ERROR] Unable to bind to "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
       had_error = true;
       continue;
     }
@@ -305,7 +443,7 @@ int main(int argc, char** argv) {
     int t = socket(addr.sin4.sin_family, SOCK_STREAM, 0);
 
     if (t < 0) {
-      cerr<<"[Error] Unable to create socket: "<<strerror(errno)<<endl;
+      cerr<<"[ERROR] Unable to create socket: "<<strerror(errno)<<endl;
       had_error = true;
       continue;
     }
@@ -313,13 +451,13 @@ int main(int argc, char** argv) {
     setNonBlocking(t);
 
     if (bind(t, (sockaddr*) &addr, addr.getSocklen()) < 0) {
-      cerr<<"[Error] Unable to bind to "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+      cerr<<"[ERROR] Unable to bind to "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
       had_error = true;
     }
 
     // TODO Make backlog configurable?
     if (listen(t, 30) < 0) {
-      cerr<<"[Error] Unable to listen on "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
+      cerr<<"[ERROR] Unable to listen on "<<addr.toStringWithPort()<<": "<<strerror(errno)<<endl;
       had_error = true;
       continue;
     }
@@ -341,12 +479,12 @@ int main(int argc, char** argv) {
   // TODO read from urandom (perhaps getrandom(2)?
   dns_random_init("0123456789abcdef");
 
-  // Updater thread (TODO: actually thread it :))
-  // TODO use mplexer?
-  // updateThread();
+  pthread_t qtid;
+
+  cout<<"[INFO] IXFR distributor starting up!"<<endl;
+
+  pthread_create(&qtid, 0, updateThread, 0);
 
-  // start loop
-  cout<<"IXFR distributor starting up!"<<endl;
   struct timeval now;
   for(;;) {
     gettimeofday(&now, 0);
index d21e107a12275d52433a5fb71260ad994b6ce57a..42b5906fa10d638ea617a226da4afa70d01a6a9c 100644 (file)
@@ -148,3 +148,20 @@ void loadZoneFromDisk(records_t& records, const string& fname, const DNSName& zo
     throw runtime_error("Zone not complete!");
   }
 }
+
+/*
+ * Load the zone `zone` from `fname` and put the first found SOA into `soa`
+ * Does NOT check for nullptr
+ */
+void loadSOAFromDisk(const DNSName& zone, const string& fname, shared_ptr<SOARecordContent>& soa)
+{
+  ZoneParserTNG zpt(fname, zone);
+  DNSResourceRecord rr;
+
+  while(zpt.get(rr)) {
+    if (rr.qtype == QType::SOA) {
+      soa = getRR<SOARecordContent>(DNSRecord(rr));
+      return;
+    }
+  }
+}
index 201089cba66a58cbe4f5cc1e8a6d119fc47637f6..f787f6dde4f6c583ce6f12582b223ca2665da410 100644 (file)
@@ -54,3 +54,4 @@ uint32_t getSerialsFromDir(const std::string& dir);
 uint32_t getSerialFromRecords(const records_t& records, DNSRecord& soaret);
 void writeZoneToDisk(const records_t& records, const DNSName& zone, const std::string& directory);
 void loadZoneFromDisk(records_t& records, const string& fname, const DNSName& zone);
+void loadSOAFromDisk(const DNSName& zone, const string& fname, shared_ptr<SOARecordContent>& soa);