]> granicus.if.org Git - pdns/commitdiff
dnsdist: Use a separate thread and a queue for remote logging
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 22 Mar 2016 15:24:55 +0000 (16:24 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 22 Mar 2016 15:24:55 +0000 (16:24 +0100)
pdns/dnsdist-remotelogger.cc
pdns/dnsdist-remotelogger.hh

index 237010567610b69f15eb28bfb50f31fde7b27c2e..b8710bd68a8a098a6dacfc6ad87c4c1941140947 100644 (file)
@@ -10,7 +10,7 @@
 #include "dnsmessage.pb.h"
 #endif
 
-void RemoteLogger::reconnect()
+bool RemoteLogger::reconnect()
 {
   if (d_socket >= 0) {
     close(d_socket);
@@ -22,8 +22,10 @@ void RemoteLogger::reconnect()
     setNonBlocking(d_socket);
   }
   catch(const std::exception& e) {
-    infolog("Error connecting to %s: %s", d_remote.toStringWithPort(), e.what());
+    infolog("Error connecting to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+    return false;
   }
+  return true;
 }
 
 bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
@@ -55,6 +57,59 @@ bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
   return true;
 }
 
+void RemoteLogger::worker()
+{
+  while(true) {
+    std::string data;
+    {
+      std::unique_lock<std::mutex> lock(d_writeMutex);
+      d_queueCond.wait(lock, [this]{return !d_writeQueue.empty();});
+      data = d_writeQueue.front();
+      d_writeQueue.pop();
+    }
+
+    try {
+      uint32_t len = htonl(data.length());
+      writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout);
+      writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout);
+    }
+    catch(const std::runtime_error& e) {
+      vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+
+      while (!reconnect()) {
+        sleep(d_reconnectWaitTime);
+      }
+    }
+  }
+}
+
+void RemoteLogger::queueData(const std::string& data)
+{
+  {
+    std::unique_lock<std::mutex> lock(d_writeMutex);
+    if (d_writeQueue.size() >= d_maxQueuedEntries) {
+      d_writeQueue.pop();
+    }
+    d_writeQueue.push(data);
+  }
+  d_queueCond.notify_one();
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this)
+{
+#ifdef HAVE_PROTOBUF
+  reconnect();
+#else
+  throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled.");
+#endif /* HAVE_PROTOBUF */
+}
+
+RemoteLogger::~RemoteLogger()
+{
+  if (d_socket >= 0)
+    close(d_socket);
+}
+
 void RemoteLogger::logQuery(const DNSQuestion& dq)
 {
 #ifdef HAVE_PROTOBUF
@@ -91,9 +146,7 @@ void RemoteLogger::logQuery(const DNSQuestion& dq)
   //cerr <<message.DebugString()<<endl;
   std::string str;
   message.SerializeToString(&str);
-  uint32_t len = htonl(str.length());
-  if (sendData((const char*) &len, sizeof(len)))
-    sendData(str.c_str(), str.length());
+  queueData(str);
   message.release_question();
 #endif /* HAVE_PROTOBUF */
 }
@@ -197,9 +250,7 @@ void RemoteLogger::logResponse(const DNSQuestion& dr)
   //cerr <<message.DebugString()<<endl;
   std::string str;
   message.SerializeToString(&str);
-  uint32_t len = htonl(str.length());
-  if (sendData((const char*) &len, sizeof(len)))
-    sendData(str.c_str(), str.length());
+  queueData(str);
   message.release_response();
 #endif /* HAVE_PROTOBUF */
 }
index 9a10c5d57d7eb85cfa609a4eb277674344322d76..0e031bc1ab6837f56728a0d436ff17ca43cc4d64 100644 (file)
@@ -1,22 +1,14 @@
 #pragma once
 #include "config.h"
 
+#include <condition_variable>
+#include <queue>
+
 class RemoteLogger
 {
 public:
-  RemoteLogger(const ComboAddress& remote): d_remote(remote)
-  {
-#ifdef HAVE_PROTOBUF
-    reconnect();
-#else
-    throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled.");
-#endif /* HAVE_PROTOBUF */
-  }
-  ~RemoteLogger()
-  {
-    if (d_socket >= 0)
-      close(d_socket);
-  }
+  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1);
+  ~RemoteLogger();
   void logQuery(const DNSQuestion& dq);
   void logResponse(const DNSQuestion& dr);
   std::string toString()
@@ -24,10 +16,19 @@ public:
     return d_remote.toStringWithPort();
   }
 private:
-  void reconnect();
+  bool reconnect();
   bool sendData(const char* buffer, size_t bufferSize);
+  void worker();
+  void queueData(const std::string& data);
 
+  std::queue<std::string> d_writeQueue;
+  std::mutex d_writeMutex;
+  std::condition_variable d_queueCond;
   ComboAddress d_remote;
+  uint64_t d_maxQueuedEntries;
   int d_socket{-1};
+  uint16_t d_timeout;
+  uint8_t d_reconnectWaitTime;
+  std::thread d_thread;
 };