std::mutex respLock;
};
- Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
+ Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
{
setCapacity(capacity, numberOfShards);
if (numberOfShards <= 1) {
return d_numberOfShards;
}
+ size_t getNumberOfQueryEntries() const
+ {
+ return d_nbQueryEntries;
+ }
+
+ size_t getNumberOfResponseEntries() const
+ {
+ return d_nbResponseEntries;
+ }
+
void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
{
for (size_t idx = 0; idx < d_nbLockTries; idx++) {
- auto shardId = getShardId();
- std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+ auto& shard = getOneShard();
+ std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
if (wl.owns_lock()) {
- d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+ insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
return;
}
+ if (d_keepLockingStats) {
+ d_deferredQueryInserts++;
+ }
}
/* out of luck, let's just wait */
- auto shardId = getShardId();
- std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
- d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+ if (d_keepLockingStats) {
+ d_blockingResponseInserts++;
+ }
+ auto& shard = getOneShard();
+ std::lock_guard<std::mutex> wl(shard->queryLock);
+ insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
}
void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
{
for (size_t idx = 0; idx < d_nbLockTries; idx++) {
- auto shardId = getShardId();
- std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+ auto& shard = getOneShard();
+ std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
if (wl.owns_lock()) {
- d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+ insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
return;
}
+ if (d_keepLockingStats) {
+ d_deferredResponseInserts++;
+ }
}
/* out of luck, let's just wait */
- auto shardId = getShardId();
- std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
- d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+ if (d_keepLockingStats) {
+ d_blockingResponseInserts++;
+ }
+ auto& shard = getOneShard();
+ std::lock_guard<std::mutex> wl(shard->respLock);
+ insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
}
std::vector<std::unique_ptr<Shard> > d_shards;
+ std::atomic<uint64_t> d_blockingQueryInserts;
+ std::atomic<uint64_t> d_blockingResponseInserts;
+ std::atomic<uint64_t> d_deferredQueryInserts;
+ std::atomic<uint64_t> d_deferredResponseInserts;
private:
size_t getShardId()
return (d_currentShardId++ % d_numberOfShards);
}
+ std::unique_ptr<Shard>& getOneShard()
+ {
+ return d_shards[getShardId()];
+ }
+
+ void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+ {
+ if (!shard->queryRing.full()) {
+ d_nbQueryEntries++;
+ }
+ shard->queryRing.push_back({when, requestor, name, size, qtype, dh});
+ }
+
+ void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+ {
+ if (!shard->respRing.full()) {
+ d_nbResponseEntries++;
+ }
+ shard->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+ }
+
+ std::atomic<size_t> d_nbQueryEntries;
+ std::atomic<size_t> d_nbResponseEntries;
std::atomic<size_t> d_currentShardId;
size_t d_numberOfShards;
size_t d_nbLockTries = 5;
-
+ bool d_keepLockingStats{false};
};
extern Rings g_rings;
--- /dev/null
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+
+#include <thread>
+#include <boost/test/unit_test.hpp>
+
+#include "dnsdist-rings.hh"
+#include "gettime.hh"
+
+BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)
+
+static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries)
+{
+ Rings rings(maxEntries, numberOfShards, nbLockTries);
+ size_t entriesPerShard = maxEntries / numberOfShards;
+
+ BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+ BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK(shard != nullptr);
+ }
+
+ dnsheader dh;
+ DNSName qname("rings.powerdns.com.");
+ ComboAddress requestor1("192.0.2.1");
+ ComboAddress requestor2("192.0.2.2");
+ uint16_t qtype = QType::AAAA;
+ uint16_t size = 42;
+ struct timespec now;
+ gettime(&now);
+
+ /* fill the query ring */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertQuery(now, requestor1, qname, qtype, size, dh);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+ for (const auto& entry : shard->queryRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+ }
+ }
+
+ /* push enough queries to get rid of the existing ones */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertQuery(now, requestor2, qname, qtype, size, dh);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0);
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+ for (const auto& entry : shard->queryRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+ }
+ }
+
+ ComboAddress server("192.0.2.42");
+ unsigned int latency = 100;
+
+ /* fill the response ring */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+ for (const auto& entry : shard->respRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+
+ /* push enough responses to get rid of the existing ones */
+ for (size_t idx = 0; idx < maxEntries; idx++) {
+ rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server);
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+ for (const auto& entry : shard->respRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+}
+
+
+BOOST_AUTO_TEST_CASE(test_Rings_Simple) {
+
+ /* 5 entries over 1 shard */
+ test_ring(5, 1, 0);
+ /* 500 entries over 10 shards */
+ test_ring(500, 10, 0);
+ /* 5000 entries over 100 shards, max 5 try-lock attempts */
+ test_ring(500, 100, 5);
+}
+
+static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype)
+{
+ size_t iterationsDone = 0;
+
+ while (done == false) {
+ size_t numberOfQueries = 0;
+ size_t numberOfResponses = 0;
+
+ for (auto& shard : rings.d_shards) {
+ {
+ std::lock_guard<std::mutex> rl(shard->queryLock);
+ for(const auto& c : shard->queryRing) {
+ numberOfQueries++;
+ // BOOST_CHECK* is slow as hell..
+ if(c.qtype != qtype) {
+ cerr<<"Invalid query QType!"<<endl;
+ return;
+ }
+ }
+ }
+ {
+ std::lock_guard<std::mutex> rl(shard->respLock);
+ for(const auto& c : shard->respRing) {
+ if(c.qtype != qtype) {
+ cerr<<"Invalid response QType!"<<endl;
+ return;
+ }
+ numberOfResponses++;
+ }
+ }
+ }
+
+ BOOST_CHECK_LE(numberOfQueries, numberOfEntries);
+ BOOST_CHECK_LE(numberOfResponses, numberOfEntries);
+ iterationsDone++;
+ usleep(10000);
+ }
+
+ BOOST_CHECK_GT(iterationsDone, 1);
+#if 0
+ cerr<<"Done "<<iterationsDone<<" reading iterations"<<endl;
+#endif
+}
+
+static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query query, const Rings::Response response)
+{
+ for (size_t idx = 0; idx < numberOfEntries; idx++) {
+ rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh);
+ rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_Rings_Threaded) {
+ size_t numberOfEntries = 1000000;
+ size_t numberOfShards = 50;
+ size_t lockAttempts = 5;
+ size_t numberOfWriterThreads = 4;
+ size_t entriesPerShard = numberOfEntries / numberOfShards;
+
+ struct timespec now;
+ gettime(&now);
+ dnsheader dh;
+ dh.id = htons(4242);
+ dh.qr = 0;
+ dh.tc = 0;
+ dh.rd = 0;
+ dh.rcode = 0;
+ dh.qdcount = htons(1);
+ DNSName qname("rings.powerdns.com.");
+ ComboAddress requestor("192.0.2.1");
+ ComboAddress server("192.0.2.42");
+ unsigned int latency = 100;
+ uint16_t qtype = QType::AAAA;
+ uint16_t size = 42;
+
+ Rings rings(numberOfEntries, numberOfShards, lockAttempts, true);
+ Rings::Query query({now, requestor, qname, size, qtype, dh});
+ Rings::Response response({now, requestor, qname, qtype, latency, size, dh, server});
+
+ std::atomic<bool> done;
+ std::vector<std::thread> writerThreads;
+ std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype);
+
+ /* we need to overcommit a bit to account for the fact that due to contention,
+ we might not perfectly distribute the entries over the shards,
+ so some of them might get full while other still have some place left */
+ size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads;
+ for (size_t idx = 0; idx < numberOfWriterThreads; idx++) {
+ writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response));
+ }
+
+ /* wait for the writers to be finished */
+ for (auto& t : writerThreads) {
+ t.join();
+ }
+
+ /* we can stop the reader thread now */
+ done = true;
+ readerThread.join();
+
+ BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards);
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), numberOfEntries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), numberOfEntries);
+ BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards());
+
+ size_t totalQueries = 0;
+ size_t totalResponses = 0;
+ for (const auto& shard : rings.d_shards) {
+ BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
+ totalQueries += shard->queryRing.size();
+ for (const auto& entry : shard->queryRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ }
+ BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
+ totalResponses += shard->respRing.size();
+ for (const auto& entry : shard->respRing) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
+ }
+ BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
+ BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses);
+#if 0
+ cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl;
+ cerr<<"Got "<<rings.d_deferredQueryInserts<<" deferred query insertions"<<endl;
+ cerr<<"Got "<<rings.d_blockingQueryInserts<<" blocking query insertions"<<endl;
+ cerr<<"Got "<<rings.d_deferredResponseInserts<<" deferred response insertions"<<endl;
+ cerr<<"Got "<<rings.d_blockingResponseInserts<<" blocking response insertions"<<endl;
+#endif
+}
+
+BOOST_AUTO_TEST_SUITE_END()