int readQueriesToThread;
};
+struct RecWorkerThreadInfo
+{
+ MT_t* mt{nullptr};
+ uint64_t numberOfDistributedQueries{0};
+};
+
/* the TID of the thread handling the web server, carbon, statistics and the control channel */
static const int s_handlerThreadID = -1;
/* when pdns-distributes-queries is set, the TID of the thread handling, hashing and distributing new queries
static const ComboAddress g_local4("0.0.0.0"), g_local6("::");
static vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
+static vector<RecWorkerThreadInfo> s_threadInfos;
static tcpListenSockets_t g_tcpListenSockets; // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
static listenSocketsAddresses_t g_listenSocketsAddresses; // is shared across all threads right now
static std::unordered_map<unsigned int, deferredAdd_t> deferredAdds;
static time_t g_statisticsInterval;
static bool g_useIncomingECS;
std::atomic<uint32_t> g_maxCacheEntries, g_maxPacketCacheEntries;
+static double s_balancingFactor;
RecursorControlChannel s_rcc; // only active in thread 0
RecursorStats g_stats;
L<<Logger::Notice<<"stats: " << broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
" packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
+ size_t idx = 0;
+ for (const auto& threadInfo : s_threadInfos) {
+ if (idx != s_distributorThreadID) {
+ L<<Logger::Notice<<"Thread "<<idx<<" has been distributed "<<threadInfo.numberOfDistributedQueries<<" queries"<<endl;
+ }
+ ++idx;
+ }
+
time_t now = time(0);
if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
L<<Logger::Notice<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
static bool trySendingQueryToWorker(unsigned int target, ThreadMSG* tmsg)
{
+ auto& targetInfo = s_threadInfos[target];
if(target == static_cast<unsigned int>(s_distributorThreadID)) {
L<<Logger::Error<<"distributeAsyncFunction() tried to assign a query to the distributor"<<endl;
exit(1);
}
}
+ ++targetInfo.numberOfDistributedQueries;
+
return true;
}
-// This function is only called by the distributor thread, when pdns-distributes-queries is set
+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+ const auto mt = s_threadInfos[/* skip distributor */ 1 + workerIdx].mt;
+ if (mt != nullptr) {
+ return mt->numProcesses();
+ }
+ return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+ if (s_balancingFactor == 0) {
+ return 1 + (hash % (g_pipes.size()-1));
+ }
+
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ std::vector<unsigned int> load(g_numWorkerThreads);
+ for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+ load[idx] = getWorkerLoad(idx);
+ currentLoad += load[idx];
+ // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+ }
+
+ double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+ // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+ unsigned int worker = hash % g_numWorkerThreads;
+ /* at least one server has to be below the average load */
+ while(load[worker] > targetLoad) {
+ // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+ worker = (worker + 1) % g_numWorkerThreads;
+ }
+
+ return /* skip distributor */ 1 + worker;
+}
+
+// This function is only called by the distributor threads, when pdns-distributes-queries is set
void distributeAsyncFunction(const string& packet, const pipefunc_t& func)
{
if (t_id != s_distributorThreadID) {
}
unsigned int hash = hashQuestion(packet.c_str(), packet.length(), g_disthashseed);
- unsigned int target = 1 + (hash % (g_pipes.size()-1));
+ unsigned int target = selectWorker(hash);
ThreadMSG* tmsg = new ThreadMSG();
tmsg->func = func;
was full, let's try another one */
unsigned int newTarget = 0;
do {
- newTarget = 1 + dns_random(g_pipes.size()-1);
+ newTarget = /* skip distributor */ 1 + dns_random(g_pipes.size()-1);
} while (newTarget == target);
if (!trySendingQueryToWorker(newTarget, tmsg)) {
g_statisticsInterval = ::arg().asNum("statistics-interval");
+ s_balancingFactor = ::arg().asDouble("distribution-load-factor");
+
#ifdef SO_REUSEPORT
g_reusePort = ::arg().mustDo("reuseport");
#endif
+ s_threadInfos.resize(/* distributor */ 1 + g_numWorkerThreads);
+
g_useOneSocketPerThread = (!g_weDistributeQueries && g_reusePort);
if (g_useOneSocketPerThread) {
}
MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+ if (worker) {
+ auto& threadInfo = s_threadInfos.at(t_id);
+ threadInfo.mt = MT.get();
+ }
PacketID pident;
::arg().setSwitch("log-rpz-changes", "Log additions and removals to RPZ zones at Info level")="no";
+ ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
+
::arg().setCmd("help","Provide a helpful message");
::arg().setCmd("version","Print version string");
::arg().setCmd("config","Output blank configuration");
Use this setting when running inside a supervisor that handles logging (like systemd).
**Note**: do not use this setting in combination with `daemon`_ as all logging will disappear.
+.. _setting-distribution-load-factor:
+
+``distribution-load-factor``
+----------------------------
+.. versionadded:: 4.1.12
+
+- Double
+- Default: 0.0
+
+If `pdns-distributes-queries`_ is set and this setting is set to another value
+than 0, the distributor thread will use a bounded load-balancing algorithm while
+distributing queries to worker threads, making sure that no thread is assigned
+more queries than distribution-load-factor times the average number of queries
+currently processed by all the workers.
+For example, with a value of 1.25, no server should get more than 125 % of the
+average load. This helps making sure that all the workers have roughly the same
+share of queries, even if the incoming traffic is very skewed, with a larger
+number of requests asking for the same qname.
+
.. _setting-dnssec:
``dnssec``