From: Remi Gacogne
Date: Tue, 16 Jan 2018 15:59:38 +0000 (+0100)
Subject: dnsdist: Ring buffers sharding
X-Git-Tag: dnsdist-1.3.0~1^2~12
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=01f6920be869c5a3ccb5c0444d2bd3b6d3aba26f;p=pdns

dnsdist: Ring buffers sharding
---

diff --git a/pdns/dnsdist-lua-inspection.cc b/pdns/dnsdist-lua-inspection.cc
index d263bdecc..3cf82e460 100644
--- a/pdns/dnsdist-lua-inspection.cc
+++ b/pdns/dnsdist-lua-inspection.cc
@@ -31,26 +31,27 @@ static std::unordered_map>> g
   map counts;
   unsigned int total=0;
   {
-    std::lock_guard lock(g_rings.respMutex);
-    if(!labels) {
-      for(const auto& a : g_rings.respRing) {
-        if(!pred(a))
-          continue;
-        counts[a.name]++;
-        total++;
-      }
-    }
-    else {
-      unsigned int lab = *labels;
-      for(auto a : g_rings.respRing) {
-        if(!pred(a))
-          continue;
-
-        a.name.trimToLabels(lab);
-        counts[a.name]++;
-        total++;
+    for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+      ReadLock rl(&g_rings.d_shards[idx].respLock);
+      if(!labels) {
+        for(const auto& a : g_rings.d_shards[idx].respRing) {
+          if(!pred(a))
+            continue;
+          counts[a.name]++;
+          total++;
+        }
       }
+      else {
+        unsigned int lab = *labels;
+        for(auto a : g_rings.d_shards[idx].respRing) {
+          if(!pred(a))
+            continue;
+          a.name.trimToLabels(lab);
+          counts[a.name]++;
+          total++;
+        }
+      }
     }
   }
   // cout<<"Looked at "<
-    std::lock_guard lock(g_rings.respMutex);
-    for(const auto& c : g_rings.respRing) {
+    for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+      ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+      for(const auto& c : g_rings.d_shards[idx].respRing) {
       if (now < c.when)
         continue;
@@ -126,18 +128,22 @@ static vector > > getRespRi
 {
   typedef std::unordered_map entry_t;
   vector > ret;
-  std::lock_guard lock(g_rings.respMutex);
-
-  entry_t e;
-  unsigned int count=1;
-  for(const auto& c : g_rings.respRing) {
-    if(rcode && (rcode.get() != c.dh.rcode))
-      continue;
-    e["qname"]=c.name.toString();
-    e["rcode"]=std::to_string(c.dh.rcode);
-    ret.push_back(std::make_pair(count,e));
-    count++;
+
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+    entry_t e;
+    unsigned int count=1;
+    for(const auto& c : g_rings.d_shards[idx].respRing) {
+      if(rcode && (rcode.get() != c.dh.rcode))
+        continue;
+      e["qname"]=c.name.toString();
+      e["rcode"]=std::to_string(c.dh.rcode);
+      ret.push_back(std::make_pair(count,e));
+      count++;
+    }
   }
+
   return ret;
 }

@@ -149,10 +155,18 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function
-  std::lock_guard lock(g_rings.respMutex);
-  counts.reserve(g_rings.respRing.size());
-  for(const auto& c : g_rings.respRing) {
+  size_t total = 0;
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+    total += g_rings.d_shards[idx].respRing.size();
+  }
+
+  counts.reserve(total);
+
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+    for(const auto& c : g_rings.d_shards[idx].respRing) {
+
     if(seconds && c.when < cutoff)
       continue;
     if(now < c.when)
@@ -176,10 +190,17 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function
   map counts;
   unsigned int total=0;
   {
-    ReadLock rl(&g_rings.queryLock);
-    for(const auto& c : g_rings.queryRing) {
-      counts[c.requestor]++;
-      total++;
+    for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+      ReadLock rl(&g_rings.d_shards[idx].queryLock);
+      for(const auto& c : g_rings.d_shards[idx].queryRing) {
+        counts[c.requestor]++;
+        total++;
+      }
     }
   }
   vector> rcounts;
@@ -251,20 +274,24 @@ void setupLuaInspection()
       map counts;
       unsigned int total=0;
       if(!labels) {
-        ReadLock rl(&g_rings.queryLock);
-        for(const auto& a : g_rings.queryRing) {
-          counts[a.name]++;
-          total++;
-        }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          for(const auto& a : g_rings.d_shards[idx].queryRing) {
+            counts[a.name]++;
+            total++;
+          }
+        }
       }
       else {
         unsigned int lab = *labels;
-        ReadLock rl(&g_rings.queryLock);
-        for(auto a : g_rings.queryRing) {
-          a.name.trimToLabels(lab);
-          counts[a.name]++;
-          total++;
-        }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          for(auto a : g_rings.d_shards[idx].queryRing) {
+            a.name.trimToLabels(lab);
+            counts[a.name]++;
+            total++;
+          }
+        }
       }
       // cout<<"Looked at "<
       vector> rcounts;
@@ -294,20 +321,27 @@ void setupLuaInspection()
     g_lua.writeFunction("getResponseRing", []() {
       setLuaNoSideEffect();
-      decltype(g_rings.respRing) ring;
-      {
-        std::lock_guard lock(g_rings.respMutex);
-        ring = g_rings.respRing;
+      size_t totalEntries = 0;
+      std::vector> rings;
+      rings.resize(g_rings.getNumberOfShards());
+      for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+        {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          rings[idx] = g_rings.d_shards[idx].respRing;
+        }
+        totalEntries += rings[idx].size();
       }
       vector > > ret;
-      ret.reserve(ring.size());
+      ret.reserve(totalEntries);
       decltype(ret)::value_type item;
-      for(const auto& r : ring) {
-        item["name"]=r.name.toString();
-        item["qtype"]=r.qtype;
-        item["rcode"]=r.dh.rcode;
-        item["usec"]=r.usec;
-        ret.push_back(item);
+      for (size_t idx = 0; idx < rings.size(); idx++) {
+        for(const auto& r : rings[idx]) {
+          item["name"]=r.name.toString();
+          item["qtype"]=r.qtype;
+          item["rcode"]=r.dh.rcode;
+          item["usec"]=r.usec;
+          ret.push_back(item);
+        }
       }
       return ret;
     });
@@ -382,19 +416,28 @@ void setupLuaInspection()
         }
       }

-      decltype(g_rings.queryRing) qr;
-      decltype(g_rings.respRing) rr;
-      {
-        ReadLock rl(&g_rings.queryLock);
-        qr=g_rings.queryRing;
+      std::vector qr;
+      std::vector rr;
+      for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+        {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          qr.reserve(qr.size() + g_rings.d_shards[idx].queryRing.size());
+          for (const auto& entry : g_rings.d_shards[idx].queryRing) {
+            qr.push_back(entry);
+          }
+        }
+        {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          rr.reserve(rr.size() + g_rings.d_shards[idx].respRing.size());
+          for (const auto& entry : g_rings.d_shards[idx].respRing) {
+            rr.push_back(entry);
+          }
+        }
       }
+
       sort(qr.begin(), qr.end(), [](const decltype(qr)::value_type& a, const decltype(qr)::value_type& b) {
         return b.when < a.when;
       });
-      {
-        std::lock_guard lock(g_rings.respMutex);
-        rr=g_rings.respRing;
-      }
       sort(rr.begin(), rr.end(), [](const decltype(rr)::value_type& a, const decltype(rr)::value_type& b) {
         return b.when < a.when;
       });
@@ -471,20 +514,22 @@ void setupLuaInspection()
       double totlat=0;
       unsigned int size=0;
       {
-        std::lock_guard lock(g_rings.respMutex);
-        for(const auto& r : g_rings.respRing) {
-          /* skip actively discovered timeouts */
-          if (r.usec == std::numeric_limits::max())
-            continue;
-
-          ++size;
-          auto iter = histo.lower_bound(r.usec);
-          if(iter != histo.end())
-            iter->second++;
-          else
-            histo.rbegin()++;
-          totlat+=r.usec;
-        }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          for(const auto& r : g_rings.d_shards[idx].respRing) {
+            /* skip actively discovered timeouts */
+            if (r.usec == std::numeric_limits::max())
+              continue;
+
+            ++size;
+            auto iter = histo.lower_bound(r.usec);
+            if(iter != histo.end())
+              iter->second++;
+            else
+              histo.rbegin()++;
+            totlat+=r.usec;
+          }
+        }
       }

       if (size == 0) {
diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
index c0270524a..58078e7bf 100644
--- a/pdns/dnsdist-lua.cc
+++ b/pdns/dnsdist-lua.cc
@@ -325,14 +325,14 @@ void setupLuaConfig(bool client)
       if (ret->connected) {
         if(g_launchWork) {
           g_launchWork->push_back([ret,cpus]() {
-            ret->tid = thread(responderThread, ret);
+            ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
             if (!cpus.empty()) {
               mapThreadToCPUList(ret->tid.native_handle(), cpus);
             }
           });
         }
         else {
-          ret->tid = thread(responderThread, ret);
+          ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
           if (!cpus.empty()) {
             mapThreadToCPUList(ret->tid.native_handle(), cpus);
           }
@@ -1298,14 +1298,14 @@ void setupLuaConfig(bool client)
     g_servFailOnNoPolicy = servfail;
   });

-  g_lua.writeFunction("setRingBuffersSize", [](size_t capacity) {
+  g_lua.writeFunction("setRingBuffersSize", [](size_t capacity, boost::optional numberOfShards) {
     setLuaSideEffect();
     if (g_configurationDone) {
       errlog("setRingBuffersSize() cannot be used at runtime!");
       g_outputBuffer="setRingBuffersSize() cannot be used at runtime!\n";
       return;
     }
-    g_rings.setCapacity(capacity);
+    g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1);
   });

   g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
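With the hunk above, setRingBuffersSize() gains an optional second argument selecting the number of shards; a configuration such as setRingBuffersSize(10000, 10) would therefore keep the overall capacity at 10000 entries while splitting it into 10 shards of 1000 entries each, since setCapacity() (see the dnsdist.hh hunk further down) sizes every shard ring to newCapacity / numberOfShards. This reading of the argument semantics is inferred from the diff itself, not from separate documentation.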
diff --git a/pdns/dnsdist-rings.cc b/pdns/dnsdist-rings.cc
index 6fef34271..83c9ca587 100644
--- a/pdns/dnsdist-rings.cc
+++ b/pdns/dnsdist-rings.cc
@@ -25,9 +25,12 @@
 size_t Rings::numDistinctRequestors()
 {
   std::set s;
-  ReadLock rl(&queryLock);
-  for(const auto& q : queryRing)
-    s.insert(q.requestor);
+  for (size_t idx = 0; idx < getNumberOfShards(); idx++) {
+    ReadLock rl(&d_shards[idx].queryLock);
+    for(const auto& q : d_shards[idx].queryRing) {
+      s.insert(q.requestor);
+    }
+  }
   return s.size();
 }

@@ -35,19 +38,20 @@ std::unordered_map>> Rings::getTopBand
 {
   map counts;
   uint64_t total=0;
-  {
-    ReadLock rl(&queryLock);
-    for(const auto& q : queryRing) {
-      counts[q.requestor]+=q.size;
-      total+=q.size;
+  for (size_t idx = 0; idx < getNumberOfShards(); idx++) {
+    {
+      ReadLock rl(&d_shards[idx].queryLock);
+      for(const auto& q : d_shards[idx].queryRing) {
+        counts[q.requestor]+=q.size;
+        total+=q.size;
+      }
     }
-  }
-
-  {
-    std::lock_guard lock(respMutex);
-    for(const auto& r : respRing) {
-      counts[r.requestor]+=r.size;
-      total+=r.size;
+    {
+      ReadLock rl(&d_shards[idx].respLock);
+      for(const auto& r : d_shards[idx].respRing) {
+        counts[r.requestor]+=r.size;
+        total+=r.size;
+      }
     }
   }
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
index f0ec210d4..266aac8bd 100644
--- a/pdns/dnsdist-tcp.cc
+++ b/pdns/dnsdist-tcp.cc
@@ -235,6 +235,7 @@ void* tcpClientThread(int pipefd)
   /* we get launched with a pipe on which we receive file descriptors from clients that we own
      from that point on */

+  const auto queryInserterId = g_rings.getQueryInserterId();
   bool outstanding = false;
   time_t lastTCPCleanup = time(nullptr);

@@ -358,7 +359,7 @@ void* tcpClientThread(int pipefd)
         DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
         DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);

-        if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
+        if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
           goto drop;
         }

@@ -632,10 +633,7 @@ void* tcpClientThread(int pipefd)
         struct timespec answertime;
         gettime(&answertime);
         unsigned int udiff = 1000000.0*DiffTime(now,answertime);
-        {
-          std::lock_guard lock(g_rings.respMutex);
-          g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
-        }
+        g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);

         rewrittenResponse.clear();
       }
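Worth noting for the TCP path above: each tcpClientThread() fetches a single queryInserterId at startup and reuses it for both insertQuery() and insertResponse(), so every ring insertion made by one TCP worker lands in the same shard and, as long as thread ids do not collide modulo the shard count, never contends on a lock with another worker's insertions (getShardId() in the dnsdist.hh hunk below simply computes id % d_numberOfShards).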
diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc
index d9c18bf31..2a15d291b 100644
--- a/pdns/dnsdist.cc
+++ b/pdns/dnsdist.cc
@@ -406,7 +406,7 @@ static void pickBackendSocketsReadyForReceiving(const std::shared_ptr
-void* responderThread(std::shared_ptr dss)
+void* responderThread(std::shared_ptr dss, const size_t responseInserterId)
 try {
   auto localRespRulactions = g_resprulactions.getLocal();
 #ifdef HAVE_DNSCRYPT
@@ -512,12 +512,9 @@ try {
           double udiff = ids->sentTime.udiff();
           vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);

-          {
-            struct timespec ts;
-            gettime(&ts);
-            std::lock_guard lock(g_rings.respMutex);
-            g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote});
-          }
+          struct timespec ts;
+          gettime(&ts);
+          g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);

           if(dh->rcode == RCode::ServFail)
             g_stats.servfailResponses++;
@@ -859,12 +856,9 @@ static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
   }
 }

-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
 {
-  {
-    WriteLock wl(&g_rings.queryLock);
-    g_rings.queryRing.push_back({now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh});
-  }
+  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);

   if(g_qcount.enabled) {
     string qname = (*dq.qname).toString(".");
@@ -1209,7 +1203,7 @@ static void queueResponse(const ClientState& cs, const char* response, uint16_t
 }
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */

-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
 {
   assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
   uint16_t queryId = 0;
@@ -1251,7 +1245,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
     DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
     DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);

-    if (!processQuery(holders, dq, poolname, &delayMsec, now))
+    if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
     {
       return;
     }
@@ -1479,7 +1473,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }

 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
 {
   struct MMReceiver
   {
@@ -1540,7 +1534,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
         continue;
       }

-      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
+      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
     }

@@ -1560,14 +1554,14 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */

 // listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs)
+static void* udpClientThread(ClientState* cs, size_t queryInserterId)
 try
 {
   LocalHolders holders;

 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
   if (g_udpVectorSize > 1) {
-    MultipleMessagesUDPClientThread(cs, holders);
+    MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
   }
   else
@@ -1598,7 +1592,7 @@ try
       continue;
     }

-    processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
+    processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
   }
 }

@@ -1807,7 +1801,7 @@ void* healthChecksThread()
           }
         }
         if (dss->connected) {
-          dss->tid = thread(responderThread, dss);
+          dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
         }
       }

@@ -1851,8 +1845,7 @@ void* healthChecksThread()
          memset(&fake, 0, sizeof(fake));
          fake.id = ids.origID;

-          std::lock_guard lock(g_rings.respMutex);
-          g_rings.respRing.push_back({ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits::max(), 0, fake, dss->remote});
+          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits::max(), 0, fake, dss->remote, 0);
         }
       }
     }
@@ -2551,7 +2544,7 @@ try
         auto ret=std::make_shared(ComboAddress(address, 53));
         addServerToPool(localPools, "", ret);
         if (ret->connected) {
-          ret->tid = thread(responderThread, ret);
+          ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
         }
         g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
       }
@@ -2575,7 +2568,7 @@ try

   for(auto& cs : toLaunch) {
     if (cs->udpFD >= 0) {
-      thread t1(udpClientThread, cs);
+      thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
       if (!cs->cpus.empty()) {
         mapThreadToCPUList(t1.native_handle(), cs->cpus);
       }
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index b7f0f9b63..270b18b86 100644
--- a/pdns/dnsdist.hh
+++ b/pdns/dnsdist.hh
@@ -376,12 +376,6 @@ struct IDState
 };

 struct Rings {
-  Rings(size_t capacity=10000)
-  {
-    queryRing.set_capacity(capacity);
-    respRing.set_capacity(capacity);
-    pthread_rwlock_init(&queryLock, nullptr);
-  }
   struct Query
   {
     struct timespec when;
@@ -391,7 +385,6 @@ struct Rings {
     uint16_t qtype;
     struct dnsheader dh;
   };
-  boost::circular_buffer queryRing;

   struct Response
   {
     struct timespec when;
@@ -403,23 +396,87 @@ struct Rings {
     struct dnsheader dh;
     ComboAddress ds; // who handled it
   };
-  boost::circular_buffer respRing;
-  std::mutex respMutex;
-  pthread_rwlock_t queryLock;
+  struct Shard
+  {
+    boost::circular_buffer queryRing;
+    boost::circular_buffer respRing;
+    pthread_rwlock_t queryLock;
+    pthread_rwlock_t respLock;
+  };
+
+  Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(numberOfShards)
+  {
+    setCapacity(capacity, numberOfShards);
+  }

   std::unordered_map > > getTopBandwidth(unsigned int numentries);
   size_t numDistinctRequestors();
-  void setCapacity(size_t newCapacity)
+  void setCapacity(size_t newCapacity, size_t numberOfShards)
   {
-    {
-      WriteLock wl(&queryLock);
-      queryRing.set_capacity(newCapacity);
+    if (numberOfShards < d_numberOfShards) {
+      throw std::runtime_error("Decreasing the number of shards in the query and response rings is not supported");
     }
-    {
-      std::lock_guard lock(respMutex);
-      respRing.set_capacity(newCapacity);
+
+    d_shards.resize(numberOfShards);
+
+    /* set up the locks for the new shards */
+    for (size_t idx = d_numberOfShards; idx < numberOfShards; idx++) {
+      pthread_rwlock_init(&d_shards[idx].queryLock, 0);
+      pthread_rwlock_init(&d_shards[idx].respLock, 0);
+    }
+
+    d_numberOfShards = numberOfShards;
+
+    /* resize all the rings */
+    for (size_t idx = 0; idx < numberOfShards; idx++) {
+      {
+        WriteLock wl(&d_shards[idx].queryLock);
+        d_shards[idx].queryRing.set_capacity(newCapacity / numberOfShards);
+      }
+      {
+        WriteLock wl(&d_shards[idx].respLock);
+        d_shards[idx].respRing.set_capacity(newCapacity / numberOfShards);
+      }
     }
   }

+  size_t getQueryInserterId()
+  {
+    return s_queryInserterId++;
+  }
+
+  size_t getResponseInserterId()
+  {
+    return s_responseInserterId++;
+  }
+
+  size_t getNumberOfShards() const
+  {
+    return d_numberOfShards;
+  }
+
+  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+  {
+    auto shardId = getShardId(queryInserterId);
+    WriteLock wl(&d_shards[shardId].queryLock);
+    d_shards[shardId].queryRing.push_back({when, requestor, name, size, qtype, dh});
+  }
+
+  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+  {
+    auto shardId = getShardId(responseInserterId);
+    WriteLock wl(&d_shards[shardId].respLock);
+    d_shards[shardId].respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+  }
+
+  std::vector d_shards;
+
+private:
+  size_t getShardId(size_t id) const
+  {
+    return (id % d_numberOfShards);
+  }
+
+  std::atomic s_queryInserterId{0};
+  std::atomic s_responseInserterId{0};
+  size_t d_numberOfShards;
 };

 extern Rings g_rings;

@@ -633,7 +690,7 @@ using servers_t =vector>;
 template using NumberedVector = std::vector >;

-void* responderThread(std::shared_ptr state);
+void* responderThread(std::shared_ptr state, size_t responseInserterId);
 extern std::mutex g_luamutex;
 extern LuaContext g_lua;
 extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
@@ -868,7 +925,7 @@ bool getLuaNoSideEffect(); // set if there were only explicit declarations of _n
 void resetLuaSideEffect(); // reset to indeterminate state
 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
 bool processResponse(LocalStateHolder >& localRespRulactions, DNSResponse& dr, int* delayMsec);
 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector& rewrittenResponse, uint16_t addRoom);
 void restoreFlags(struct dnsheader* dh, uint16_t origFlags);
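To make the pattern easier to follow outside of the diff context, here is a minimal, self-contained sketch of the sharded ring-buffer technique this commit introduces. This is not the dnsdist code itself: std::shared_mutex stands in for the pthread_rwlock_t pairs, a size-capped std::deque stands in for boost::circular_buffer, and all names (ShardedRings, getInserterId, insert, totalEntries) are illustrative only.

    // Sketch of per-shard rings guarded by per-shard reader/writer locks.
    #include <atomic>
    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <shared_mutex>
    #include <vector>

    struct ShardedRings {
      struct Shard {
        std::deque<int> ring;           // stands in for boost::circular_buffer
        mutable std::shared_mutex lock; // stands in for pthread_rwlock_t
      };

      ShardedRings(size_t capacity, size_t numberOfShards)
        : d_shards(numberOfShards), d_capacityPerShard(capacity / numberOfShards) {}

      // Each writer thread asks for an id once at startup, mirroring
      // getQueryInserterId() / getResponseInserterId() above.
      size_t getInserterId() { return d_nextInserterId++; }

      void insert(int entry, size_t inserterId) {
        // Same idea as getShardId(): map the inserter to a fixed shard.
        Shard& shard = d_shards[inserterId % d_shards.size()];
        std::unique_lock<std::shared_mutex> wl(shard.lock); // "WriteLock"
        if (shard.ring.size() == d_capacityPerShard) {
          shard.ring.pop_front(); // a circular buffer drops the oldest entry
        }
        shard.ring.push_back(entry);
      }

      // Readers aggregate over all shards, one read lock at a time, the way
      // the top-N and grepq()-style functions are rewritten in this commit.
      size_t totalEntries() const {
        size_t total = 0;
        for (const auto& shard : d_shards) {
          std::shared_lock<std::shared_mutex> rl(shard.lock); // "ReadLock"
          total += shard.ring.size();
        }
        return total;
      }

    private:
      std::vector<Shard> d_shards;
      size_t d_capacityPerShard;
      std::atomic<size_t> d_nextInserterId{0};
    };

    int main() {
      ShardedRings rings(10000, 10);         // ten shards of 1000 entries each
      const auto id = rings.getInserterId(); // one id per writer thread
      for (int i = 0; i < 5; i++) {
        rings.insert(i, id);
      }
      std::cout << rings.totalEntries() << '\n'; // prints 5
      return 0;
    }

Writers therefore only contend when two inserter ids collide modulo the shard count, while readers pay the cost of walking every shard; that trade-off is exactly why the inspection functions above gained their per-shard loops.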