return results;
}
+
+size_t getPipeBufferSize(int fd)
+{
+#ifdef F_GETPIPE_SZ
+ int res = fcntl(fd, F_GETPIPE_SZ);
+ if (res == -1) {
+ return 0;
+ }
+ return res;
+#else
+ errno = ENOSYS;
+ return 0;
+#endif /* F_GETPIPE_SZ */
+}
+
+bool setPipeBufferSize(int fd, size_t size)
+{
+#ifdef F_SETPIPE_SZ
+ if (size > std::numeric_limits<int>::max()) {
+ errno = EINVAL;
+ return false;
+ }
+ int newSize = static_cast<int>(size);
+ int res = fcntl(fd, F_SETPIPE_SZ, newSize);
+ if (res == -1) {
+ return false;
+ }
+ return true;
+#else
+ errno = ENOSYS;
+ return false;
+#endif /* F_SETPIPE_SZ */
+}
bool setReceiveSocketErrors(int sock, int af);
int closesocket(int fd);
bool setCloseOnExec(int sock);
-uint64_t udpErrorStats(const std::string& str);
+size_t getPipeBufferSize(int fd);
+bool setPipeBufferSize(int fd, size_t size);
+
+uint64_t udpErrorStats(const std::string& str);
uint64_t getRealMemoryUsage(const std::string&);
uint64_t getSpecialMemoryUsage(const std::string&);
uint64_t getOpenFileDescriptors(const std::string&);
static void makeThreadPipes()
{
+ auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+ if (pipeBufferSize > 0) {
+ g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
+ }
+
/* thread 0 is the handler / SNMP, we start at 1 */
for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
auto& threadInfos = s_threadInfos.at(n);
threadInfos.pipes.readQueriesToThread = fd[0];
threadInfos.pipes.writeQueriesToThread = fd[1];
+ if (pipeBufferSize > 0) {
+ if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+ g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
+ auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+ if (existingSize > 0) {
+ g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
+ }
+ }
+ }
+
if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
unixDie("Making pipe for inter-thread communications non-blocking");
}
::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
+ ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
::arg().set("include-dir","Include *.conf files from this directory")="";
::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
share of queries, even if the incoming traffic is very skewed, with a larger
number of requests asking for the same qname.
+.. _setting-distribution-pipe-buffer-size:
+
+``distribution-pipe-buffer-size``
+---------------------------------
+.. versionadded:: 4.2.0
+
+- Integer
+- Default: 0
+
+Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread.
+Requires support for `F_SETPIPE_SZ` which is present in Linux since 2.6.35. The actual size might be rounded up to
+a multiple of a page size. 0 means that the OS default size is used.
+A large buffer might allow the recursor to deal with very short-lived load spikes during which a worker thread gets
+overloaded, but it will be at the cost of an increased latency.
+
.. _setting-distributor-threads:
``distributor-threads``