using namespace nod;
using namespace boost::filesystem;
-std::mutex NODDB::d_cachedir_mutex;
+// PersistentSBF Implementation
+
+std::mutex PersistentSBF::d_cachedir_mutex; // One mutex for all instances: held by init() while it works on the cache dir
// This looks for an old (per-thread) snapshot. The first one it finds,
// it restores from that. Then immediately snapshots with the current thread id,
// before removing the old snapshot
// In this way, we can have per-thread SBFs, but still snapshot and restore.
// The mutex has to be static because we can't have multiple (i.e. per-thread)
// instances iterating and writing to the cache dir at the same time
-bool NODDB::init(bool ignore_pid) {
+bool PersistentSBF::init(bool ignore_pid) {
if (d_init)
return false;
- std::lock_guard<std::mutex> lock(NODDB::d_cachedir_mutex);
+ std::lock_guard<std::mutex> lock(d_cachedir_mutex);
if (d_cachedir.length()) {
path p(d_cachedir);
try {
if (exists(p) && is_directory(p)) {
path newest_file;
std::time_t newest_time=time(nullptr);
- Regex file_regex(".*\\." + bf_suffix + "$");
+ Regex file_regex(d_prefix + ".*\\." + bf_suffix + "$");
for (directory_iterator i(p); i!=directory_iterator(); ++i) {
if (is_regular_file(i->path()) &&
file_regex.match(i->path().filename().string())) {
d_sbf.restore(infile);
infile.close();
// now dump it out again with new thread id & process id
- snapshotCurrent();
+ snapshotCurrent(std::this_thread::get_id());
// Remove the old file we just read to stop proliferation
remove(newest_file);
}
return true;
}
-void NODDB::housekeepingThread()
-{
- setThreadName("pdns-r/NOD-hk");
- for (;;) {
- sleep(d_snapshot_interval);
- {
- snapshotCurrent();
- }
- }
-}
-
-void NODDB::setCacheDir(const std::string& cachedir)
+void PersistentSBF::setCacheDir(const std::string& cachedir)
{
if (!d_init) {
path p(cachedir);
}
}
+// Dump the SBF to a snapshot file in the cache dir.
+//
+// The file is named <prefix>_<tid>_<pid>.<bf_suffix>. Returns true when the
+// snapshot was written; returns false when no cache dir is configured, when
+// the cache dir is missing or not a directory, or when writing failed (the
+// last two cases are logged as warnings).
+//
+// To spend the least amount of time inside the mutex, we dump to an
+// intermediate stringstream, otherwise the lock would be waiting for
+// file IO to complete
+bool PersistentSBF::snapshotCurrent(std::thread::id tid)
+{
+  if (d_cachedir.empty()) {
+    return false;
+  }
+  path p(d_cachedir);
+  path f(d_cachedir);
+  std::stringstream fname;
+  fname << d_prefix << "_" << tid;
+  f /= fname.str() + "_" + std::to_string(getpid()) + "." + bf_suffix;
+  try {
+    // exists()/is_directory() may throw filesystem_error (a std::runtime_error),
+    // so they are evaluated inside the try: the housekeeping loops that call
+    // this have no exception handler of their own
+    if (!exists(p) || !is_directory(p)) {
+      g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << f.string() << endl;
+      return false;
+    }
+    std::stringstream dump;
+    {
+      // only lock while dumping to a stringstream
+      std::lock_guard<std::mutex> lock(d_sbf_mutex);
+      d_sbf.dump(dump);
+    }
+    // Now write it out to the file; it is only opened (and truncated) once
+    // the dump is available
+    std::ofstream ofile;
+    ofile.open(f.string(), std::ios::out | std::ios::binary);
+    ofile << dump.str();
+    // Close before checking the stream state: buffered data only reaches the
+    // file on flush/close, so a late write error would otherwise be missed
+    ofile.close();
+    if (ofile.fail())
+      throw std::runtime_error("Failed to write to file:" + f.string());
+    return true;
+  }
+  catch (const std::runtime_error& e) {
+    g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << e.what() << endl;
+  }
+  return false;
+}
+
+// NODDB Implementation
+
+// Housekeeping loop for the NOD database: names the thread, then snapshots
+// every d_snapshot_interval seconds, passing tid on to snapshotCurrent().
+// This function never returns.
+void NODDB::housekeepingThread(std::thread::id tid)
+{
+  setThreadName("pdns-r/NOD-hk");
+  while (true) {
+    sleep(d_snapshot_interval);
+    snapshotCurrent(tid);
+  }
+}
+
bool NODDB::isNewDomain(const std::string& domain)
{
DNSName dname(domain);
std::string dname_lc = dname.toDNSStringLC();
// The only time this should block is when snapshotting from the
// housekeeping thread
- std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
// the result is always the inverse of what is returned by the SBF
- return !d_sbf.testAndAdd(dname_lc);
+ return !d_psbf.testAndAdd(dname_lc); // the per-instance lock is now taken inside PersistentSBF::testAndAdd
}
bool NODDB::isNewDomainWithParent(const std::string& domain, std::string& observed)
void NODDB::addDomain(const DNSName& dname)
{
- // The only time this should block is when snapshotting from the
- // housekeeping thread
std::string native_domain = dname.toDNSStringLC();
- std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
- d_sbf.add(native_domain);
+ d_psbf.add(native_domain); // the per-instance lock is now taken inside PersistentSBF::add
}
void NODDB::addDomain(const std::string& domain)
addDomain(dname);
}
-// Dump the SBF to a file
-// To spend the least amount of time inside the mutex, we dump to an
-// intermediate stringstream, otherwise the lock would be waiting for
-// file IO to complete
-bool NODDB::snapshotCurrent()
+// UniqueResponseDB Implementation
+bool UniqueResponseDB::isUniqueResponse(const std::string& response)
{
- if (d_cachedir.length()) {
- path p(d_cachedir);
- path f(d_cachedir);
- std::stringstream ss;
- ss << std::this_thread::get_id();
- f /= ss.str() + "_" + std::to_string(getpid()) + "." + bf_suffix;
- if (exists(p) && is_directory(p)) {
- try {
- std::ofstream ofile;
- std::stringstream ss;
- ofile.open(f.string(), std::ios::out | std::ios::binary);
- {
- // only lock while dumping to a stringstream
- std::lock_guard<std::mutex> lock(NODDB::d_sbf_mutex);
- d_sbf.dump(ss);
- }
- // Now write it out to the file
- ofile << ss.str();
+ return !d_psbf.testAndAdd(response); // the result is the inverse of what the SBF returns, as in NODDB::isNewDomain
+}
- if (ofile.fail())
- throw std::runtime_error("Failed to write to file:" + f.string());
- return true;
- }
- catch (const std::runtime_error& e) {
- g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << e.what() << endl;
- }
- }
- else {
- g_log<<Logger::Warning<<"NODDB snapshot: Cannot write file: " << f.string() << endl;
+void UniqueResponseDB::addResponse(const std::string& response)
+{
+ d_psbf.add(response); // PersistentSBF::add takes the per-instance lock itself
+}
+
+void UniqueResponseDB::housekeepingThread(std::thread::id tid)
+{
+ setThreadName("pdns-r/UDR-hk");
+ for (;;) { // never returns: snapshot, sleep, repeat
+ sleep(d_snapshot_interval);
+ {
+ snapshotCurrent(tid); // tid becomes part of the snapshot filename
}
}
- return false;
}
#pragma once
#include <atomic>
#include <mutex>
+#include <thread>
#include "dnsname.hh"
#include "stable-bloom.hh"
const uint8_t num_dec = 10;
const unsigned int snapshot_interval_default = 600;
const std::string bf_suffix = "bf";
+ const std::string sbf_prefix = "sbf";
- // This class is not designed to be shared between threads
+ // These classes are not designed to be shared between threads
// Use a new instance per-thread, e.g. using thread local storage
// Synchronization (at the class level) is still needed for reading from
// and writing to the cache dir
// Synchronization (at the instance level) is needed when snapshotting
+ class PersistentSBF { // Stable Bloom Filter that can be snapshotted to, and restored from, files in a cache dir
+ public:
+ bool init(bool ignore_pid=false); // Looks for an existing snapshot in the cache dir to restore from; returns false if already initialized
+ void setPrefix(const std::string& prefix) { d_prefix = prefix; } // Added to filenames in cachedir
+ void setCacheDir(const std::string& cachedir);
+ bool snapshotCurrent(std::thread::id tid); // Write the current file out to disk
+ void add(const std::string& data) {
+ // The only time this should block is when snapshotting
+ std::lock_guard<std::mutex> lock(d_sbf_mutex);
+ d_sbf.add(data);
+ }
+ bool test(const std::string& data) { return d_sbf.test(data); } // NOTE: unlike add() and testAndAdd(), this does not take d_sbf_mutex
+ bool testAndAdd(const std::string& data) {
+ // The only time this should block is when snapshotting
+ std::lock_guard<std::mutex> lock(d_sbf_mutex);
+ return d_sbf.testAndAdd(data);
+ }
+ private:
+ bool d_init{false}; // Checked by init() and setCacheDir()
+ bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter
+ std::string d_cachedir; // Directory holding the snapshot files; no snapshot is written while this is empty
+ std::string d_prefix = sbf_prefix; // First component of snapshot filenames
+ std::mutex d_sbf_mutex; // Per-instance mutex for snapshots
+ static std::mutex d_cachedir_mutex; // One mutex for all instances of this class
+ };
+
class NODDB {
public:
NODDB() {}
// Set ignore_pid to true if you don't mind loading files
// created by the current process
- bool init(bool ignore_pid=false); // Initialize the NODDB
+ bool init(bool ignore_pid=false) { // Initialize the NODDB; its snapshot files use the "nod" prefix
+ d_psbf.setPrefix("nod");
+ return d_psbf.init(ignore_pid);
+ }
bool isNewDomain(const std::string& domain); // Returns true if newly observed domain
bool isNewDomain(const DNSName& dname); // As above
bool isNewDomainWithParent(const std::string& domain, std::string& observed); // Returns true if newly observed domain, in which case "observed" contains the parent domain which *was* observed (or "" if domain is . or no parent domains observed)
void addDomain(const DNSName& dname); // You need to add this to refresh frequently used domains
void addDomain(const std::string& domain); // As above
void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; }
- void setCacheDir(const std::string& cachedir);
- bool snapshotCurrent(); // Write the current file out to disk
- bool pruneCacheFiles(); // Remove oldest cache files
- static void startHousekeepingThread(std::shared_ptr<NODDB> noddbp) {
- noddbp->housekeepingThread();
+ void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); } // Directory used for the snapshot files
+ bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); } // Write the current file out to disk
+ static void startHousekeepingThread(std::shared_ptr<NODDB> noddbp, std::thread::id tid) { // Runs the housekeeping loop (does not return); tid is used in snapshot filenames
+ noddbp->housekeepingThread(tid);
}
private:
- void housekeepingThread();
- bool d_init{false};
- bf::stableBF d_sbf{fp_rate, num_cells, num_dec}; // Stable Bloom Filter
- unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots
- std::string d_cachedir;
- std::mutex d_sbf_mutex; // Per-instance mutex for snapshots
- static std::mutex d_cachedir_mutex; // One mutex for all instances of this class
+ PersistentSBF d_psbf; // Holds the filter and does the snapshot/restore work
+ unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots
+ void housekeepingThread(std::thread::id tid); // Loops forever, snapshotting every d_snapshot_interval seconds
+ };
+
+ class UniqueResponseDB { // Same design as NODDB, but keyed on response strings
+ public:
+ UniqueResponseDB() {}
+ bool init(bool ignore_pid=false) { // Initialize the DB; its snapshot files use the "udr" prefix
+ d_psbf.setPrefix("udr");
+ return d_psbf.init(ignore_pid);
+ }
+ bool isUniqueResponse(const std::string& response); // Returns the inverse of the filter's testAndAdd() result for this response
+ void addResponse(const std::string& response); // Adds the response to the filter
+ void setSnapshotInterval(unsigned int secs) { d_snapshot_interval = secs; }
+ void setCacheDir(const std::string& cachedir) { d_psbf.setCacheDir(cachedir); } // Directory used for the snapshot files
+ bool snapshotCurrent(std::thread::id tid) { return d_psbf.snapshotCurrent(tid); } // Write the current file out to disk
+ static void startHousekeepingThread(std::shared_ptr<UniqueResponseDB> udrdbp, std::thread::id tid) { // Runs the housekeeping loop (does not return); tid is used in snapshot filenames
+ udrdbp->housekeepingThread(tid);
+ }
+ private:
+ PersistentSBF d_psbf; // Holds the filter and does the snapshot/restore work
+ unsigned int d_snapshot_interval{snapshot_interval_default}; // Number seconds between snapshots
+ void housekeepingThread(std::thread::id tid); // Loops forever, snapshotting every d_snapshot_interval seconds
};
}