]> granicus.if.org Git - pdns/commitdiff
dnsdist: Ring buffers sharding
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 16 Jan 2018 15:59:38 +0000 (16:59 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 29 Mar 2018 09:37:51 +0000 (11:37 +0200)
pdns/dnsdist-lua-inspection.cc
pdns/dnsdist-lua.cc
pdns/dnsdist-rings.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh

index d263bdecc239a2736a7e987c049f6d6626db9d73..3cf82e460fcc93679c6a06a2fcead480bd8693bd 100644 (file)
@@ -31,26 +31,27 @@ static std::unordered_map<unsigned int, vector<boost::variant<string,double>>> g
   map<DNSName, unsigned int> counts;
   unsigned int total=0;
   {
-    std::lock_guard<std::mutex> lock(g_rings.respMutex);
-    if(!labels) {
-      for(const auto& a : g_rings.respRing) {
-        if(!pred(a))
-          continue;
-        counts[a.name]++;
-        total++;
-      }
-    }
-    else {
-      unsigned int lab = *labels;
-      for(auto a : g_rings.respRing) {
-        if(!pred(a))
-          continue;
-
-        a.name.trimToLabels(lab);
-        counts[a.name]++;
-        total++;
+    for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+      ReadLock rl(&g_rings.d_shards[idx].respLock);
+      if(!labels) {
+        for(const auto& a : g_rings.d_shards[idx].respRing) {
+          if(!pred(a))
+            continue;
+          counts[a.name]++;
+          total++;
+        }
       }
+      else {
+        unsigned int lab = *labels;
+        for(auto a : g_rings.d_shards[idx].respRing) {
+          if(!pred(a))
+            continue;
 
+          a.name.trimToLabels(lab);
+          counts[a.name]++;
+          total++;
+        }
+      }
     }
   }
   //      cout<<"Looked at "<<total<<" responses, "<<counts.size()<<" different ones"<<endl;
@@ -104,9 +105,10 @@ static void statNodeRespRing(statvisitor_t visitor, unsigned int seconds)
   cutoff.tv_sec -= seconds;
 
   StatNode root;
-  {
-    std::lock_guard<std::mutex> lock(g_rings.respMutex);
-    for(const auto& c : g_rings.respRing) {
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+    for(const auto& c : g_rings.d_shards[idx].respRing) {
       if (now < c.when)
         continue;
 
@@ -126,18 +128,22 @@ static vector<pair<unsigned int, std::unordered_map<string,string> > > getRespRi
 {
   typedef std::unordered_map<string,string>  entry_t;
   vector<pair<unsigned int, entry_t > > ret;
-  std::lock_guard<std::mutex> lock(g_rings.respMutex);
-
-  entry_t e;
-  unsigned int count=1;
-  for(const auto& c : g_rings.respRing) {
-    if(rcode && (rcode.get() != c.dh.rcode))
-      continue;
-    e["qname"]=c.name.toString();
-    e["rcode"]=std::to_string(c.dh.rcode);
-    ret.push_back(std::make_pair(count,e));
-    count++;
+
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+
+    entry_t e;
+    unsigned int count=1;
+    for(const auto& c : g_rings.d_shards[idx].respRing) {
+      if(rcode && (rcode.get() != c.dh.rcode))
+        continue;
+      e["qname"]=c.name.toString();
+      e["rcode"]=std::to_string(c.dh.rcode);
+      ret.push_back(std::make_pair(count,e));
+      count++;
+    }
   }
+
   return ret;
 }
 
@@ -149,10 +155,18 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function<void
   cutoff = mintime = now;
   cutoff.tv_sec -= seconds;
 
-  {
-    std::lock_guard<std::mutex> lock(g_rings.respMutex);
-    counts.reserve(g_rings.respRing.size());
-    for(const auto& c : g_rings.respRing) {
+  size_t total = 0;
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+    total += g_rings.d_shards[idx].respRing.size();
+  }
+
+  counts.reserve(total);
+
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].respLock);
+    for(const auto& c : g_rings.d_shards[idx].respRing) {
+
       if(seconds && c.when < cutoff)
         continue;
       if(now < c.when)
@@ -176,10 +190,17 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function<voi
   cutoff = mintime = now;
   cutoff.tv_sec -= seconds;
 
-  {
-    ReadLock rl(&g_rings.queryLock);
-    counts.reserve(g_rings.queryRing.size());
-    for(const auto& c : g_rings.queryRing) {
+  size_t total = 0;
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].queryLock);
+    total += g_rings.d_shards[idx].queryRing.size();
+  }
+
+  counts.reserve(total);
+
+  for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+    ReadLock rl(&g_rings.d_shards[idx].queryLock);
+    for(const auto& c : g_rings.d_shards[idx].queryRing) {
       if(seconds && c.when < cutoff)
         continue;
       if(now < c.when)
@@ -220,10 +241,12 @@ void setupLuaInspection()
       map<ComboAddress, unsigned int,ComboAddress::addressOnlyLessThan > counts;
       unsigned int total=0;
       {
-        ReadLock rl(&g_rings.queryLock);
-        for(const auto& c : g_rings.queryRing) {
-          counts[c.requestor]++;
-          total++;
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          for(const auto& c : g_rings.d_shards[idx].queryRing) {
+            counts[c.requestor]++;
+            total++;
+          }
         }
       }
       vector<pair<unsigned int, ComboAddress>> rcounts;
@@ -251,20 +274,24 @@ void setupLuaInspection()
       map<DNSName, unsigned int> counts;
       unsigned int total=0;
       if(!labels) {
-       ReadLock rl(&g_rings.queryLock);
-       for(const auto& a : g_rings.queryRing) {
-         counts[a.name]++;
-         total++;
-       }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          for(const auto& a : g_rings.d_shards[idx].queryRing) {
+            counts[a.name]++;
+            total++;
+          }
+        }
       }
       else {
        unsigned int lab = *labels;
-       ReadLock rl(&g_rings.queryLock);
-       for(auto a : g_rings.queryRing) {
-         a.name.trimToLabels(lab);
-         counts[a.name]++;
-         total++;
-       }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          for(auto a : g_rings.d_shards[idx].queryRing) {
+            a.name.trimToLabels(lab);
+            counts[a.name]++;
+            total++;
+          }
+        }
       }
       // cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
       vector<pair<unsigned int, DNSName>> rcounts;
@@ -294,20 +321,27 @@ void setupLuaInspection()
 
   g_lua.writeFunction("getResponseRing", []() {
       setLuaNoSideEffect();
-      decltype(g_rings.respRing) ring;
-      {
-       std::lock_guard<std::mutex> lock(g_rings.respMutex);
-       ring = g_rings.respRing;
+      size_t totalEntries = 0;
+      std::vector<boost::circular_buffer<Rings::Response>> rings;
+      rings.reserve(g_rings.getNumberOfShards());
+      for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+        {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          rings[idx] = g_rings.d_shards[idx].respRing;
+        }
+        totalEntries += rings[idx].size();
       }
       vector<std::unordered_map<string, boost::variant<string, unsigned int> > > ret;
-      ret.reserve(ring.size());
+      ret.reserve(totalEntries);
       decltype(ret)::value_type item;
-      for(const auto& r : ring) {
-       item["name"]=r.name.toString();
-       item["qtype"]=r.qtype;
-       item["rcode"]=r.dh.rcode;
-       item["usec"]=r.usec;
-       ret.push_back(item);
+      for (size_t idx = 0; idx < rings.size(); idx++) {
+        for(const auto& r : rings[idx]) {
+          item["name"]=r.name.toString();
+          item["qtype"]=r.qtype;
+          item["rcode"]=r.dh.rcode;
+          item["usec"]=r.usec;
+          ret.push_back(item);
+        }
       }
       return ret;
     });
@@ -382,19 +416,28 @@ void setupLuaInspection()
         }
       }
 
-      decltype(g_rings.queryRing) qr;
-      decltype(g_rings.respRing) rr;
-      {
-        ReadLock rl(&g_rings.queryLock);
-        qr=g_rings.queryRing;
+      std::vector<Rings::Query> qr;
+      std::vector<Rings::Response> rr;
+      for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+        {
+          ReadLock rl(&g_rings.d_shards[idx].queryLock);
+          qr.resize(qr.size() + g_rings.d_shards[idx].queryRing.size());
+          for (const auto& entry : g_rings.d_shards[idx].queryRing) {
+            qr.push_back(entry);
+          }
+        }
+        {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          rr.resize(rr.size() + g_rings.d_shards[idx].respRing.size());
+          for (const auto& entry : g_rings.d_shards[idx].respRing) {
+            rr.push_back(entry);
+          }
+        }
       }
+
       sort(qr.begin(), qr.end(), [](const decltype(qr)::value_type& a, const decltype(qr)::value_type& b) {
         return b.when < a.when;
       });
-      {
-       std::lock_guard<std::mutex> lock(g_rings.respMutex);
-        rr=g_rings.respRing;
-      }
 
       sort(rr.begin(), rr.end(), [](const decltype(rr)::value_type& a, const decltype(rr)::value_type& b) {
         return b.when < a.when;
@@ -471,20 +514,22 @@ void setupLuaInspection()
       double totlat=0;
       unsigned int size=0;
       {
-       std::lock_guard<std::mutex> lock(g_rings.respMutex);
-       for(const auto& r : g_rings.respRing) {
-          /* skip actively discovered timeouts */
-          if (r.usec == std::numeric_limits<unsigned int>::max())
-            continue;
-
-         ++size;
-         auto iter = histo.lower_bound(r.usec);
-         if(iter != histo.end())
-           iter->second++;
-         else
-           histo.rbegin()++;
-         totlat+=r.usec;
-       }
+        for (size_t idx = 0; idx < g_rings.getNumberOfShards(); idx++) {
+          ReadLock rl(&g_rings.d_shards[idx].respLock);
+          for(const auto& r : g_rings.d_shards[idx].respRing) {
+            /* skip actively discovered timeouts */
+            if (r.usec == std::numeric_limits<unsigned int>::max())
+              continue;
+
+            ++size;
+            auto iter = histo.lower_bound(r.usec);
+            if(iter != histo.end())
+              iter->second++;
+            else
+              histo.rbegin()++;
+            totlat+=r.usec;
+          }
+        }
       }
 
       if (size == 0) {
index c0270524af8eb20a182fe32c60ebe83423e57153..58078e7bf4357a074c60db38d5212a49bac8b9f3 100644 (file)
@@ -325,14 +325,14 @@ void setupLuaConfig(bool client)
                        if (ret->connected) {
                          if(g_launchWork) {
                            g_launchWork->push_back([ret,cpus]() {
-                             ret->tid = thread(responderThread, ret);
+                              ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
                               if (!cpus.empty()) {
                                 mapThreadToCPUList(ret->tid.native_handle(), cpus);
                               }
                            });
                          }
                          else {
-                           ret->tid = thread(responderThread, ret);
+                            ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
                             if (!cpus.empty()) {
                               mapThreadToCPUList(ret->tid.native_handle(), cpus);
                             }
@@ -1298,14 +1298,14 @@ void setupLuaConfig(bool client)
       g_servFailOnNoPolicy = servfail;
     });
 
-  g_lua.writeFunction("setRingBuffersSize", [](size_t capacity) {
+  g_lua.writeFunction("setRingBuffersSize", [](size_t capacity, boost::optional<size_t> numberOfShards) {
       setLuaSideEffect();
       if (g_configurationDone) {
         errlog("setRingBuffersSize() cannot be used at runtime!");
         g_outputBuffer="setRingBuffersSize() cannot be used at runtime!\n";
         return;
       }
-      g_rings.setCapacity(capacity);
+      g_rings.setCapacity(capacity, numberOfShards ? *numberOfShards : 1); /* a single shard is used when numberOfShards is not supplied */
     });
 
   g_lua.writeFunction("setWHashedPertubation", [](uint32_t pertub) {
index 6fef34271cf85c4d803a42ae3d713b9a7265139b..83c9ca58782f3f2b251de3ca42e180e405c1ffcf 100644 (file)
 size_t Rings::numDistinctRequestors()
 {
   std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
-  ReadLock rl(&queryLock);
-  for(const auto& q : queryRing)
-    s.insert(q.requestor);
+  for (size_t idx = 0; idx < getNumberOfShards(); idx++) { /* walk the query ring of every shard */
+    ReadLock rl(&d_shards[idx].queryLock); /* rl is scoped to one iteration, so only this shard's queryLock is involved at a time */
+    for(const auto& q : d_shards[idx].queryRing) {
+      s.insert(q.requestor); /* the set's addressOnlyLessThan comparator decides which requestors count as distinct */
+    }
+  }
   return s.size();
 }
 
@@ -35,19 +38,20 @@ std::unordered_map<int, vector<boost::variant<string,double>>> Rings::getTopBand
 {
   map<ComboAddress, unsigned int, ComboAddress::addressOnlyLessThan> counts;
   uint64_t total=0;
-  {
-    ReadLock rl(&queryLock);
-    for(const auto& q : queryRing) {
-      counts[q.requestor]+=q.size;
-      total+=q.size;
+  for (size_t idx = 0; idx < getNumberOfShards(); idx++) {
+    {
+      ReadLock rl(&d_shards[idx].queryLock);
+      for(const auto& q : d_shards[idx].queryRing) {
+        counts[q.requestor]+=q.size;
+        total+=q.size;
+      }
     }
-  }
-
-  {
-    std::lock_guard<std::mutex> lock(respMutex);
-    for(const auto& r : respRing) {
-      counts[r.requestor]+=r.size;
-      total+=r.size;
+    {
+      ReadLock rl(&d_shards[idx].respLock);
+      for(const auto& r : d_shards[idx].respRing) {
+        counts[r.requestor]+=r.size;
+        total+=r.size;
+      }
     }
   }
 
index f0ec210d4c1475f8851e6fa94038f476991bf134..266aac8bdeccaaffc8d0c6ceb22d0cd9b0c36039 100644 (file)
@@ -235,6 +235,7 @@ void* tcpClientThread(int pipefd)
   /* we get launched with a pipe on which we receive file descriptors from clients that we own
      from that point on */
 
+  const auto queryInserterId = g_rings.getQueryInserterId();
   bool outstanding = false;
   time_t lastTCPCleanup = time(nullptr);
   
@@ -358,7 +359,7 @@ void* tcpClientThread(int pipefd)
        DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
        DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true, &queryRealTime);
 
-       if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
+       if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId)) {
          goto drop;
        }
 
@@ -632,10 +633,7 @@ void* tcpClientThread(int pipefd)
         struct timespec answertime;
         gettime(&answertime);
         unsigned int udiff = 1000000.0*DiffTime(now,answertime);
-        {
-          std::lock_guard<std::mutex> lock(g_rings.respMutex);
-          g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
-        }
+        g_rings.insertResponse(answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote, queryInserterId);
 
         rewrittenResponse.clear();
       }
index d9c18bf3107c557d5f881d87511d3837c3d556e0..2a15d291b0eb6dbd2c038d165de2601c2a7eb53c 100644 (file)
@@ -406,7 +406,7 @@ static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<Downstream
 }
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(std::shared_ptr<DownstreamState> dss)
+void* responderThread(std::shared_ptr<DownstreamState> dss, const size_t responseInserterId)
 try {
   auto localRespRulactions = g_resprulactions.getLocal();
 #ifdef HAVE_DNSCRYPT
@@ -512,12 +512,9 @@ try {
         double udiff = ids->sentTime.udiff();
         vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
 
-        {
-          struct timespec ts;
-          gettime(&ts);
-          std::lock_guard<std::mutex> lock(g_rings.respMutex);
-          g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote});
-        }
+        struct timespec ts;
+        gettime(&ts);
+        g_rings.insertResponse(ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote, responseInserterId);
 
         if(dh->rcode == RCode::ServFail)
           g_stats.servfailResponses++;
@@ -859,12 +856,9 @@ static void spoofResponseFromString(DNSQuestion& dq, const string& spoofContent)
   }
 }
 
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now)
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId)
 {
-  {
-    WriteLock wl(&g_rings.queryLock);
-    g_rings.queryRing.push_back({now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh});
-  }
+  g_rings.insertQuery(now,*dq.remote,*dq.qname,dq.len,dq.qtype,*dq.dh, queryInserterId);
 
   if(g_qcount.enabled) {
     string qname = (*dq.qname).toString(".");
@@ -1209,7 +1203,7 @@ static void queueResponse(const ClientState& cs, const char* response, uint16_t
 }
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
-static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf)
+static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct msghdr* msgh, const ComboAddress& remote, ComboAddress& dest, char* query, uint16_t len, size_t queryBufferSize, struct mmsghdr* responsesVect, unsigned int* queuedResponses, struct iovec* respIOV, char* respCBuf, size_t queryInserterId)
 {
   assert(responsesVect == nullptr || (queuedResponses != nullptr && respIOV != nullptr && respCBuf != nullptr));
   uint16_t queryId = 0;
@@ -1251,7 +1245,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
     DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
     DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs.local, &remote, dh, queryBufferSize, len, false, &queryRealTime);
 
-    if (!processQuery(holders, dq, poolname, &delayMsec, now))
+    if (!processQuery(holders, dq, poolname, &delayMsec, now, queryInserterId))
     {
       return;
     }
@@ -1479,7 +1473,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
-static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders)
+static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holders, size_t queryInserterId)
 {
   struct MMReceiver
   {
@@ -1540,7 +1534,7 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
         continue;
       }
 
-      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf);
+      processUDPQuery(*cs, holders, msgh, remote, recvData[msgIdx].dest, recvData[msgIdx].packet, static_cast<uint16_t>(got), sizeof(recvData[msgIdx].packet), outMsgVec.get(), &msgsToSend, &recvData[msgIdx].iov, recvData[msgIdx].cbuf, queryInserterId);
 
     }
 
@@ -1560,14 +1554,14 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void* udpClientThread(ClientState* cs)
+static void* udpClientThread(ClientState* cs, size_t queryInserterId)
 try
 {
   LocalHolders holders;
 
 #if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
   if (g_udpVectorSize > 1) {
-    MultipleMessagesUDPClientThread(cs, holders);
+    MultipleMessagesUDPClientThread(cs, holders, queryInserterId);
 
   }
   else
@@ -1598,7 +1592,7 @@ try
         continue;
       }
 
-      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
+      processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr, queryInserterId);
     }
   }
 
@@ -1807,7 +1801,7 @@ void* healthChecksThread()
               }
             }
             if (dss->connected) {
-              dss->tid = thread(responderThread, dss);
+              dss->tid = thread(responderThread, dss, g_rings.getResponseInserterId());
             }
           }
 
@@ -1851,8 +1845,7 @@ void* healthChecksThread()
           memset(&fake, 0, sizeof(fake));
           fake.id = ids.origID;
 
-          std::lock_guard<std::mutex> lock(g_rings.respMutex);
-          g_rings.respRing.push_back({ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote});
+          g_rings.insertResponse(ts, ids.origRemote, ids.qname, ids.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, dss->remote, 0);
         }          
       }
     }
@@ -2551,7 +2544,7 @@ try
       auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
       addServerToPool(localPools, "", ret);
       if (ret->connected) {
-        ret->tid = thread(responderThread, ret);
+        ret->tid = thread(responderThread, ret, g_rings.getResponseInserterId());
       }
       g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
     }
@@ -2575,7 +2568,7 @@ try
 
   for(auto& cs : toLaunch) {
     if (cs->udpFD >= 0) {
-      thread t1(udpClientThread, cs);
+      thread t1(udpClientThread, cs, g_rings.getQueryInserterId());
       if (!cs->cpus.empty()) {
         mapThreadToCPUList(t1.native_handle(), cs->cpus);
       }
index b7f0f9b63e5943a67f61888b54b1a0003a81f4d6..270b18b8614ef2ea04426ec26f0473ebff56cf10 100644 (file)
@@ -376,12 +376,6 @@ struct IDState
 };
 
 struct Rings {
-  Rings(size_t capacity=10000)
-  {
-    queryRing.set_capacity(capacity);
-    respRing.set_capacity(capacity);
-    pthread_rwlock_init(&queryLock, nullptr);
-  }
   struct Query
   {
     struct timespec when;
@@ -391,7 +385,6 @@ struct Rings {
     uint16_t qtype;
     struct dnsheader dh;
   };
-  boost::circular_buffer<Query> queryRing;
   struct Response
   {
     struct timespec when;
@@ -403,23 +396,87 @@ struct Rings {
     struct dnsheader dh;
     ComboAddress ds; // who handled it
   };
-  boost::circular_buffer<Response> respRing;
-  std::mutex respMutex;
-  pthread_rwlock_t queryLock;
 
+  struct Shard
+  {
+    boost::circular_buffer<Query> queryRing;
+    boost::circular_buffer<Response> respRing;
+    pthread_rwlock_t queryLock;
+    pthread_rwlock_t respLock;
+  };
+
+  Rings(size_t capacity=10000, size_t numberOfShards=1): d_numberOfShards(0) /* 0 until setCapacity() runs, so that it initializes the locks of every initial shard */
+  {
+    setCapacity(capacity, numberOfShards);
+  }
   std::unordered_map<int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
   size_t numDistinctRequestors();
-  void setCapacity(size_t newCapacity)
+  void setCapacity(size_t newCapacity, size_t numberOfShards) /* newCapacity is the total, divided between numberOfShards shards below */
   {
-    {
-      WriteLock wl(&queryLock);
-      queryRing.set_capacity(newCapacity);
+    if (numberOfShards < d_numberOfShards) { /* the shard count may stay the same or grow, never shrink */
+      throw std::runtime_error("Decreasing the number of shards in the query and response rings is not supported");
     }
-    {
-      std::lock_guard<std::mutex> lock(respMutex);
-      respRing.set_capacity(newCapacity);
+
+    d_shards.resize(numberOfShards);
+
+    /* set up the locks for the new shards */
+    for (size_t idx = d_numberOfShards; idx < numberOfShards; idx++) { /* only indexes at or above the previous shard count are visited */
+      pthread_rwlock_init(&d_shards[idx].queryLock, 0);
+      pthread_rwlock_init(&d_shards[idx].respLock, 0);
+    }
+
+    d_numberOfShards = numberOfShards;
+
+    /* resize all the rings */
+    for (size_t idx = 0; idx < numberOfShards; idx++) {
+      {
+        WriteLock wl(&d_shards[idx].queryLock);
+        d_shards[idx].queryRing.set_capacity(newCapacity / numberOfShards); /* integer division: any remainder is dropped */
+      }
+      {
+        WriteLock wl(&d_shards[idx].respLock);
+        d_shards[idx].respRing.set_capacity(newCapacity / numberOfShards); /* same per-shard share as the query ring */
+      }
     }
   }
+  size_t getQueryInserterId() /* hands out the next query inserter id */
+  {
+    return s_queryInserterId++; /* post-increment of a std::atomic<size_t>: returns the value before the increment */
+  }
+  size_t getResponseInserterId() /* hands out the next response inserter id, counted separately from the query ids */
+  {
+    return s_responseInserterId++; /* post-increment of a std::atomic<size_t>: returns the value before the increment */
+  }
+  size_t getNumberOfShards() const /* shard count as last set by setCapacity() or the constructor */
+  {
+    return d_numberOfShards; /* plain size_t read, no lock taken here */
+  }
+
+  void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh, size_t queryInserterId)
+  {
+    auto shardId = getShardId(queryInserterId); /* the inserter id only selects which shard receives the entry */
+    WriteLock wl(&d_shards[shardId].queryLock); /* write lock on that shard's query ring only */
+    d_shards[shardId].queryRing.push_back({when, requestor, name, size, qtype, dh}); /* NOTE(review): 'size' is listed before 'qtype' here, the reverse of the parameter order; processQuery() passes (dq.len, dq.qtype) for those two parameters, so the parameter names look swapped relative to what callers supply - confirm and rename */
+  }
+
+  void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend, size_t responseInserterId)
+  {
+    auto shardId = getShardId(responseInserterId); /* the inserter id only selects which shard receives the entry */
+    WriteLock wl(&d_shards[shardId].respLock); /* write lock on that shard's response ring only */
+    d_shards[shardId].respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend}); /* arguments are forwarded in parameter order */
+  }
+
+  std::vector<Shard> d_shards;
+
+private:
+  size_t getShardId(size_t id) const /* maps an inserter id onto a shard index */
+  {
+    return (id % d_numberOfShards); /* modulo keeps the result below the shard count; d_numberOfShards must be non-zero here */
+  }
+
+  std::atomic<size_t> s_queryInserterId{0};
+  std::atomic<size_t> s_responseInserterId{0};
+  size_t d_numberOfShards;
 };
 
 extern Rings g_rings;
@@ -633,7 +690,7 @@ using servers_t =vector<std::shared_ptr<DownstreamState>>;
 
 template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
 
-void* responderThread(std::shared_ptr<DownstreamState> state);
+void* responderThread(std::shared_ptr<DownstreamState> state, size_t responseInserterId);
 extern std::mutex g_luamutex;
 extern LuaContext g_lua;
 extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
@@ -868,7 +925,7 @@ bool getLuaNoSideEffect(); // set if there were only explicit declarations of _n
 void resetLuaSideEffect(); // reset to indeterminate state
 
 bool responseContentMatches(const char* response, const uint16_t responseLen, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const ComboAddress& remote);
-bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now);
+bool processQuery(LocalHolders& holders, DNSQuestion& dq, string& poolname, int* delayMsec, const struct timespec& now, size_t queryInserterId);
 bool processResponse(LocalStateHolder<vector<DNSDistResponseRuleAction> >& localRespRulactions, DNSResponse& dr, int* delayMsec);
 bool fixUpResponse(char** response, uint16_t* responseLen, size_t* responseSize, const DNSName& qname, uint16_t origFlags, bool ednsAdded, bool ecsAdded, std::vector<uint8_t>& rewrittenResponse, uint16_t addRoom);
 void restoreFlags(struct dnsheader* dh, uint16_t origFlags);