map<DNSName, unsigned int> counts;
unsigned int total=0;
{
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- if(!labels) {
- for(const auto& a : g_rings.respRing) {
- if(!pred(a))
- continue;
- counts[a.name]++;
- total++;
- }
- }
- else {
- unsigned int lab = *labels;
- for(auto a : g_rings.respRing) {
- if(!pred(a))
- continue;
-
- a.name.trimToLabels(lab);
- counts[a.name]++;
- total++;
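+ /* the rings are sharded: walk every shard, holding only that shard's read lock while aggregating its entries */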
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ if(!labels) {
+ for(const auto& a : g_rings.d_shards[idx].respRing) {
+ if(!pred(a))
+ continue;
+ counts[a.name]++;
+ total++;
+ }
}
+ else {
+ unsigned int lab = *labels;
+ for(auto a : g_rings.d_shards[idx].respRing) {
+ if(!pred(a))
+ continue;
+ a.name.trimToLabels(lab);
+ counts[a.name]++;
+ total++;
+ }
+ }
}
}
// cout<<"Looked at "<<total<<" responses, "<<counts.size()<<" different ones"<<endl;
cutoff.tv_sec -= seconds;
StatNode root;
- {
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- for(const auto& c : g_rings.respRing) {
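+ /* same per-shard walk, taking each shard's read lock in turn */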
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+ for(const auto& c : g_rings.d_shards[idx].respRing) {
if (now < c.when)
continue;
{
typedef std::unordered_map<string,string> entry_t;
vector<pair<unsigned int, entry_t > > ret;
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
-
- entry_t e;
- unsigned int count=1;
- for(const auto& c : g_rings.respRing) {
- if(rcode && (rcode.get() != c.dh.rcode))
- continue;
- e["qname"]=c.name.toString();
- e["rcode"]=std::to_string(c.dh.rcode);
- ret.push_back(std::make_pair(count,e));
- count++;
+
+ entry_t e;
+ unsigned int count=1;
+ /* count is a running index over the entries of every shard */
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+ for(const auto& c : g_rings.d_shards[idx].respRing) {
+ if(rcode && (rcode.get() != c.dh.rcode))
+ continue;
+ e["qname"]=c.name.toString();
+ e["rcode"]=std::to_string(c.dh.rcode);
+ ret.push_back(std::make_pair(count,e));
+ count++;
+ }
}
+
return ret;
}
cutoff = mintime = now;
cutoff.tv_sec -= seconds;
- {
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- counts.reserve(g_rings.respRing.size());
- for(const auto& c : g_rings.respRing) {
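+ /* first pass: sum the shard sizes so the container can be reserved in one go */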
+ size_t total = 0;
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ total += g_rings.d_shards[idx].respRing.size();
+ }
+
+ counts.reserve(total);
+
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ for(const auto& c : g_rings.d_shards[idx].respRing) {
+
if(seconds && c.when < cutoff)
continue;
if(now < c.when)
cutoff = mintime = now;
cutoff.tv_sec -= seconds;
- {
- ReadLock rl(&g_rings.queryLock);
- counts.reserve(g_rings.queryRing.size());
- for(const auto& c : g_rings.queryRing) {
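+ /* same two-pass approach for the query ring: compute the total size first, then walk the entries */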
+ size_t total = 0;
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ total += g_rings.d_shards[idx].queryRing.size();
+ }
+
+ counts.reserve(total);
+
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ for(const auto& c : g_rings.d_shards[idx].queryRing) {
if(seconds && c.when < cutoff)
continue;
if(now < c.when)
map<ComboAddress, unsigned int,ComboAddress::addressOnlyLessThan > counts;
unsigned int total=0;
{
- ReadLock rl(&g_rings.queryLock);
- for(const auto& c : g_rings.queryRing) {
- counts[c.requestor]++;
- total++;
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ for(const auto& c : g_rings.d_shards[idx].queryRing) {
+ counts[c.requestor]++;
+ total++;
+ }
}
}
vector<pair<unsigned int, ComboAddress>> rcounts;
map<DNSName, unsigned int> counts;
unsigned int total=0;
if(!labels) {
- ReadLock rl(&g_rings.queryLock);
- for(const auto& a : g_rings.queryRing) {
- counts[a.name]++;
- total++;
- }
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ for(const auto& a : g_rings.d_shards[idx].queryRing) {
+ counts[a.name]++;
+ total++;
+ }
+ }
}
else {
unsigned int lab = *labels;
- ReadLock rl(&g_rings.queryLock);
- for(auto a : g_rings.queryRing) {
- a.name.trimToLabels(lab);
- counts[a.name]++;
- total++;
- }
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ for(auto a : g_rings.d_shards[idx].queryRing) {
+ a.name.trimToLabels(lab);
+ counts[a.name]++;
+ total++;
+ }
+ }
}
// cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
vector<pair<unsigned int, DNSName>> rcounts;
g_lua.writeFunction("getResponseRing", []() {
setLuaNoSideEffect();
- decltype(g_rings.respRing) ring;
- {
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- ring = g_rings.respRing;
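+ /* copy each shard's buffer while holding its lock, then format the entries without any lock held */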
+ size_t totalEntries = 0;
+ std::vector<boost::circular_buffer<Rings::Response>> rings;
+ /* resize, not reserve: rings[idx] is assigned below and the elements need to exist */
+ rings.resize(g_rings.getNumberOfShards());
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ rings[idx] = g_rings.d_shards[idx].respRing;
+ }
+ totalEntries += rings[idx].size();
}
vector<std::unordered_map<string, boost::variant<string, unsigned int> > > ret;
- ret.reserve(ring.size());
+ ret.reserve(totalEntries);
decltype(ret)::value_type item;
- for(const auto& r : ring) {
- item["name"]=r.name.toString();
- item["qtype"]=r.qtype;
- item["rcode"]=r.dh.rcode;
- item["usec"]=r.usec;
- ret.push_back(item);
+ for (size_t idx = 0; idx < rings.size(); idx++) {
+ for(const auto& r : rings[idx]) {
+ item["name"]=r.name.toString();
+ item["qtype"]=r.qtype;
+ item["rcode"]=r.dh.rcode;
+ item["usec"]=r.usec;
+ ret.push_back(item);
+ }
}
return ret;
});
}
}
- decltype(g_rings.queryRing) qr;
- decltype(g_rings.respRing) rr;
- {
- ReadLock rl(&g_rings.queryLock);
- qr=g_rings.queryRing;
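+ /* snapshot all the shards into flat vectors so they can be sorted afterwards */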
+ std::vector<Rings::Query> qr;
+ std::vector<Rings::Response> rr;
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ {
+ ReadLock rl(&g_rings.d_shards[idx].queryLock);
+ /* reserve, not resize: push_back() below does the actual insertion */
+ qr.reserve(qr.size() + g_rings.d_shards[idx].queryRing.size());
+ for (const auto& entry : g_rings.d_shards[idx].queryRing) {
+ qr.push_back(entry);
+ }
+ }
+ {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ /* reserve, not resize: push_back() below does the actual insertion */
+ rr.reserve(rr.size() + g_rings.d_shards[idx].respRing.size());
+ for (const auto& entry : g_rings.d_shards[idx].respRing) {
+ rr.push_back(entry);
+ }
+ }
}
+
sort(qr.begin(), qr.end(), [](const decltype(qr)::value_type& a, const decltype(qr)::value_type& b) {
return b.when < a.when;
});
- {
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- rr=g_rings.respRing;
- }
sort(rr.begin(), rr.end(), [](const decltype(rr)::value_type& a, const decltype(rr)::value_type& b) {
return b.when < a.when;
double totlat=0;
unsigned int size=0;
{
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- for(const auto& r : g_rings.respRing) {
- /* skip actively discovered timeouts */
- if (r.usec == std::numeric_limits<unsigned int>::max())
- continue;
-
- ++size;
- auto iter = histo.lower_bound(r.usec);
- if(iter != histo.end())
- iter->second++;
- else
- histo.rbegin()++;
- totlat+=r.usec;
- }
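+ /* build the latency histogram shard by shard, under each shard's read lock */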
+ for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+ ReadLock rl(&g_rings.d_shards[idx].respLock);
+ for(const auto& r : g_rings.d_shards[idx].respRing) {
+ /* skip actively discovered timeouts */
+ if (r.usec == std::numeric_limits<unsigned int>::max())
+ continue;
+
+ ++size;
+ auto iter = histo.lower_bound(r.usec);
+ if(iter != histo.end())
+ iter->second++;
+ else
+ /* no bucket is large enough, account the sample in the largest one */
+ histo.rbegin()->second++;
+ totlat+=r.usec;
+ }
+ }
}
if (size == 0) {
if (ret->connected) {
if(g_launchWork) {
g_launchWork->push_back([ret,cpus]() {
- ret->tid = thread(responderThread, ret);
+ ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
if (!cpus.empty()) {
mapThreadToCPUList(ret->tid.native_handle(), cpus);
}
});
}
else {
- ret->tid = thread(responderThread, ret);
+ ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
if (!cpus.empty()) {
mapThreadToCPUList(ret->tid.native_handle(), cpus);
}
g_servFailOnNoPolicy = servfail;
});
- g_lua.writeFunction("setRingBuffersSize", [](size_t capacity) {
+ g_lua.writeFunction("setRingBuffersSize", [](size_t capacity, boost::optional<size_t> numberOfShards) {
setLuaSideEffect();
if (g_configurationDone) {
errlog("setRingBuffersSize() cannot be used at runtime!");
g_outputBuffer="setRingBuffersSize() cannot be used at runtime!\n";
return;
}
- g_rings.setCapacity(capacity);
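+ /* keep the previous behaviour of a single shard when the new parameter is not given */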
+ g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1);
});
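+ /* from the console, for instance (the values are only illustrative):
+    setRingBuffersSize(60000, 10) -- 10 shards of 6000 entries each */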
g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
size_t Rings::numDistinctRequestors()
{
std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
- ReadLock rl(&queryLock);
- for(const auto& q : queryRing)
- s.insert(q.requestor);
+ for (size_t idx = 0; idx < getNumberOfShards(); idx++) {
+ ReadLock rl(&d_shards[idx].queryLock);
+ for(const auto& q : d_shards[idx].queryRing) {
+ s.insert(q.requestor);
+ }
+ }
return s.size();
}
{
map<ComboAddress, unsigned int, ComboAddress::addressOnlyLessThan> counts;
uint64_t total=0;
- {
- ReadLock rl(&queryLock);
- for(const auto& q : queryRing) {
- counts[q.requestor]+=q.size;
- total+=q.size;
+ for (size_t idx = 0; idx < getNumberOfShards(); idx++) {
+ {
+ ReadLock rl(&d_shards[idx].queryLock);
+ for(const auto& q : d_shards[idx].queryRing) {
+ counts[q.requestor]+=q.size;
+ total+=q.size;
+ }
}
- }
-
- {
- std::lock_guard<std::mutex> lock(respMutex);
- for(const auto& r : respRing) {
- counts[r.requestor]+=r.size;
- total+=r.size;
+ {
+ ReadLock rl(&d_shards[idx].respLock);
+ for(const auto& r : d_shards[idx].respRing) {
+ counts[r.requestor]+=r.size;
+ total+=r.size;
+ }
}
}
/* we get launched with a pipe on which we receive file descriptors from clients that we own
from that point on */
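+ /* grab a unique inserter id so that every query recorded by this thread goes to the same ring shard */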
+ const auto queryInserterId = g_rings.getQueryInserterId();
bool outstanding = false;
time_t lastTCPCleanup = time(nullptr);
DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);
- if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
+ if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
goto drop;
}
struct timespec answertime;
gettime(&answertime);
unsigned int udiff = 1000000.0*DiffTime(now,answertime);
- {
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
- }
+ g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);
rewrittenResponse.clear();
}
}
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(std::shared_ptr<DownstreamState> dss)
+void* responderThread(std::shared_ptr<DownstreamState> dss, const size_t responseInserterId)
try {
auto localRespRulactions = g_resprulactions.getLocal();
#ifdef HAVE_DNSCRYPT
double udiff = ids->sentTime.udiff();
vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
- {
- struct timespec ts;
- gettime(&ts);
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote});
- }
+ struct timespec ts;
+ gettime(&ts);
+ g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);
if(dh->rcode == RCode::ServFail)
g_stats.servfailResponses++;
}
}
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
{
- {
- WriteLock wl(&g_rings.queryLock);
- g_rings.queryRing.push_back({now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh});
- }
+ g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);
if(g_qcount.enabled) {
string qname = (*dq.qname).toString(".");
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
{
assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
uint16_t queryId = 0;
DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
- if (!processQuery(holders, dq, poolname, &delayMsec, now))
+ if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
{
return;
}
}
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
{
struct MMReceiver
{
continue;
}
- processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
+ processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
}
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
// listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs)
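+// each UDP client thread is handed its own query inserter id at creation time, fixing the ring shard it writes to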
+static void* udpClientThread(ClientState* cs, size_t queryInserterId)
try
{
LocalHolders holders;
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
if (g_udpVectorSize > 1) {
- MultipleMessagesUDPClientThread(cs, holders);
+ MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
}
else
continue;
}
- processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
+ processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
}
}
}
}
if (dss->connected) {
- dss->tid = thread(responderThread, dss);
+ dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
}
}
memset(&fake, 0, sizeof(fake));
fake.id = ids.origID;
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- g_rings.respRing.push_back({ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote});
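+ /* no per-thread inserter id is available here, so timeouts are all recorded in the first shard */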
+ g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote, 0);
}
}
}
auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
addServerToPool(localPools, "", ret);
if (ret->connected) {
- ret->tid = thread(responderThread, ret);
+ ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
}
g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
}
for(auto& cs : toLaunch) {
if (cs->udpFD >= 0) {
- thread t1(udpClientThread, cs);
+ thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
if (!cs->cpus.empty()) {
mapThreadToCPUList(t1.native_handle(), cs->cpus);
}
};
struct Rings {
- Rings(size_t capacity=10000)
- {
- queryRing.set_capacity(capacity);
- respRing.set_capacity(capacity);
- pthread_rwlock_init(&queryLock, nullptr);
- }
struct Query
{
struct timespec when;
uint16_t qtype;
struct dnsheader dh;
};
- boost::circular_buffer<Query> queryRing;
struct Response
{
struct timespec when;
struct dnsheader dh;
ComboAddress ds; // who handled it
};
- boost::circular_buffer<Response> respRing;
- std::mutex respMutex;
- pthread_rwlock_t queryLock;
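+ /* the rings are split into shards, each protected by its own pair of locks, so that inserting threads mapped to different shards never contend */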
+ struct Shard
+ {
+ boost::circular_buffer<Query> queryRing;
+ boost::circular_buffer<Response> respRing;
+ pthread_rwlock_t queryLock;
+ pthread_rwlock_t respLock;
+ };
+
+ Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(0)
+ {
+ /* start from zero shards so that setCapacity() initializes the locks of every shard */
+ setCapacity(capacity, numberOfShards);
+ }
std::unordered_map<int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
size_t numDistinctRequestors();
- void setCapacity(size_t newCapacity)
+ void setCapacity(size_t newCapacity, size_t numberOfShards)
{
- {
- WriteLock wl(&queryLock);
- queryRing.set_capacity(newCapacity);
+ if (numberOfShards < d_numberOfShards) {
+ throw std::runtime_error("Decreasing the number of shards in the query and response rings is not supported");
}
- {
- std::lock_guard<std::mutex> lock(respMutex);
- respRing.set_capacity(newCapacity);
+
+ d_shards.resize(numberOfShards);
+
+ /* set up the locks for the new shards */
+ for (size_t idx = d_numberOfShards; idx < numberOfShards; idx++) {
+ pthread_rwlock_init(&d_shards[idx].queryLock, nullptr);
+ pthread_rwlock_init(&d_shards[idx].respLock, nullptr);
+ }
+
+ d_numberOfShards = numberOfShards;
+
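+ /* the total capacity is divided evenly between the shards, rounding down */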
+ /* resize all the rings */
+ for (size_t idx = 0; idx < numberOfShards; idx++) {
+ {
+ WriteLock wl(&d_shards[idx].queryLock);
+ d_shards[idx].queryRing.set_capacity(newCapacity / numberOfShards);
+ }
+ {
+ WriteLock wl(&d_shards[idx].respLock);
+ d_shards[idx].respRing.set_capacity(newCapacity / numberOfShards);
+ }
}
}
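+ /* every inserting thread asks for a unique id, which getShardId() maps to
+    a shard; ids are handed out sequentially, spreading writers evenly */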
+ size_t getQueryInserterId()
+ {
+ return d_queryInserterId++;
+ }
+ size_t getResponseInserterId()
+ {
+ return d_responseInserterId++;
+ }
+ size_t getNumberOfShards() const
+ {
+ return d_numberOfShards;
+ }
+
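+ /* insert into the shard assigned to this inserter, taking only that shard's write lock */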
+ void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+ {
+ auto shardId = getShardId(queryInserterId);
+ WriteLock wl(&d_shards[shardId].queryLock);
+ d_shards[shardId].queryRing.push_back({when, requestor, name, size, qtype, dh});
+ }
+
+ void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+ {
+ auto shardId = getShardId(responseInserterId);
+ WriteLock wl(&d_shards[shardId].respLock);
+ d_shards[shardId].respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+ }
+
+ std::vector<Shard> d_shards;
+
+private:
+ size_t getShardId(size_t id) const
+ {
+ return (id % d_numberOfShards);
+ }
+
+ std::atomic<size_t> d_queryInserterId{0};
+ std::atomic<size_t> d_responseInserterId{0};
+ size_t d_numberOfShards;
};
extern Rings g_rings;
template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
-void* responderThread(std::shared_ptr<DownstreamState> state);
+void* responderThread(std::shared_ptr<DownstreamState> state, size_t responseInserterId);
extern std::mutex g_luamutex;
extern LuaContext g_lua;
extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
void resetLuaSideEffect(); // reset to indeterminate state
bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec);
bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom);
void restoreFlags(struct dnsheader* dh, uint16_t origFlags);