dnsdist: Instead of using ring inserter IDs, let's do try-lock round-robin
author    Remi Gacogne <remi.gacogne@powerdns.com>  Mon, 19 Mar 2018 15:20:35 +0000 (16:20 +0100)
committer Remi Gacogne <remi.gacogne@powerdns.com>  Thu, 29 Mar 2018 09:37:51 +0000 (11:37 +0200)
pdns/dnsdist-console.cc
pdns/dnsdist-lua.cc
pdns/dnsdist-rings.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index 31d4d74feea8d8fcf30daf01731a5e7d8e195bf3..50d48a014832a7da52994e8e4343d6ccb782f138 100644 (file)
@@ -402,6 +402,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "setPoolServerPolicy", true, "name, func, pool", "set the server selection policy for this pool to one named 'name' and provided by 'function'" },
   { "setQueryCount", true, "bool", "set whether queries should be counted" },
   { "setQueryCountFilter", true, "func", "filter queries that would be counted, where `func` is a function with parameter `dq` which decides whether a query should and how it should be counted" },
+  { "setRingBuffersLockRetries", true, "n", "set the number of attempts to get a non-blocking lock to a ringbuffer shard before blocking" },
   { "setRingBuffersSize", true, "n", "set the capacity of the ringbuffers used for live traffic inspection to `n`" },
   { "setRules", true, "list of rules", "replace the current rules with the supplied list of pairs of DNS Rules and DNS Actions (see `newRuleAction()`)" },
   { "setServerPolicy", true, "policy", "set server selection policy to that policy" },
index 58078e7bf4357a074c60db38d5212a49bac8b9f3..0198ef21631da552e885017c2b69b0d2fc667c85 100644 (file)
@@ -325,14 +325,14 @@ void setupLuaConfig(bool client)
                        if (ret->connected) {
                          if(g_launchWork) {
                            g_launchWork->push_back([ret,cpus]() {
-                              ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+                              ret->tid = thread(responderThread, ret);
                               if (!cpus.empty()) {
                                 mapThreadToCPUList(ret->tid.native_handle(), cpus);
                               }
                            });
                          }
                          else {
-                            ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+                            ret->tid = thread(responderThread, ret);
                             if (!cpus.empty()) {
                               mapThreadToCPUList(ret->tid.native_handle(), cpus);
                             }
@@ -1308,6 +1308,11 @@ void setupLuaConfig(bool client)
       g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1);
     });
 
+  g_lua.writeFunction("setRingBuffersLockRetries", [](size_t retries) {
+      setLuaSideEffect();
+      g_rings.setNumberOfLockRetries(retries);
+    });
+
   g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
       setLuaSideEffect();
       g_hashperturb = pertub;
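Note that the Lua binding simply forwards the value to `Rings::setNumberOfLockRetries()`; as the dnsdist.hh changes below show, both the constructor and the setter force the retry count to 0 when there is only a single shard, since repeatedly try-locking the sole shard would just spin before the unavoidable blocking lock.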
index d3cc5f43f0ab8da4f0789666b0bc815c5fabcbbb..4288cd05ce185ee15137e0869dd706270750be3a 100644 (file)
@@ -22,9 +22,6 @@
 #include "dnsdist.hh"
 #include "lock.hh"
 
-std::atomic<size_t> Rings::s_queryInserterId;
-std::atomic<size_t> Rings::s_responseInserterId;
-
 size_t Rings::numDistinctRequestors()
 {
   std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
index 266aac8bdeccaaffc8d0c6ceb22d0cd9b0c36039..b8163847753db6a339642671fcc9d4535b77a932 100644 (file)
@@ -235,7 +235,6 @@ void* tcpClientThread(int pipefd)
   /* we get launched with a pipe on which we receive file descriptors from clients that we own
      from that point on */
 
-  const auto queryInserterId = g_rings.getQueryInserterId();
   bool outstanding = false;
   time_t lastTCPCleanup = time(nullptr);
   
@@ -359,7 +358,7 @@ void* tcpClientThread(int pipefd)
        DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
        DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);
 
-       if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
+       if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
          goto drop;
        }
 
@@ -633,7 +632,7 @@ void* tcpClientThread(int pipefd)
         struct timespec answertime;
         gettime(&answertime);
         unsigned int udiff = 1000000.0*DiffTime(now,answertime);
-        g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);
+        g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote);
 
         rewrittenResponse.clear();
       }
index 2a15d291b0eb6dbd2c038d165de2601c2a7eb53c..0381723c58c4195589c9ee4d3e64271855f66214 100644 (file)
@@ -406,7 +406,7 @@ static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<Downstream
 }
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(std::shared_ptr<DownstreamState> dss, const size_t responseInserterId)
+void* responderThread(std::shared_ptr<DownstreamState> dss)
 try {
   auto localRespRulactions = g_resprulactions.getLocal();
 #ifdef HAVE_DNSCRYPT
@@ -514,7 +514,7 @@ try {
 
         struct timespec ts;
         gettime(&ts);
-        g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);
+        g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote);
 
         if(dh->rcode == RCode::ServFail)
           g_stats.servfailResponses++;
@@ -856,9 +856,9 @@ static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
   }
 }
 
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
 {
-  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);
+  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh);
 
   if(g_qcount.enabled) {
     string qname = (*dq.qname).toString(".");
@@ -1203,7 +1203,7 @@ static void queueResponse(const ClientState& cs, const char* response, uint16_t
 }
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
 {
   assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
   uint16_t queryId = 0;
@@ -1245,7 +1245,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
     DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
     DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
 
-    if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
+    if (!processQuery(holders, dq, poolname, &delayMsec, now))
     {
       return;
     }
@@ -1473,7 +1473,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
 {
   struct MMReceiver
   {
@@ -1534,7 +1534,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
         continue;
       }
 
-      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
+      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
 
     }
 
@@ -1554,14 +1554,14 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs, size_t queryInserterId)
+static void* udpClientThread(ClientState* cs)
 try
 {
   LocalHolders holders;
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
   if (g_udpVectorSize > 1) {
-    MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
+    MultipleMessagesUDPClientThread(cs, holders);
 
   }
   else
@@ -1592,7 +1592,7 @@ try
         continue;
       }
 
-      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
+      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
     }
   }
 
@@ -1801,7 +1801,7 @@ void* healthChecksThread()
               }
             }
             if (dss->connected) {
-              dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
+              dss->tid = thread(responderThread, dss);
             }
           }
 
@@ -1845,7 +1845,7 @@ void* healthChecksThread()
           memset(&fake, 0, sizeof(fake));
           fake.id = ids.origID;
 
-          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote, 0);
+          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote);
         }          
       }
     }
@@ -2544,7 +2544,7 @@ try
       auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
       addServerToPool(localPools, "", ret);
       if (ret->connected) {
-        ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
+        ret->tid = thread(responderThread, ret);
       }
       g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
     }
@@ -2568,7 +2568,7 @@ try
 
   for(auto& cs : toLaunch) {
     if (cs->udpFD >= 0) {
-      thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
+      thread t1(udpClientThread, cs);
       if (!cs->cpus.empty()) {
         mapThreadToCPUList(t1.native_handle(), cs->cpus);
       }
index 55b9db48da572a4831765208380ca708ff9e103a..67b41ee8c4347532f7834c5e779e5bbe47a77697 100644 (file)
@@ -405,9 +405,12 @@ struct Rings {
     std::mutex respLock;
   };
 
-  Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(numberOfShards)
+  Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5): d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries)
   {
     setCapacity(capacity, numberOfShards);
+    if (numberOfShards <= 1) {
+      d_nbLockTries = 0;
+    }
   }
   std::unordered_map<int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
   size_t numDistinctRequestors();
@@ -434,14 +437,13 @@ struct Rings {
     }
   }
 
-  static size_t getQueryInserterId()
-  {
-    return s_queryInserterId++;
-  }
-
-  static size_t getResponseInserterId()
+  void setNumberOfLockRetries(size_t retries)
   {
-    return s_responseInserterId++;
+    if (d_numberOfShards <= 1) {
+      d_nbLockTries = 0;
+    } else {
+      d_nbLockTries = retries;
+    }
   }
 
   size_t getNumberOfShards() const
@@ -449,16 +451,35 @@ struct Rings {
     return d_numberOfShards;
   }
 
-  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
-    auto shardId = getShardId(queryInserterId);
+    for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+      auto shardId = getShardId();
+      std::unique_lock<std::mutex> wl(d_shards[shardId]->queryLock, std::try_to_lock);
+      if (wl.owns_lock()) {
+        d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
+        return;
+      }
+    }
+
+    /* out of luck, let's just wait */
+    auto shardId = getShardId();
     std::lock_guard<std::mutex> wl(d_shards[shardId]->queryLock);
     d_shards[shardId]->queryRing.push_back({when, requestor, name, size, qtype, dh});
   }
 
-  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
-    auto shardId = getShardId(responseInserterId);
+    for (size_t idx = 0; idx < d_nbLockTries; idx++) {
+      auto shardId = getShardId();
+      std::unique_lock<std::mutex> wl(d_shards[shardId]->respLock, std::try_to_lock);
+      if (wl.owns_lock()) {
+        d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
+        return;
+      }
+    }
+
+    /* out of luck, let's just wait */
+    auto shardId = getShardId();
     std::lock_guard<std::mutex> wl(d_shards[shardId]->respLock);
     d_shards[shardId]->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
   }
@@ -466,15 +487,16 @@ struct Rings {
   std::vector<std::unique_ptr<Shard> > d_shards;
 
 private:
-  size_t getShardId(size_t id) const
+  size_t getShardId()
   {
-    return (id % d_numberOfShards);
+    return (d_currentShardId++ % d_numberOfShards);
   }
 
-  static std::atomic<size_t> s_queryInserterId;
-  static std::atomic<size_t> s_responseInserterId;
+  std::atomic<size_t> d_currentShardId{0};
 
   size_t d_numberOfShards;
+  size_t d_nbLockTries = 5;
+
 };
 
 extern Rings g_rings;
@@ -688,7 +710,7 @@ using servers_t =vector<std::shared_ptr<DownstreamState>>;
 
 template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
 
-void* responderThread(std::shared_ptr<DownstreamState> state, size_t responseInserterId);
+void* responderThread(std::shared_ptr<DownstreamState> state);
 extern std::mutex g_luamutex;
 extern LuaContext g_lua;
 extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
@@ -923,7 +945,7 @@ bool getLuaNoSideEffect(); // set if there were only explicit declarations of _n
 void resetLuaSideEffect(); // reset to indeterminate state
 
 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec);
 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom);
 void restoreFlags(struct dnsheader* dh, uint16_t origFlags);
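
For illustration, the following is a minimal standalone sketch of the try-lock round-robin pattern introduced above. The names (Shard, ShardedRing, plain string entries, an unbounded std::deque instead of dnsdist's fixed-capacity ring buffers) are simplified stand-ins, not the actual dnsdist types: each insert walks the shards round-robin and uses the first lock it can acquire without blocking, only falling back to a blocking lock after d_nbLockTries failed attempts.

    #include <atomic>
    #include <deque>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    // One shard: its own lock plus its own buffer, so writers hitting
    // different shards never contend with each other.
    struct Shard {
      std::mutex lock;
      std::deque<std::string> ring; // the real code uses a fixed-capacity ring
    };

    class ShardedRing {
    public:
      ShardedRing(size_t numberOfShards = 1, size_t nbLockTries = 5)
        : d_numberOfShards(numberOfShards),
          // with a single shard, try-locking is pointless: go straight to blocking
          d_nbLockTries(numberOfShards > 1 ? nbLockTries : 0)
      {
        for (size_t idx = 0; idx < numberOfShards; idx++) {
          d_shards.push_back(std::make_unique<Shard>());
        }
      }

      void insert(const std::string& entry)
      {
        /* first pass: walk the shards round-robin and take the first lock
           we can grab without blocking */
        for (size_t attempt = 0; attempt < d_nbLockTries; attempt++) {
          auto& shard = *d_shards[nextShard()];
          std::unique_lock<std::mutex> guard(shard.lock, std::try_to_lock);
          if (guard.owns_lock()) {
            shard.ring.push_back(entry);
            return;
          }
        }

        /* out of luck, let's just wait on the next shard */
        auto& shard = *d_shards[nextShard()];
        std::lock_guard<std::mutex> guard(shard.lock);
        shard.ring.push_back(entry);
      }

    private:
      size_t nextShard()
      {
        // round-robin over the shards; no per-thread inserter ID needed
        return d_currentShard++ % d_numberOfShards;
      }

      std::vector<std::unique_ptr<Shard>> d_shards;
      std::atomic<size_t> d_currentShard{0};
      size_t d_numberOfShards;
      size_t d_nbLockTries;
    };

    int main()
    {
      ShardedRing rings(4 /* shards */, 5 /* lock tries */);
      rings.insert("example.com. A from 192.0.2.1");
      return 0;
    }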