]> granicus.if.org Git - pdns/commitdiff
rec: Use a bounded load-balancing algo to distribute queries
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 20 Feb 2019 16:47:30 +0000 (17:47 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 27 Mar 2019 15:54:39 +0000 (16:54 +0100)
(cherry picked from commit 144040bef0b1f65abfb4634f65b1445a84393a1b)

pdns/pdns_recursor.cc
pdns/recursordist/docs/settings.rst

index ce5394e989b8decb62557e4d9575559fe35b372b..a9f317208b8c7a0bcbf4c0f4936df33c5f6c6bbc 100644 (file)
@@ -124,6 +124,12 @@ struct ThreadPipeSet
   int readQueriesToThread;
 };
 
+struct RecWorkerThreadInfo
+{
+  MT_t* mt{nullptr};
+  uint64_t numberOfDistributedQueries{0};
+};
+
 /* the TID of the thread handling the web server, carbon, statistics and the control channel */
 static const int s_handlerThreadID = -1;
 /* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
@@ -137,6 +143,7 @@ typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
 
 static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
 static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
+static vector<RecWorkerThreadInfo> s_threadInfos;
 static tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
 static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
 static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
@@ -166,6 +173,7 @@ static bool g_gettagNeedsEDNSOptions{false};
 static time_t g_statisticsInterval;
 static bool g_useIncomingECS;
 std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
+static double s_balancingFactor;
 
 RecursorControlChannel s_rcc; // only active in thread 0
 RecursorStats g_stats;
@@ -2164,6 +2172,14 @@ static void doStats(void)
     L<<Logger::Notice<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
     " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
 
+    size_t idx = 0;
+    for (const auto& threadInfo : s_threadInfos) {
+      if (idx != s_distributorThreadID) {
+        L<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+      }
+      ++idx;
+    }
+
     time_t now = time(0);
     if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
       L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
@@ -2324,6 +2340,7 @@ void broadcastFunction(const pipefunc_t& func)
 
 static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
 {
+  auto& targetInfo = s_threadInfos[target];
   if(target == static_cast<unsigned int>(s_distributorThreadID)) {
     L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
     exit(1);
@@ -2348,10 +2365,49 @@ static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
     }
   }
 
+  ++targetInfo.numberOfDistributedQueries;
+
   return true;
 }
 
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+  const auto mt = s_threadInfos[/* skip distributor */ 1 + workerIdx].mt;
+  if (mt != nullptr) {
+    return mt->numProcesses();
+  }
+  return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+  if (s_balancingFactor == 0) {
+    return 1 + (hash % (g_pipes.size()-1));
+  }
+
+  /* we start with one, representing the query we are currently handling */
+  double currentLoad = 1;
+  std::vector<unsigned int> load(g_numWorkerThreads);
+  for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+    load[idx] = getWorkerLoad(idx);
+    currentLoad += load[idx];
+    // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+  }
+
+  double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+  // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+  unsigned int worker = hash % g_numWorkerThreads;
+  /* at least one server has to be below the average load */
+  while(load[worker] > targetLoad) {
+    // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+    worker = (worker + 1) % g_numWorkerThreads;
+  }
+
+  return /* skip distributor */ 1 + worker;
+}
+
+// This function is only called by the distributor threads, when pdns-distributes-queries is set
 void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
 {
   if (t_id != s_distributorThreadID) {
@@ -2360,7 +2416,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
   }
 
   unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
-  unsigned int target = 1 + (hash % (g_pipes.size()-1));
+  unsigned int target = selectWorker(hash);
 
   ThreadMSG* tmsg = new ThreadMSG();
   tmsg->func = func;
@@ -2371,7 +2427,7 @@ void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
        was full, let's try another one */
     unsigned int newTarget = 0;
     do {
-      newTarget = 1 + dns_random(g_pipes.size()-1);
+      newTarget = /* skip distributor */ 1 + dns_random(g_pipes.size()-1);
     } while (newTarget == target);
 
     if (!trySendingQueryToWorker(newTarget, tmsg)) {
@@ -3139,10 +3195,14 @@ static int serviceMain(int argc, char*argv[])
 
   g_statisticsInterval = ::arg().asNum("statistics-interval");
 
+  s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+
 #ifdef SO_REUSEPORT
   g_reusePort = ::arg().mustDo("reuseport");
 #endif
 
+  s_threadInfos.resize(/* distributor */ 1 + g_numWorkerThreads);
+
   g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
 
   if (g_useOneSocketPerThread) {
@@ -3322,6 +3382,10 @@ try
   }
 
   MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+  if (worker) {
+    auto& threadInfo = s_threadInfos.at(t_id);
+    threadInfo.mt = MT.get();
+  }
 
   PacketID pident;
 
@@ -3572,6 +3636,8 @@ int main(int argc, char **argv)
 
     ::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
 
+    ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
+
     ::arg().setCmd("help","Provide a helpful message");
     ::arg().setCmd("version","Print version string");
     ::arg().setCmd("config","Output blank configuration");
index 67033f8ad21eed210f0f1c2ae1ff7150c5f4be68..59ddc1b0048dc052d109f6cc792d2d9710e09ebb 100644 (file)
@@ -263,6 +263,25 @@ Do not log to syslog, only to stdout.
 Use this setting when running inside a supervisor that handles logging (like systemd).
 **Note**: do not use this setting in combination with `daemon`_ as all logging will disappear.
 
+.. _setting-distribution-load-factor:
+
+``distribution-load-factor``
+----------------------------
+.. versionadded:: 4.1.12
+
+-  Double
+-  Default: 0.0
+
+If `pdns-distributes-queries`_ is set and this setting is set to another value
+than 0, the distributor thread will use a bounded load-balancing algorithm while
+distributing queries to worker threads, making sure that no thread is assigned
+more queries than distribution-load-factor times the average number of queries
+currently processed by all the workers.
+For example, with a value of 1.25, no server should get more than 125 % of the
+average load. This helps making sure that all the workers have roughly the same
+share of queries, even if the incoming traffic is very skewed, with a larger
+number of requests asking for the same qname.
+
 .. _setting-dnssec:
 
 ``dnssec``