From cfe4b6550c997e940c977440af482cdd9bb7570d Mon Sep 17 00:00:00 2001
From: Remi Gacogne <rgacogne@powerdns.com>
Date: Tue, 20 Mar 2018 15:13:15 +0100
Subject: [PATCH] dnsdist: Keep the number of entries in the ring, add tests
 and stats

---
 pdns/dnsdist-dynblocks.hh                |  10 +-
 pdns/dnsdist-lua-inspection.cc           |  20 +-
 pdns/dnsdist-rings.hh                    |  77 +++++--
 pdns/dnsdistdist/Makefile.am             |   1 +
 pdns/dnsdistdist/test-dnsdistrings_cc.cc | 262 +++++++++++++++++++++++
 5 files changed, 332 insertions(+), 38 deletions(-)
 create mode 100644 pdns/dnsdistdist/test-dnsdistrings_cc.cc

diff --git a/pdns/dnsdist-dynblocks.hh b/pdns/dnsdist-dynblocks.hh
index 5b70b4240..2df9ebbb5 100644
--- a/pdns/dnsdist-dynblocks.hh
+++ b/pdns/dnsdist-dynblocks.hh
@@ -124,16 +124,10 @@ public:
     size_t entriesCount = 0;
     if (hasQueryRules()) {
-      for (const auto& shard : g_rings.d_shards) {
-        std::lock_guard<std::mutex> rl(shard->queryLock);
-        entriesCount += shard->queryRing.size();
-      }
+      entriesCount += g_rings.getNumberOfQueryEntries();
     }
     if (hasResponseRules()) {
-      for (const auto& shard : g_rings.d_shards) {
-        std::lock_guard<std::mutex> rl(shard->respLock);
-        entriesCount += shard->respRing.size();
-      }
+      entriesCount += g_rings.getNumberOfResponseEntries();
     }
     counts.reserve(entriesCount);
diff --git a/pdns/dnsdist-lua-inspection.cc b/pdns/dnsdist-lua-inspection.cc
index ad58203ed..cc115a40f 100644
--- a/pdns/dnsdist-lua-inspection.cc
+++ b/pdns/dnsdist-lua-inspection.cc
@@ -156,13 +156,7 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function<void(counts_t&, const Rings::Response&)> T)
-  size_t total = 0;
-  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
-    std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
-    total += g_rings.d_shards[idx]->respRing.size();
-  }
-
-  counts.reserve(total);
+  counts.reserve(g_rings.getNumberOfResponseEntries());
 
   for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
     std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
@@ -191,13 +185,7 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function<void(counts_t&, const Rings::Query&)> T)
-  size_t total = 0;
-  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
-    std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
-    total += g_rings.d_shards[idx]->queryRing.size();
-  }
-
-  counts.reserve(total);
+  counts.reserve(g_rings.getNumberOfQueryEntries());
 
   for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
     std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
@@ -419,17 +407,17 @@ void setupLuaInspection()
       std::vector<Rings::Query> qr;
       std::vector<Rings::Response> rr;
+      qr.reserve(g_rings.getNumberOfQueryEntries());
+      rr.reserve(g_rings.getNumberOfResponseEntries());
       for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
         {
           std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->queryLock);
-          qr.resize(qr.size() + g_rings.d_shards[idx]->queryRing.size());
           for (const auto& entry : g_rings.d_shards[idx]->queryRing) {
             qr.push_back(entry);
           }
         }
         {
           std::lock_guard<std::mutex> rl(g_rings.d_shards[idx]->respLock);
-          rr.resize(rr.size() + g_rings.d_shards[idx]->respRing.size());
           for (const auto& entry : g_rings.d_shards[idx]->respRing) {
             rr.push_back(entry);
           }
diff --git a/pdns/dnsdist-rings.hh b/pdns/dnsdist-rings.hh
index 71e295ddd..b9f37654a 100644
--- a/pdns/dnsdist-rings.hh
+++ b/pdns/dnsdist-rings.hh
@@ -62,7 +62,7 @@ struct Rings {
     std::mutex respLock;
   };
 
-  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
+  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
   {
     setCapacity(capacity, numberOfShards);
     if (numberOfShards <= 1) {
@@ -108,41 +108,67 @@
     return d_numberOfShards;
   }
 
+  size_t getNumberOfQueryEntries() const
+  {
+    return d_nbQueryEntries;
+  }
+
+  size_t getNumberOfResponseEntries() const
+  {
+    return d_nbResponseEntries;
+  }
+
   void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
-      auto shardId = getShardId();
-      std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+      auto& shard = getOneShard();
+      std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
       if (wl.owns_lock()) {
-        d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+        insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
         return;
       }
+      if (d_keepLockingStats) {
+        d_deferredQueryInserts++;
+      }
     }
 
     /* out of luck, let's just wait */
-    auto shardId = getShardId();
-    std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
-    d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+    if (d_keepLockingStats) {
+      d_blockingQueryInserts++;
+    }
+    auto& shard = getOneShard();
+    std::lock_guard<std::mutex> wl(shard->queryLock);
+    insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
   }
 
   void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
-      auto shardId = getShardId();
-      std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+      auto& shard = getOneShard();
+      std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
       if (wl.owns_lock()) {
-        d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+        insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
         return;
       }
+      if (d_keepLockingStats) {
+        d_deferredResponseInserts++;
+      }
     }
 
     /* out of luck, let's just wait */
-    auto shardId = getShardId();
-    std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
-    d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+    if (d_keepLockingStats) {
+      d_blockingResponseInserts++;
+    }
+    auto& shard = getOneShard();
+    std::lock_guard<std::mutex> wl(shard->respLock);
+    insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
   }
 
   std::vector<std::unique_ptr<Shard> > d_shards;
+  std::atomic<uint64_t> d_blockingQueryInserts;
+  std::atomic<uint64_t> d_blockingResponseInserts;
+  std::atomic<uint64_t> d_deferredQueryInserts;
+  std::atomic<uint64_t> d_deferredResponseInserts;
 
 private:
   size_t getShardId()
@@ -150,11 +176,34 @@
     return (d_currentShardId++ % d_numberOfShards);
   }
 
+  std::unique_ptr<Shard>& getOneShard()
+  {
+    return d_shards[getShardId()];
+  }
+
+  void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+  {
+    if (!shard->queryRing.full()) {
+      d_nbQueryEntries++;
+    }
+    shard->queryRing.push_back({when, requestor, name, size, qtype, dh});
+  }
+
+  void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+  {
+    if (!shard->respRing.full()) {
+      d_nbResponseEntries++;
+    }
+    shard->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+  }
+
+  std::atomic<size_t> d_nbQueryEntries;
+  std::atomic<size_t> d_nbResponseEntries;
   std::atomic<size_t> d_currentShardId;
   size_t d_numberOfShards;
   size_t d_nbLockTries = 5;
-
+  bool d_keepLockingStats{false};
 };
 
 extern Rings g_rings;
diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am
index be6bd927b..cc7853186 100644
--- a/pdns/dnsdistdist/Makefile.am
+++ b/pdns/dnsdistdist/Makefile.am
@@ -210,6 +210,7 @@ testrunner_SOURCES = \
 	test-base64_cc.cc \
 	test-dnsdist_cc.cc \
 	test-dnsdistpacketcache_cc.cc \
+	test-dnsdistrings_cc.cc \
 	test-dnscrypt_cc.cc \
 	test-iputils_hh.cc \
 	dnsdist.hh \
diff --git a/pdns/dnsdistdist/test-dnsdistrings_cc.cc b/pdns/dnsdistdist/test-dnsdistrings_cc.cc
new file mode 100644
index 000000000..4b4f5da0b
--- /dev/null
+++ b/pdns/dnsdistdist/test-dnsdistrings_cc.cc
@@ -0,0 +1,262 @@
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist-rings.hh"
+#include "gettime.hh"
+
+BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)
+
+static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries)
+{
+  Rings rings(maxEntries, numberOfShards, nbLockTries);
+  size_t entriesPerShard = maxEntries / numberOfShards;
+
+  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK(shard != nullptr);
+  }
+
+  dnsheader dh;
+  DNSName qname("rings.powerdns.com.");
+  ComboAddress requestor1("192.0.2.1");
+  ComboAddress requestor2("192.0.2.2");
+  uint16_t qtype = QType::AAAA;
+  uint16_t size = 42;
+  struct timespec now;
+  gettime(&now);
+
+  /* fill the query ring */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertQuery(now, requestor1, qname, qtype, size, dh);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+    }
+  }
+
+  /* push enough queries to get rid of the existing ones */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertQuery(now, requestor2, qname, qtype, size, dh);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+    }
+  }
+
+  ComboAddress server("192.0.2.42");
+  unsigned int latency = 100;
+
+  /* fill the response ring */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server);
+  }
+
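+  /* every shard is now exactly full, so the response counter should match
+     the total capacity while the query counter stays untouched */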
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+
+  /* push enough responses to get rid of the existing ones */
+  for (size_t idx = 0; idx < maxEntries; idx++) {
+    rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server);
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+}
+
+
+BOOST_AUTO_TEST_CASE(test_Rings_Simple) {
+
+  /* 5 entries over 1 shard */
+  test_ring(5, 1, 0);
+  /* 500 entries over 10 shards */
+  test_ring(500, 10, 0);
+  /* 5000 entries over 100 shards, max 5 try-lock attempts */
+  test_ring(5000, 100, 5);
+}
+
+static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype)
+{
+  size_t iterationsDone = 0;
+
+  while (done == false) {
+    size_t numberOfQueries = 0;
+    size_t numberOfResponses = 0;
+
+    for (auto& shard : rings.d_shards) {
+      {
+        std::lock_guard<std::mutex> rl(shard->queryLock);
+        for(const auto& c : shard->queryRing) {
+          numberOfQueries++;
+          // BOOST_CHECK* is slow as hell..
+          if(c.qtype != qtype) {
+            cerr<<"Invalid query QType!"<<endl;
+            return;
+          }
+        }
+      }
+      {
+        std::lock_guard<std::mutex> rl(shard->respLock);
+        for(const auto& c : shard->respRing) {
+          if(c.qtype != qtype) {
+            cerr<<"Invalid response QType!"<<endl;
+            return;
+          }
+          numberOfResponses++;
+        }
+      }
+    }
+
+    BOOST_CHECK_LE(numberOfQueries, numberOfEntries);
+    BOOST_CHECK_LE(numberOfResponses, numberOfEntries);
+    iterationsDone++;
+    usleep(10000);
+  }
+
+  BOOST_CHECK_GT(iterationsDone, 1);
+}
+
+static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query& query, const Rings::Response& response)
+{
+  for (size_t idx = 0; idx < numberOfEntries; idx++) {
+    rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh);
+    rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds);
+  }
+}
+
+BOOST_AUTO_TEST_CASE(test_Rings_Threaded) {
+  size_t numberOfEntries = 1000000;
+  size_t numberOfShards = 50;
+  size_t nbLockTries = 5;
+  size_t numberOfWriterThreads = 4;
+  size_t entriesPerShard = numberOfEntries / numberOfShards;
+
+  struct timespec now;
+  gettime(&now);
+  dnsheader dh;
+  DNSName qname("rings.powerdns.com.");
+  ComboAddress requestor("192.0.2.1");
+  ComboAddress server("192.0.2.42");
+  unsigned int latency = 100;
+  uint16_t qtype = QType::AAAA;
+  uint16_t size = 42;
+  Rings::Query query({now, requestor, qname, size, qtype, dh});
+  Rings::Response response({now, requestor, qname, qtype, latency, size, dh, server});
+
+  Rings rings(numberOfEntries, numberOfShards, nbLockTries, true);
+  std::atomic<bool> done(false);
+  std::vector<std::thread> writerThreads;
+  std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype);
+
+  /* we need to overcommit a bit to account for the fact that, due to contention,
+     we might not perfectly distribute the entries over the shards,
+     so some of them might get full while others still have some room left */
+  size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads;
+  for (size_t idx = 0; idx < numberOfWriterThreads; idx++) {
+    writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response));
+  }
+
+  /* wait for the writers to be finished */
+  for (auto& t : writerThreads) {
+    t.join();
+  }
+
+  /* we can stop the reader thread now */
+  done = true;
+  readerThread.join();
+
+  BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), numberOfEntries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), numberOfEntries);
+  BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+
+  size_t totalQueries = 0;
+  size_t totalResponses = 0;
+  for (const auto& shard : rings.d_shards) {
+    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+    totalQueries += shard->queryRing.size();
+    for (const auto& entry : shard->queryRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+    }
+    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+    totalResponses += shard->respRing.size();
+    for (const auto& entry : shard->respRing) {
+      BOOST_CHECK_EQUAL(entry.name, qname);
+      BOOST_CHECK_EQUAL(entry.qtype, qtype);
+      BOOST_CHECK_EQUAL(entry.size, size);
+      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+      BOOST_CHECK_EQUAL(entry.usec, latency);
+      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    }
+  }
+  BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
+  BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses);
+#if 0
+  cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl;
+#endif
+}
+
+BOOST_AUTO_TEST_SUITE_END();
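---

For illustration, a minimal sketch of how the counters introduced by this patch
behave. It is not part of the patch: the standalone main(), the example values
and the query name are hypothetical, and it assumes a build against the dnsdist
source tree so that dnsdist-rings.hh and its dependencies resolve. Because
d_nbQueryEntries/d_nbResponseEntries are only bumped while a shard's circular
buffer is below capacity, getNumberOfQueryEntries() stays O(1) and callers no
longer need the per-shard lock-and-count loops removed above.

  // Hypothetical usage sketch, not part of the patch.
  #include <cstring>
  #include <iostream>

  #include "dnsdist-rings.hh"
  #include "gettime.hh"

  int main()
  {
    /* 100 entries spread over 4 shards, 5 try-lock attempts,
       locking statistics enabled (the new fourth parameter) */
    Rings rings(100, 4, 5, true);

    dnsheader dh;
    memset(&dh, 0, sizeof(dh));
    DNSName qname("example.powerdns.com.");
    ComboAddress client("192.0.2.1");
    struct timespec now;
    gettime(&now);

    /* insert more queries than the rings can hold */
    for (size_t idx = 0; idx < 250; idx++) {
      rings.insertQuery(now, client, qname, QType::A, 64, dh);
    }

    /* the counter is only incremented while a shard's ring is not yet full,
       so it is capped at the capacity (100) rather than growing to the
       number of insertions (250) */
    std::cout << "query entries: " << rings.getNumberOfQueryEntries() << std::endl;

    /* with keepLockingStats enabled, lock contention becomes measurable */
    std::cout << "deferred query inserts: " << rings.d_deferredQueryInserts << std::endl;
    std::cout << "blocking query inserts: " << rings.d_blockingQueryInserts << std::endl;
    return 0;
  }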