From: Remi Gacogne
Date: Mon, 19 Mar 2018 15:20:35 +0000 (+0100)
Subject: dnsdist: Instead of using rings IDs, let's do try-lock round-robin
X-Git-Tag: dnsdist-1.3.0~1^2~10
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=6d31c8b66f2d2071c06df4911f00695b91022d30;p=pdns

dnsdist: Instead of using rings IDs, let's do try-lock round-robin
---

diff --git a/pdns/dnsdist-console.cc b/pdns/dnsdist-console.cc
index 31d4d74fe..50d48a014 100644
--- a/pdns/dnsdist-console.cc
+++ b/pdns/dnsdist-console.cc
@@ -402,6 +402,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "setPoolServerPolicy", true, "name, func, pool", "set the server selection policy for this pool to one named 'name' and provided by 'function'" },
   { "setQueryCount", true, "bool", "set whether queries should be counted" },
   { "setQueryCountFilter", true, "func", "filter queries that would be counted, where `func` is a function with parameter `dq` which decides whether a query should and how it should be counted" },
+  { "setRingBuffersLockRetries", true, "n", "set the number of attempts to get a non-blocking lock to a ringbuffer shard before blocking" },
   { "setRingBuffersSize", true, "n", "set the capacity of the ringbuffers used for live traffic inspection to `n`" },
   { "setRules", true, "list of rules", "replace the current rules with the supplied list of pairs of DNS Rules and DNS Actions (see `newRuleAction()`)" },
   { "setServerPolicy", true, "policy", "set server selection policy to that policy" },
diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
index 58078e7bf..0198ef216 100644
--- a/pdns/dnsdist-lua.cc
+++ b/pdns/dnsdist-lua.cc
@@ -325,14 +325,14 @@ void setupLuaConfig(bool client)
       if (ret->connected) {
         if(g_launchWork) {
           g_launchWork->push_back([ret,cpus]() {
-              ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+              ret->tid = thread(responderThread, ret);
               if (!cpus.empty()) {
                 mapThreadToCPUList(ret->tid.native_handle(), cpus);
               }
             });
         }
         else {
-          ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+          ret->tid = thread(responderThread, ret);
           if (!cpus.empty()) {
             mapThreadToCPUList(ret->tid.native_handle(), cpus);
           }
@@ -1308,6 +1308,11 @@ void setupLuaConfig(bool client)
       g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1);
     });
 
+  g_lua.writeFunction("setRingBuffersLockRetries", [](size_t retries) {
+    setLuaSideEffect();
+    g_rings.setNumberOfLockRetries(retries);
+  });
+
   g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
     setLuaSideEffect();
     g_hashperturb = pertub;
diff --git a/pdns/dnsdist-rings.cc b/pdns/dnsdist-rings.cc
index d3cc5f43f..4288cd05c 100644
--- a/pdns/dnsdist-rings.cc
+++ b/pdns/dnsdist-rings.cc
@@ -22,9 +22,6 @@
 #include "dnsdist.hh"
 #include "lock.hh"
 
-std::atomic<size_t> Rings::s_queryInserterId;
-std::atomic<size_t> Rings::s_responseInserterId;
-
 size_t Rings::numDistinctRequestors()
 {
   std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
index 266aac8bd..b81638477 100644
--- a/pdns/dnsdist-tcp.cc
+++ b/pdns/dnsdist-tcp.cc
@@ -235,7 +235,6 @@ void* tcpClientThread(int pipefd)
 /* we get launched with a pipe on which we receive file descriptors from clients that we own
    from that point on */
 
-  const auto queryInserterId = g_rings.getQueryInserterId();
   bool outstanding = false;
   time_t lastTCPCleanup = time(nullptr);
 
@@ -359,7 +358,7 @@ void* tcpClientThread(int pipefd)
         DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
         DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);
 
-        if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
+        if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
           goto drop;
         }
 
@@ -633,7 +632,7 @@ void* tcpClientThread(int pipefd)
           struct timespec answertime;
           gettime(&answertime);
           unsigned int udiff = 1000000.0*DiffTime(now,answertime);
-          g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);
+          g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote);
           rewrittenResponse.clear();
         }
 
diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc
index 2a15d291b..0381723c5 100644
--- a/pdns/dnsdist.cc
+++ b/pdns/dnsdist.cc
@@ -406,7 +406,7 @@ static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>
-void* responderThread(std::shared_ptr<DownstreamState> dss, const size_t responseInserterId)
+void* responderThread(std::shared_ptr<DownstreamState> dss)
 try {
   auto localRespRulactions = g_resprulactions.getLocal();
 #ifdef HAVE_DNSCRYPT
@@ -514,7 +514,7 @@ try {
       struct timespec ts;
       gettime(&ts);
-      g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);
+      g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
 
       if(dh->rcode == RCode::ServFail)
         g_stats.servfailResponses++;
@@ -856,9 +856,9 @@ static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
   }
 }
 
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
 {
-  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);
+  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh);
 
   if(g_qcount.enabled) {
     string qname = (*dq.qname).toString(".");
@@ -1203,7 +1203,7 @@ static void queueResponse(const ClientState& cs, const char* response, uint16_t
 }
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
 {
   assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
   uint16_t queryId = 0;
@@ -1245,7 +1245,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
     DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
     DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
 
-    if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
+    if (!processQuery(holders, dq, poolname, &delayMsec, now))
     {
       return;
     }
@@ -1473,7 +1473,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
 {
   struct MMReceiver
   {
@@ -1534,7 +1534,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
        continue;
      }
 
-      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
+      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
     }
 
@@ -1554,14 +1554,14 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs, size_t queryInserterId)
+static void* udpClientThread(ClientState* cs)
 try
 {
   LocalHolders holders;
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
   if (g_udpVectorSize > 1) {
-    MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
+    MultipleMessagesUDPClientThread(cs, holders);
   }
   else
@@ -1592,7 +1592,7 @@ try
       continue;
     }
 
-    processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
+    processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
   }
 }
 
@@ -1801,7 +1801,7 @@ void* healthChecksThread()
         }
       }
       if (dss->connected) {
-        dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
+        dss->tid = thread(responderThread, dss);
       }
     }
 
@@ -1845,7 +1845,7 @@ void* healthChecksThread()
           memset(&fake, 0, sizeof(fake));
           fake.id = ids.origID;
 
-          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote, 0);
+          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
         }
       }
     }
@@ -2544,7 +2544,7 @@ try
       auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
       addServerToPool(localPools, "", ret);
       if (ret->connected) {
-        ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+        ret->tid = thread(responderThread, ret);
       }
       g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
     }
@@ -2568,7 +2568,7 @@ try
   for(auto& cs : toLaunch) {
     if (cs->udpFD >= 0) {
-      thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
+      thread t1(udpClientThread, cs);
       if (!cs->cpus.empty()) {
         mapThreadToCPUList(t1.native_handle(), cs->cpus);
       }
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index 55b9db48d..67b41ee8c 100644
--- a/pdns/dnsdist.hh
+++ b/pdns/dnsdist.hh
@@ -405,9 +405,12 @@ struct Rings {
     std::mutex respLock;
   };
 
-  Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(numberOfShards)
+  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
   {
     setCapacity(capacity, numberOfShards);
+    if (numberOfShards <= 1) {
+      d_nbLockTries = 0;
+    }
   }
   std::unordered_map<unsigned int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
   size_t numDistinctRequestors();
@@ -434,14 +437,13 @@ struct Rings {
     }
   }
 
-  static size_t getQueryInserterId()
-  {
-    return s_queryInserterId++;
-  }
-
-  static size_t getResponseInserterId()
+  void setNumberOfLockRetries(size_t retries)
   {
-    return s_responseInserterId++;
+    if (d_numberOfShards <= 1) {
+      d_nbLockTries = 0;
+    } else {
+      d_nbLockTries = retries;
+    }
   }
 
   size_t getNumberOfShards() const
@@ -449,16 +451,36 @@ struct Rings {
     return d_numberOfShards;
   }
 
-  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
-    auto shardId = getShardId(queryInserterId);
+    for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+      auto shardId = getShardId();
+      std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+      if (wl.owns_lock()) {
+        d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+        return;
+      }
+    }
+
+    /* out of luck, let's just wait */
+    auto shardId = getShardId();
     std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
     d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
   }
 
-  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
-    auto shardId = getShardId(responseInserterId);
+    for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+      auto shardId = getShardId();
+      std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+      if (wl.owns_lock()) {
+        d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+        return;
+      }
+    }
+
+    /* out of luck, let's just wait */
+    auto shardId = getShardId();
     std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
     d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
   }
@@ -466,15 +487,16 @@ struct Rings {
   std::vector<std::unique_ptr<Shard> > d_shards;
 
 private:
-  size_t getShardId(size_t id) const
+  size_t getShardId()
   {
-    return (id % d_numberOfShards);
+    return (d_currentShardId++ % d_numberOfShards);
   }
 
-  static std::atomic<size_t> s_queryInserterId;
-  static std::atomic<size_t> s_responseInserterId;
+  std::atomic<size_t> d_currentShardId{0};
 
   size_t d_numberOfShards;
+  size_t d_nbLockTries = 5;
+
 };
 
 extern Rings g_rings;
@@ -688,7 +710,7 @@
 using servers_t =vector<std::shared_ptr<DownstreamState>>;
 template <typename T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
 
-void* responderThread(std::shared_ptr<DownstreamState> state, size_t responseInserterId);
+void* responderThread(std::shared_ptr<DownstreamState> state);
 extern std::mutex g_luamutex;
 extern LuaContext g_lua;
 extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
@@ -923,7 +945,7 @@ bool getLuaNoSideEffect(); // set if there were only explicit declarations of _n
 void resetLuaSideEffect(); // reset to indeterminate state
 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec);
 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom);
 void restoreFlags(struct dnsheader* dh, uint16_t origFlags);
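
The heart of the change is the new insertion path in Rings (pdns/dnsdist.hh above): instead of handing each inserter thread a fixed ring ID and deriving the shard from it, every insertion walks the shards round-robin and grabs a shard's lock with std::try_to_lock, falling back to a blocking lock only after d_nbLockTries failed attempts. The standalone sketch below illustrates that pattern under simplified assumptions; it is not the dnsdist code: Entry, Shard, ShardedRing and the size-trimmed std::deque are stand-in names and containers for the real ring types and their circular buffers.

// Minimal sketch of the try-lock round-robin insertion pattern (simplified stand-ins,
// not the dnsdist types). numberOfShards is assumed to be at least 1.
#include <algorithm>
#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct Entry
{
  std::string qname; // stand-in for the query/response fields the real rings record
};

class ShardedRing
{
public:
  ShardedRing(size_t capacity, size_t numberOfShards, size_t lockTries = 5):
    d_capacityPerShard(std::max<size_t>(1, capacity / numberOfShards)),
    // with a single shard every try-lock would hit the same mutex, so skip the retry loop
    d_lockTries(numberOfShards > 1 ? lockTries : 0)
  {
    for (size_t idx = 0; idx < numberOfShards; idx++) {
      d_shards.push_back(std::make_unique<Shard>());
    }
  }

  void insert(Entry entry)
  {
    /* first pass: walk the shards round-robin, taking a shard's lock
       only if nobody else is currently holding it */
    for (size_t attempt = 0; attempt < d_lockTries; attempt++) {
      auto& shard = *d_shards[nextShard()];
      std::unique_lock<std::mutex> lock(shard.mutex, std::try_to_lock);
      if (lock.owns_lock()) {
        push(shard, std::move(entry));
        return;
      }
    }

    /* out of luck: block on the next shard instead of retrying forever */
    auto& shard = *d_shards[nextShard()];
    std::lock_guard<std::mutex> lock(shard.mutex);
    push(shard, std::move(entry));
  }

private:
  struct Shard
  {
    std::mutex mutex;
    std::deque<Entry> ring;
  };

  size_t nextShard()
  {
    // a shared atomic counter replaces the per-thread inserter IDs
    return d_currentShard++ % d_shards.size();
  }

  void push(Shard& shard, Entry&& entry)
  {
    shard.ring.push_back(std::move(entry));
    if (shard.ring.size() > d_capacityPerShard) {
      shard.ring.pop_front(); // keep each shard bounded, like a circular buffer
    }
  }

  std::vector<std::unique_ptr<Shard>> d_shards;
  std::atomic<size_t> d_currentShard{0};
  size_t d_capacityPerShard;
  size_t d_lockTries;
};

With several shards, the round-robin try-lock keeps concurrent writers from serializing on one mutex; with a single shard the retries buy nothing, which is why the commit forces the retry count to zero in that case. The number of non-blocking attempts made before giving up and blocking is what the new setRingBuffersLockRetries setting (console keyword and Lua binding above) controls.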