]> granicus.if.org Git - pdns/commitdiff
Refactor NOD code
authorNeil Cook <neil.cook@noware.co.uk>
Fri, 19 Oct 2018 11:16:52 +0000 (11:16 +0000)
committerNeil Cook <neil.cook@noware.co.uk>
Wed, 24 Oct 2018 12:43:24 +0000 (12:43 +0000)
- Move core Persistent Stable Bloom Filter code into a separate class
- Create new class for handling Unique DNS Responses (UDR)
- Make threadID consistent by requiring it's passed to housekeeping method

pdns/nod.cc
pdns/nod.hh

index 3a00b48bcb962ceaf67a2072022ba4584f8a6840..c87eb9753ecc3b25c78c1f7b1977119ccab147fb 100644 (file)
 using namespace nod;
 using namespace boost::filesystem;
 
-std::mutex NODDB::d_cachedir_mutex;
+// PersistentSBF Implementation 
+
+std::mutex PersistentSBF::d_cachedir_mutex;
 
 // This looks for an old (per-thread) snapshot. The first one it finds,
 // it restores from that. Then immediately snapshots with the current thread id,// before removing the old snapshot
 // In this way, we can have per-thread SBFs, but still snapshot and restore.
 // The mutex has to be static because we can't have multiple (i.e. per-thread)
 // instances iterating and writing to the cache dir at the same time
-bool NODDB::init(bool ignore_pid) {
+bool PersistentSBF::init(bool ignore_pid) {
   if (d_init)
     return false;
 
-  std::lock_guard<std::mutex> lock(NODDB::d_cachedir_mutex);
+  std::lock_guard<std::mutex> lock(d_cachedir_mutex);
   if (d_cachedir.length()) {
     path p(d_cachedir);
     try {
       if (exists(p) && is_directory(p)) {
         path newest_file;
         std::time_t newest_time=time(nullptr);
-        Regex file_regex(".*\\." + bf_suffix + "$");
+        Regex file_regex(d_prefix + ".*\\." + bf_suffix + "$");
         for (directory_iterator i(p); i!=directory_iterator(); ++i) {
           if (is_regular_file(i->path()) &&
               file_regex.match(i->path().filename().string())) {
@@ -79,7 +81,7 @@ bool NODDB::init(bool ignore_pid) {
             d_sbf.restore(infile);
             infile.close();
             // now dump it out again with new thread id & process id
-            snapshotCurrent();
+            snapshotCurrent(std::this_thread::get_id());
             // Remove the old file we just read to stop proliferation
             remove(newest_file);
           }
@@ -98,18 +100,7 @@ bool NODDB::init(bool ignore_pid) {
   return true;
 }
 
-void NODDB::housekeepingThread()
-{
-  setThreadName("pdns-r/NOD-hk");
-  for (;;) {
-    sleep(d_snapshot_interval);
-    {
-      snapshotCurrent();
-    }
-  }
-}
-
-void NODDB::setCacheDir(const std::string& cachedir)
+void PersistentSBF::setCacheDir(const std::string& cachedir)
 {
   if (!d_init) {
     path p(cachedir);
@@ -121,6 +112,59 @@ void NODDB::setCacheDir(const std::string& cachedir)
   }
 }
 
+// Dump the SBF to a file
+// To spend the least amount of time inside the mutex, we dump to an
+// intermediate stringstream, otherwise the lock would be waiting for
+// file IO to complete
+bool PersistentSBF::snapshotCurrent(std::thread::id tid)
+{
+  if (d_cachedir.length()) {
+    path p(d_cachedir);
+    path f(d_cachedir);
+    std::stringstream ss;
+    ss << d_prefix << "_" << tid;
+    f /= ss.str() + "_" + std::to_string(getpid()) + "." + bf_suffix;
+    if (exists(p) && is_directory(p)) {
+      try {
+        std::ofstream ofile;
+        std::stringstream ss;
+        ofile.open(f.string(), std::ios::out | std::ios::binary);
+        {
+          // only lock while dumping to a stringstream
+          std::lock_guard<std::mutex> lock(d_sbf_mutex);
+          d_sbf.dump(ss);
+        }
+        // Now write it out to the file
+        ofile << ss.str();
+
+        if (ofile.fail())
+          throw std::runtime_error("Failed to write to file:" + f.string());
+        return true;
+      }
+      catch (const std::runtime_error& e) {
+        g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << e.what() << endl;
+      }
+    }
+    else {
+      g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << f.string() << endl;
+    }
+  }
+  return false;
+}
+
+// NODDB Implementation
+
+void NODDB::housekeepingThread(std::thread::id tid)
+{
+  setThreadName("pdns-r/NOD-hk");
+  for (;;) {
+    sleep(d_snapshot_interval);
+    {
+      snapshotCurrent(tid);
+    }
+  }
+}
+
 bool NODDB::isNewDomain(const std::string& domain)
 {
   DNSName dname(domain);
@@ -132,9 +176,8 @@ bool NODDB::isNewDomain(const DNSName& dname)
   std::string dname_lc = dname.toDNSStringLC();
   // The only time this should block is when snapshotting from the
   // housekeeping thread
-  std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
   // the result is always the inverse of what is returned by the SBF
-  return !d_sbf.testAndAdd(dname_lc);
+  return !d_psbf.testAndAdd(dname_lc);
 }
 
 bool NODDB::isNewDomainWithParent(const std::string& domain, std::string& observed)
@@ -160,11 +203,8 @@ bool NODDB::isNewDomainWithParent(const DNSName& dname, std::string& observed)
 
 void NODDB::addDomain(const DNSName& dname)
 {
-  // The only time this should block is when snapshotting from the
-  // housekeeping thread
   std::string native_domain = dname.toDNSStringLC();
-  std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
-  d_sbf.add(native_domain);
+  d_psbf.add(native_domain);
 }
 
 void NODDB::addDomain(const std::string& domain)
@@ -173,42 +213,24 @@ void NODDB::addDomain(const std::string& domain)
   addDomain(dname);
 }
 
-// Dump the SBF to a file
-// To spend the least amount of time inside the mutex, we dump to an
-// intermediate stringstream, otherwise the lock would be waiting for
-// file IO to complete
-bool NODDB::snapshotCurrent()
+// UniqueResponseDB Implementation
+bool UniqueResponseDB::isUniqueResponse(const std::string& response)
 {
-  if (d_cachedir.length()) {
-    path p(d_cachedir);
-    path f(d_cachedir);
-    std::stringstream ss;
-    ss << std::this_thread::get_id();
-    f /= ss.str() + "_" + std::to_string(getpid()) + "." + bf_suffix;
-    if (exists(p) && is_directory(p)) {
-      try {
-        std::ofstream ofile;
-        std::stringstream ss;
-        ofile.open(f.string(), std::ios::out | std::ios::binary);
-        {
-          // only lock while dumping to a stringstream
-          std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
-          d_sbf.dump(ss);
-        }
-        // Now write it out to the file
-        ofile << ss.str();
+  return !d_psbf.testAndAdd(response);
+}
 
-        if (ofile.fail())
-          throw std::runtime_error("Failed to write to file:" + f.string());
-        return true;
-      }
-      catch (const std::runtime_error& e) {
-        g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << e.what() << endl;
-      }
-    }
-    else {
-      g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << f.string() << endl;
+void UniqueResponseDB::addResponse(const std::string& response)
+{
+  d_psbf.add(response);
+}
+
+void UniqueResponseDB::housekeepingThread(std::thread::id tid)
+{
+  setThreadName("pdns-r/UDR-hk");
+  for (;;) {
+    sleep(d_snapshot_interval);
+    {
+      snapshotCurrent(tid);
     }
   }
-  return false;
 }
index 8f1a056d8cf2dfbb63b2e3a93a192a1e05dcdc44..84e7ae65913a605e81aee3f2a8440a1e57846e7e 100644 (file)
@@ -22,6 +22,7 @@
 #pragma once
 #include <atomic>
 #include <mutex>
+#include <thread>
 #include "dnsname.hh"
 #include "stable-bloom.hh"
 
@@ -31,18 +32,48 @@ namespace nod {
   const uint8_t num_dec = 10;
   const unsigned int snapshot_interval_default = 600;
   const std::string bf_suffix = "bf";
+  const std::string sbf_prefix = "sbf";
 
-  // This class is not designed to be shared between threads
+  // Theses classes are not designed to be shared between threads
   // Use a new instance per-thread, e.g. using thread local storage
   // Synchronization (at the class level) is still needed for reading from
   // and writing to the cache dir
   // Synchronization (at the instance level) is needed when snapshotting
+  class PersistentSBF {
+  public:
+    bool init(bool ignore_pid=false);
+    void setPrefix(const std::string& prefix) { d_prefix = prefix; } // Added to filenames in cachedir
+    void setCacheDir(const std::string& cachedir);
+    bool snapshotCurrent(std::thread::id tid); // Write the current file out to disk
+    void add(const std::string& data) { 
+      // The only time this should block is when snapshotting
+      std::lock_guard<std::mutex> lock(d_sbf_mutex);
+      d_sbf.add(data); 
+    }
+    bool test(const std::string& data) { return d_sbf.test(data); }
+    bool testAndAdd(const std::string& data) { 
+      // The only time this should block is when snapshotting
+      std::lock_guard<std::mutex> lock(d_sbf_mutex);
+      return d_sbf.testAndAdd(data); 
+    }
+  private:
+    bool d_init{false};
+    bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter
+    std::string d_cachedir;
+    std::string d_prefix = sbf_prefix;
+    std::mutex d_sbf_mutex; // Per-instance mutex for snapshots
+    static std::mutex d_cachedir_mutex; // One mutex for all instances of this class
+  };
+
   class NODDB {
   public:
     NODDB() {}
     // Set ignore_pid to true if you don't mind loading files
     // created by the current process
-    bool init(bool ignore_pid=false); // Initialize the NODDB
+    bool init(bool ignore_pid=false) { 
+      d_psbf.setPrefix("nod");
+      return d_psbf.init(ignore_pid); 
+    }
     bool isNewDomain(const std::string& domain); // Returns true if newly observed domain
     bool isNewDomain(const DNSName& dname); // As above
     bool isNewDomainWithParent(const std::string& domain, std::string& observed); // Returns true if newly observed domain, in which case "observed" contains the parent domain which *was* observed (or "" if domain is . or no parent domains observed)
@@ -50,20 +81,36 @@ namespace nod {
     void addDomain(const DNSName& dname); // You need to add this to refresh frequently used domains
     void addDomain(const std::string& domain); // As above
     void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; }
-    void setCacheDir(const std::string& cachedir);
-    bool snapshotCurrent(); // Write the current file out to disk
-    bool pruneCacheFiles(); // Remove oldest cache files
-    static void startHousekeepingThread(std::shared_ptr<NODDB> noddbp) {
-      noddbp->housekeepingThread();
+    void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); }
+    bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); }
+    static void startHousekeepingThread(std::shared_ptr<NODDB> noddbp, std::thread::id tid) {
+      noddbp->housekeepingThread(tid);
     }
   private:
-    void housekeepingThread();
-    bool d_init{false};
-    bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter
-    unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots
-    std::string d_cachedir;
-    std::mutex d_sbf_mutex; // Per-instance mutex for snapshots
-    static std::mutex d_cachedir_mutex; // One mutex for all instances of this class
+    PersistentSBF d_psbf;
+    unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots    
+    void housekeepingThread(std::thread::id tid);
+  };
+
+  class UniqueResponseDB {
+  public:
+    UniqueResponseDB() {}
+    bool init(bool ignore_pid=false) {
+      d_psbf.setPrefix("udr");
+      return d_psbf.init(ignore_pid); 
+    }
+    bool isUniqueResponse(const std::string& response);
+    void addResponse(const std::string& response);
+    void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; }
+    void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); }
+    bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); }
+    static void startHousekeepingThread(std::shared_ptr<UniqueResponseDB> udrdbp, std::thread::id tid) {
+      udrdbp->housekeepingThread(tid);
+    }
+  private:
+    PersistentSBF d_psbf;
+    unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots    
+    void housekeepingThread(std::thread::id tid); 
   };
 
 }