From: Neil Cook Date: Fri, 19 Oct 2018 11:16:52 +0000 (+0000) Subject: Refactor NOD code X-Git-Tag: dnsdist-1.3.3~16^2~12 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=343ae951ece80cab4a034444cc4a589eaa57cae9;p=pdns Refactor NOD code - Move core Persistent Stable Bloom Filter code into a separate class - Create new class for handling Unique DNS Responses (UDR) - Make threadID consistent by requiring it's passed to housekeeping method --- diff --git a/pdns/nod.cc b/pdns/nod.cc index 3a00b48bc..c87eb9753 100644 --- a/pdns/nod.cc +++ b/pdns/nod.cc @@ -36,25 +36,27 @@ using namespace nod; using namespace boost::filesystem; -std::mutex NODDB::d_cachedir_mutex; +// PersistentSBF Implementation + +std::mutex PersistentSBF::d_cachedir_mutex; // This looks for an old (per-thread) snapshot. The first one it finds, // it restores from that. Then immediately snapshots with the current thread id,// before removing the old snapshot // In this way, we can have per-thread SBFs, but still snapshot and restore. // The mutex has to be static because we can't have multiple (i.e. per-thread) // instances iterating and writing to the cache dir at the same time -bool NODDB::init(bool ignore_pid) { +bool PersistentSBF::init(bool ignore_pid) { if (d_init) return false; - std::lock_guard lock(NODDB::d_cachedir_mutex); + std::lock_guard lock(d_cachedir_mutex); if (d_cachedir.length()) { path p(d_cachedir); try { if (exists(p) && is_directory(p)) { path newest_file; std::time_t newest_time=time(nullptr); - Regex file_regex(".*\\." + bf_suffix + "$"); + Regex file_regex(d_prefix + ".*\\." + bf_suffix + "$"); for (directory_iterator i(p); i!=directory_iterator(); ++i) { if (is_regular_file(i->path()) && file_regex.match(i->path().filename().string())) { @@ -79,7 +81,7 @@ bool NODDB::init(bool ignore_pid) { d_sbf.restore(infile); infile.close(); // now dump it out again with new thread id & process id - snapshotCurrent(); + snapshotCurrent(std::this_thread::get_id()); // Remove the old file we just read to stop proliferation remove(newest_file); } @@ -98,18 +100,7 @@ bool NODDB::init(bool ignore_pid) { return true; } -void NODDB::housekeepingThread() -{ - setThreadName("pdns-r/NOD-hk"); - for (;;) { - sleep(d_snapshot_interval); - { - snapshotCurrent(); - } - } -} - -void NODDB::setCacheDir(const std::string& cachedir) +void PersistentSBF::setCacheDir(const std::string& cachedir) { if (!d_init) { path p(cachedir); @@ -121,6 +112,59 @@ void NODDB::setCacheDir(const std::string& cachedir) } } +// Dump the SBF to a file +// To spend the least amount of time inside the mutex, we dump to an +// intermediate stringstream, otherwise the lock would be waiting for +// file IO to complete +bool PersistentSBF::snapshotCurrent(std::thread::id tid) +{ + if (d_cachedir.length()) { + path p(d_cachedir); + path f(d_cachedir); + std::stringstream ss; + ss << d_prefix << "_" << tid; + f /= ss.str() + "_" + std::to_string(getpid()) + "." + bf_suffix; + if (exists(p) && is_directory(p)) { + try { + std::ofstream ofile; + std::stringstream ss; + ofile.open(f.string(), std::ios::out | std::ios::binary); + { + // only lock while dumping to a stringstream + std::lock_guard lock(d_sbf_mutex); + d_sbf.dump(ss); + } + // Now write it out to the file + ofile << ss.str(); + + if (ofile.fail()) + throw std::runtime_error("Failed to write to file:" + f.string()); + return true; + } + catch (const std::runtime_error& e) { + g_log< lock(NODDB::d_sbf_mutex); // the result is always the inverse of what is returned by the SBF - return !d_sbf.testAndAdd(dname_lc); + return !d_psbf.testAndAdd(dname_lc); } bool NODDB::isNewDomainWithParent(const std::string& domain, std::string& observed) @@ -160,11 +203,8 @@ bool NODDB::isNewDomainWithParent(const DNSName& dname, std::string& observed) void NODDB::addDomain(const DNSName& dname) { - // The only time this should block is when snapshotting from the - // housekeeping thread std::string native_domain = dname.toDNSStringLC(); - std::lock_guard lock(NODDB::d_sbf_mutex); - d_sbf.add(native_domain); + d_psbf.add(native_domain); } void NODDB::addDomain(const std::string& domain) @@ -173,42 +213,24 @@ void NODDB::addDomain(const std::string& domain) addDomain(dname); } -// Dump the SBF to a file -// To spend the least amount of time inside the mutex, we dump to an -// intermediate stringstream, otherwise the lock would be waiting for -// file IO to complete -bool NODDB::snapshotCurrent() +// UniqueResponseDB Implementation +bool UniqueResponseDB::isUniqueResponse(const std::string& response) { - if (d_cachedir.length()) { - path p(d_cachedir); - path f(d_cachedir); - std::stringstream ss; - ss << std::this_thread::get_id(); - f /= ss.str() + "_" + std::to_string(getpid()) + "." + bf_suffix; - if (exists(p) && is_directory(p)) { - try { - std::ofstream ofile; - std::stringstream ss; - ofile.open(f.string(), std::ios::out | std::ios::binary); - { - // only lock while dumping to a stringstream - std::lock_guard lock(NODDB::d_sbf_mutex); - d_sbf.dump(ss); - } - // Now write it out to the file - ofile << ss.str(); + return !d_psbf.testAndAdd(response); +} - if (ofile.fail()) - throw std::runtime_error("Failed to write to file:" + f.string()); - return true; - } - catch (const std::runtime_error& e) { - g_log< #include +#include #include "dnsname.hh" #include "stable-bloom.hh" @@ -31,18 +32,48 @@ namespace nod { const uint8_t num_dec = 10; const unsigned int snapshot_interval_default = 600; const std::string bf_suffix = "bf"; + const std::string sbf_prefix = "sbf"; - // This class is not designed to be shared between threads + // Theses classes are not designed to be shared between threads // Use a new instance per-thread, e.g. using thread local storage // Synchronization (at the class level) is still needed for reading from // and writing to the cache dir // Synchronization (at the instance level) is needed when snapshotting + class PersistentSBF { + public: + bool init(bool ignore_pid=false); + void setPrefix(const std::string& prefix) { d_prefix = prefix; } // Added to filenames in cachedir + void setCacheDir(const std::string& cachedir); + bool snapshotCurrent(std::thread::id tid); // Write the current file out to disk + void add(const std::string& data) { + // The only time this should block is when snapshotting + std::lock_guard lock(d_sbf_mutex); + d_sbf.add(data); + } + bool test(const std::string& data) { return d_sbf.test(data); } + bool testAndAdd(const std::string& data) { + // The only time this should block is when snapshotting + std::lock_guard lock(d_sbf_mutex); + return d_sbf.testAndAdd(data); + } + private: + bool d_init{false}; + bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter + std::string d_cachedir; + std::string d_prefix = sbf_prefix; + std::mutex d_sbf_mutex; // Per-instance mutex for snapshots + static std::mutex d_cachedir_mutex; // One mutex for all instances of this class + }; + class NODDB { public: NODDB() {} // Set ignore_pid to true if you don't mind loading files // created by the current process - bool init(bool ignore_pid=false); // Initialize the NODDB + bool init(bool ignore_pid=false) { + d_psbf.setPrefix("nod"); + return d_psbf.init(ignore_pid); + } bool isNewDomain(const std::string& domain); // Returns true if newly observed domain bool isNewDomain(const DNSName& dname); // As above bool isNewDomainWithParent(const std::string& domain, std::string& observed); // Returns true if newly observed domain, in which case "observed" contains the parent domain which *was* observed (or "" if domain is . or no parent domains observed) @@ -50,20 +81,36 @@ namespace nod { void addDomain(const DNSName& dname); // You need to add this to refresh frequently used domains void addDomain(const std::string& domain); // As above void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; } - void setCacheDir(const std::string& cachedir); - bool snapshotCurrent(); // Write the current file out to disk - bool pruneCacheFiles(); // Remove oldest cache files - static void startHousekeepingThread(std::shared_ptr noddbp) { - noddbp->housekeepingThread(); + void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); } + bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); } + static void startHousekeepingThread(std::shared_ptr noddbp, std::thread::id tid) { + noddbp->housekeepingThread(tid); } private: - void housekeepingThread(); - bool d_init{false}; - bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter - unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots - std::string d_cachedir; - std::mutex d_sbf_mutex; // Per-instance mutex for snapshots - static std::mutex d_cachedir_mutex; // One mutex for all instances of this class + PersistentSBF d_psbf; + unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots + void housekeepingThread(std::thread::id tid); + }; + + class UniqueResponseDB { + public: + UniqueResponseDB() {} + bool init(bool ignore_pid=false) { + d_psbf.setPrefix("udr"); + return d_psbf.init(ignore_pid); + } + bool isUniqueResponse(const std::string& response); + void addResponse(const std::string& response); + void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; } + void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); } + bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); } + static void startHousekeepingThread(std::shared_ptr udrdbp, std::thread::id tid) { + udrdbp->housekeepingThread(tid); + } + private: + PersistentSBF d_psbf; + unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots + void housekeepingThread(std::thread::id tid); }; }