dnsdist: Keep the number of entries in the ring, add tests and stats
author Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 20 Mar 2018 14:13:15 +0000 (15:13 +0100)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Thu, 29 Mar 2018 09:37:52 +0000 (11:37 +0200)
pdns/dnsdist-dynblocks.hh
pdns/dnsdist-lua-inspection.cc
pdns/dnsdist-rings.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/test-dnsdistrings_cc.cc [new file with mode: 0644]

index 5b70b424076df4b9ab5913c4adf48caed00d6f92..2df9ebbb590f5cb120472347edfa4bca7fd4b993 100644 (file)
@@ -124,16 +124,10 @@ public:
 
     size_t entriesCount = 0;
     if (hasQueryRules()) {
-      for (const auto& shard : g_rings.d_shards) {
-        std::lock_guard<std::mutex> rl(shard->queryLock);
-        entriesCount += shard->queryRing.size();
-      }
+      entriesCount += g_rings.getNumberOfQueryEntries();
     }
     if (hasResponseRules()) {
-      for (const auto& shard : g_rings.d_shards) {
-        std::lock_guard<std::mutex> rl(shard->respLock);
-        entriesCount += shard->respRing.size();
-      }
+      entriesCount += g_rings.getNumberOfResponseEntries();
     }
     counts.reserve(entriesCount);
 
index ad58203eda2a31e95385a1504d05225cd75fd886..cc115a40fa0235f4fba89ddb20e54c45062e1142 100644 (file)
@@ -156,13 +156,7 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function<void
   cutoff = mintime = now;
   cutoff.tv_sec -= seconds;
 
-  size_t total = 0;
-  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
-    std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
-    total += g_rings.d_shards[idx]->respRing.size();
-  }
-
-  counts.reserve(total);
+  counts.reserve(g_rings.getNumberOfResponseEntries());
 
   for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
     std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
@@ -191,13 +185,7 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function<voi
   cutoff = mintime = now;
   cutoff.tv_sec -= seconds;
 
-  size_t total = 0;
-  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
-    std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
-    total += g_rings.d_shards[idx]->queryRing.size();
-  }
-
-  counts.reserve(total);
+  counts.reserve(g_rings.getNumberOfQueryEntries());
 
   for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
     std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
@@ -419,17 +407,17 @@ void setupLuaInspection()
 
       std::vector<Rings::Query> qr;
       std::vector<Rings::Response> rr;
+      qr.reserve(g_rings.getNumberOfQueryEntries());
+      rr.reserve(g_rings.getNumberOfResponseEntries());
       for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
         {
           std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
-          qr.resize(qr.size() + g_rings.d_shards[idx]->queryRing.size());
           for (const auto& entry : g_rings.d_shards[idx]->queryRing) {
             qr.push_back(entry);
           }
         }
         {
           std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
-          rr.resize(rr.size() + g_rings.d_shards[idx]->respRing.size());
           for (const auto& entry : g_rings.d_shards[idx]->respRing) {
             rr.push_back(entry);
           }
index 71e295dddc8591ce4cc6192ac0d8fb40e24e1fc1..b9f37654ab6e6d79981cfd27790809124cf59236 100644 (file)
@@ -62,7 +62,7 @@ struct Rings {
     std::mutex respLock;
   };
 
-  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
+  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
   {
     setCapacity(capacity, numberOfShards);
     if (numberOfShards <= 1) {
@@ -108,41 +108,67 @@ struct Rings {
     return d_numberOfShards;
   }
 
+  size_t getNumberOfQueryEntries() const
+  {
+    return d_nbQueryEntries;
+  }
+
+  size_t getNumberOfResponseEntries() const
+  {
+    return d_nbResponseEntries;
+  }
+
   void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
-      auto shardId = getShardId();
-      std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+      auto& shard = getOneShard();
+      std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
       if (wl.owns_lock()) {
-        d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+        insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
         return;
       }
+      if (d_keepLockingStats) {
+        d_deferredQueryInserts++;
+      }
     }
 
     /* out of luck, let's just wait */
-    auto shardId = getShardId();
-    std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
-    d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+    if (d_keepLockingStats) {
+      d_blockingQueryInserts++;
+    }
+    auto& shard = getOneShard();
+    std::lock_guard<std::mutex> wl(shard->queryLock);
+    insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
   }
 
   void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
-      auto shardId = getShardId();
-      std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+      auto& shard = getOneShard();
+      std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
       if (wl.owns_lock()) {
-        d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+        insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
         return;
       }
+      if (d_keepLockingStats) {
+        d_deferredResponseInserts++;
+      }
     }
 
     /* out of luck, let's just wait */
-    auto shardId = getShardId();
-    std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
-    d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+    if (d_keepLockingStats) {
+      d_blockingResponseInserts++;
+    }
+    auto& shard = getOneShard();
+    std::lock_guard<std::mutex> wl(shard->respLock);
+    insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
   }
 
   std::vector<std::unique_ptr<Shard> > d_shards;
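+  /* lock-contention statistics, only updated when d_keepLockingStats is set:
+     an insert is 'deferred' each time a try-lock fails and the next shard is
+     tried, and 'blocking' when all tries failed and we had to wait for the lock */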
+  std::atomic<uint64_t> d_blockingQueryInserts;
+  std::atomic<uint64_t> d_blockingResponseInserts;
+  std::atomic<uint64_t> d_deferredQueryInserts;
+  std::atomic<uint64_t> d_deferredResponseInserts;
 
 private:
   size_t getShardId()
@@ -150,11 +176,34 @@ private:
     return (d_currentShardId++ % d_numberOfShards);
   }
 
+  std::unique_ptr<Shard>& getOneShard()
+  {
+    return d_shards[getShardId()];
+  }
+
+  void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+  {
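+    /* when the ring is full, push_back replaces the oldest entry, so the
+       entry count only grows while the ring has not reached capacity */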
+    if (!shard->queryRing.full()) {
+      d_nbQueryEntries++;
+    }
+    shard->queryRing.push_back({when, requestor, name, size, qtype, dh});
+  }
+
+  void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+  {
+    if (!shard->respRing.full()) {
+      d_nbResponseEntries++;
+    }
+    shard->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+  }
+
+  std::atomic<size_t> d_nbQueryEntries;
+  std::atomic<size_t> d_nbResponseEntries;
   std::atomic<size_t> d_currentShardId;
 
   size_t d_numberOfShards;
   size_t d_nbLockTries = 5;
-
+  bool d_keepLockingStats{false};
 };
 
 extern Rings g_rings;
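
A minimal sketch of the caller pattern this enables (assuming dnsdist-rings.hh is included and g_rings is defined as usual): reserve the destination once via the O(1) atomic counters instead of first walking every shard under its lock, mirroring the dnsdist-lua-inspection.cc hunks above. The counters are approximate while writers are active and exact once they have stopped.

#include <mutex>
#include <vector>
#include "dnsdist-rings.hh"

std::vector<Rings::Query> snapshotQueries()
{
  std::vector<Rings::Query> copy;
  /* O(1) read of the atomic entry counter, no per-shard lock needed */
  copy.reserve(g_rings.getNumberOfQueryEntries());
  for (const auto& shard : g_rings.d_shards) {
    std::lock_guard<std::mutex> rl(shard->queryLock);
    for (const auto& entry : shard->queryRing) {
      copy.push_back(entry);
    }
  }
  return copy;
}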
index be6bd927b21bd974d57944a9580e260e4035658b..cc7853186ed608e5655f215f70410edf72e3a95c 100644 (file)
@@ -210,6 +210,7 @@ testrunner_SOURCES = \
        test-base64_cc.cc \
        test-dnsdist_cc.cc \
        test-dnsdistpacketcache_cc.cc \
+       test-dnsdistrings_cc.cc \
        test-dnscrypt_cc.cc \
        test-iputils_hh.cc \
        dnsdist.hh \
diff --git a/pdns/dnsdistdist/test-dnsdistrings_cc.cc b/pdns/dnsdistdist/test-dnsdistrings_cc.cc
new file mode 100644 (file)
index 0000000..4b4f5da
--- /dev/null
@@ -0,0 +1,262 @@
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist-rings.hh"
+#include "gettime.hh"
+
+BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)
+
+static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries)
+{
+  Rings rings(maxEntries, numberOfShards, nbLockTries);
+  size_t entriesPerShard = maxEntries / numberOfShards;
+
+  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK(shard != nullptr);
+  }
+
+  dnsheader dh{}; /* zero-initialized, it is copied into every ring entry */
+  DNSName qname("rings.powerdns.com.");
+  ComboAddress requestor1("192.0.2.1");
+  ComboAddress requestor2("192.0.2.2");
+  uint16_t qtype = QType::AAAA;
+  uint16_t size = 42;
+  struct timespec now;
+  gettime(&now);
+
+  /* fill the query ring */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertQuery(now, requestor1, qname, qtype, size, dh);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+    }
+  }
+
+  /* push enough queries to get rid of the existing ones */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertQuery(now, requestor2, qname, qtype, size, dh);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+    }
+  }
+
+  ComboAddress server("192.0.2.42");
+  unsigned int latency = 100;
+
+  /* fill the response ring */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+
+  /* push enough responses to get rid of the existing ones */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+}
+
+
+BOOST_AUTO_TEST_CASE(test_Rings_Simple) {
+
+  /* 5 entries over 1 shard */
+  test_ring(5, 1, 0);
+  /* 500 entries over 10 shards */
+  test_ring(500, 10, 0);
+  /* 5000 entries over 100 shards, max 5 try-lock attempts */
+  test_ring(5000, 100, 5);
+}
+
+static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype)
+{
+  size_t iterationsDone = 0;
+
+  while (!done) {
+    size_t numberOfQueries = 0;
+    size_t numberOfResponses = 0;
+
+    for (auto& shard : rings.d_shards) {
+      {
+        std::lock_guard<std::mutex> rl(shard->queryLock);
+        for (const auto& c : shard->queryRing) {
+          numberOfQueries++;
+          // BOOST_CHECK* is slow as hell..
+          if (c.qtype != qtype) {
+            cerr<<"Invalid query QType!"<<endl;
+            return;
+          }
+        }
+      }
+      {
+        std::lock_guard<std::mutex> rl(shard->respLock);
+        for (const auto& c : shard->respRing) {
+          if (c.qtype != qtype) {
+            cerr<<"Invalid response QType!"<<endl;
+            return;
+          }
+          numberOfResponses++;
+        }
+      }
+    }
+
+    BOOST_CHECK_LE(numberOfQueries, numberOfEntries);
+    BOOST_CHECK_LE(numberOfResponses, numberOfEntries);
+    iterationsDone++;
+    usleep(10000);
+  }
+
+  BOOST_CHECK_GT(iterationsDone, 1);
+#if 0
+  cerr<<"Done "<<iterationsDone<<" reading iterations"<<endl;
+#endif
+}
+
+static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query query, const Rings::Response response)
+{
+  for (size_t idx = 0; idx < numberOfEntries; idx++) {
+    rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh);
+    rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds);
+  }
+}
+
+BOOST_AUTO_TEST_CASE(test_Rings_Threaded) {
+  size_t numberOfEntries = 1000000;
+  size_t numberOfShards = 50;
+  size_t lockAttempts = 5;
+  size_t numberOfWriterThreads = 4;
+  size_t entriesPerShard = numberOfEntries / numberOfShards;
+
+  struct timespec now;
+  gettime(&now);
+  dnsheader dh{};
+  dh.id = htons(4242);
+  dh.qr = 0;
+  dh.tc = 0;
+  dh.rd = 0;
+  dh.rcode = 0;
+  dh.qdcount = htons(1);
+  DNSName qname("rings.powerdns.com.");
+  ComboAddress requestor("192.0.2.1");
+  ComboAddress server("192.0.2.42");
+  unsigned int latency = 100;
+  uint16_t qtype = QType::AAAA;
+  uint16_t size = 42;
+
+  Rings rings(numberOfEntries, numberOfShards, lockAttempts, true);
+  Rings::Query query({now, requestor, qname, size, qtype, dh});
+  Rings::Response response({now, requestor, qname, qtype, latency, size, dh, server});
+
+  std::atomic<bool> done(false);
+  std::vector<std::thread> writerThreads;
+  std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype);
+
+  /* we need to overcommit a bit to account for the fact that, due to contention,
+     we might not distribute the entries perfectly over the shards,
+     so some of them might get full while others still have room left */
+  size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads;
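+  /* e.g. (1.2 * 1000000) / 4 = 300000 insertions per writer thread */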
+  for (size_t idx = 0; idx < numberOfWriterThreads; idx++) {
+    writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response));
+  }
+
+  /* wait for the writers to be finished */
+  for (auto& t : writerThreads) {
+    t.join();
+  }
+
+  /* we can stop the reader thread now */
+  done = true;
+  readerThread.join();
+
+  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), numberOfEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), numberOfEntries);
+  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+
+  size_t totalQueries = 0;
+  size_t totalResponses = 0;
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    totalQueries += shard->queryRing.size();
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+    }
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    totalResponses += shard->respRing.size();
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses);
+#if 0
+  cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl;
+  cerr<<"Got "<<rings.d_deferredQueryInserts<<" deferred query insertions"<<endl;
+  cerr<<"Got "<<rings.d_blockingQueryInserts<<" blocking query insertions"<<endl;
+  cerr<<"Got "<<rings.d_deferredResponseInserts<<" deferred response insertions"<<endl;
+  cerr<<"Got "<<rings.d_blockingResponseInserts<<" blocking response insertions"<<endl;
+#endif
+}
+
+BOOST_AUTO_TEST_SUITE_END()