]> granicus.if.org Git - pdns/commitdiff
rec: Add protobuf support
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 30 Mar 2016 11:24:35 +0000 (13:24 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 30 Mar 2016 14:31:48 +0000 (16:31 +0200)
12 files changed:
pdns/dnsmessage.proto
pdns/pdns_recursor.cc
pdns/rec-lua-conf.cc
pdns/rec-lua-conf.hh
pdns/recursordist/Makefile.am
pdns/recursordist/configure.ac
pdns/recursordist/dnsmessage.proto [new symlink]
pdns/recursordist/m4/pdns_enable_protobuf.m4 [new symlink]
pdns/recursordist/remote_logger.cc [new symlink]
pdns/recursordist/remote_logger.hh [new symlink]
pdns/remote_logger.cc [new file with mode: 0644]
pdns/remote_logger.hh [new file with mode: 0644]

index 356d07a3bbd9cde1d2637326297df693691f0cb0..dde51d0ebcb1fc7f20b41a2b03605b6d39882255 100644 (file)
@@ -38,9 +38,10 @@ message PBDNSMessage {
       optional uint32 class = 3;
       optional uint32 ttl = 4;
       optional bytes rdata = 5;
-  }
+    }
     optional uint32 rcode = 1;
     repeated DNSRR rrs = 2;
+    optional string appliedPolicy = 3;
   }
 
   optional DNSResponse response = 13;
index 39adbfd064fd2dd41a01caeefbddb3da8215f838..cab442fc180d212e102dce2e5c4ffdd55d9abb75 100644 (file)
@@ -82,6 +82,13 @@ extern SortList g_sortlist;
 #include "rec-lua-conf.hh"
 #include "ednsoptions.hh"
 
+#ifdef HAVE_PROTOBUF
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include "dnsmessage.pb.h"
+#endif
+
 #ifndef RECURSOR
 #include "statbag.hh"
 StatBag S;
@@ -102,6 +109,10 @@ __thread addrringbuf_t* t_remotes, *t_servfailremotes, *t_largeanswerremotes;
 __thread boost::circular_buffer<pair<DNSName, uint16_t> >* t_queryring, *t_servfailqueryring;
 __thread shared_ptr<Regex>* t_traceRegex;
 
+#ifdef HAVE_PROTOBUF
+__thread boost::uuids::random_generator* t_uuidGenerator;
+#endif
+
 NetmaskGroup g_ednssubnets;
 SuffixMatchNode g_ednsdomains;
 
@@ -186,6 +197,9 @@ struct DNSComboWriter {
 
   struct timeval d_now;
   ComboAddress d_remote, d_local;
+#ifdef HAVE_PROTOBUF
+  boost::uuids::uuid d_uuid;
+#endif
   bool d_tcp;
   int d_socket;
   int d_tag{0};
@@ -597,6 +611,68 @@ catch(...)
   return "Exception making error message for exception";
 }
 
+#ifdef HAVE_PROTOBUF
+static void protobufFillMessageFromDC(PBDNSMessage& message, const DNSComboWriter* dc)
+{
+  message.set_messageid(boost::uuids::to_string(dc->d_uuid));
+  message.set_socketfamily(dc->d_remote.sin4.sin_family == AF_INET ? PBDNSMessage_SocketFamily_INET : PBDNSMessage_SocketFamily_INET6);
+  message.set_socketprotocol(dc->d_tcp ? PBDNSMessage_SocketProtocol_TCP : PBDNSMessage_SocketProtocol_UDP);
+  if (dc->d_local.sin4.sin_family == AF_INET) {
+    message.set_to(&dc->d_local.sin4.sin_addr.s_addr, sizeof(dc->d_local.sin4.sin_addr.s_addr));
+  }
+  else if (dc->d_local.sin4.sin_family == AF_INET6) {
+    message.set_to(&dc->d_local.sin6.sin6_addr.s6_addr, sizeof(dc->d_local.sin6.sin6_addr.s6_addr));
+  }
+  if (dc->d_remote.sin4.sin_family == AF_INET) {
+    message.set_from(&dc->d_remote.sin4.sin_addr.s_addr, sizeof(dc->d_remote.sin4.sin_addr.s_addr));
+  }
+  else if (dc->d_remote.sin4.sin_family == AF_INET6) {
+    message.set_from(&dc->d_remote.sin6.sin6_addr.s6_addr, sizeof(dc->d_remote.sin6.sin6_addr.s6_addr));
+  }
+  struct timespec ts;
+  clock_gettime(CLOCK_REALTIME, &ts);
+  message.set_timesec(ts.tv_sec);
+  message.set_timeusec(ts.tv_nsec / 1000);
+  message.set_id(ntohs(dc->d_mdp.d_header.id));
+}
+
+static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, const DNSComboWriter* dc)
+{
+  PBDNSMessage message;
+  message.set_type(PBDNSMessage_Type_DNSQueryType);
+  message.set_inbytes(dc->d_query.length());
+  protobufFillMessageFromDC(message, dc);
+
+  PBDNSMessage_DNSQuestion question;
+  question.set_qname(dc->d_mdp.d_qname.toString());
+  question.set_qtype(dc->d_mdp.d_qtype);
+  question.set_qclass(dc->d_mdp.d_qclass);
+  message.set_allocated_question(&question);
+
+  cerr <<message.DebugString()<<endl;
+  std::string str;
+  message.SerializeToString(&str);
+  logger->queueData(str);
+  message.release_question();
+}
+
+static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const DNSComboWriter* dc, size_t responseSize, PBDNSMessage_DNSResponse& protobufResponse)
+{
+  PBDNSMessage message;
+  message.set_type(PBDNSMessage_Type_DNSResponseType);
+  message.set_inbytes(responseSize);
+  protobufFillMessageFromDC(message, dc);
+
+  message.set_allocated_response(&protobufResponse);
+
+  cerr <<message.DebugString()<<endl;
+  std::string str;
+  message.SerializeToString(&str);
+  logger->queueData(str);
+  message.release_response();
+}
+#endif
+
 void startDoResolve(void *p)
 {
   DNSComboWriter* dc=(DNSComboWriter *)p;
@@ -616,6 +692,12 @@ void startDoResolve(void *p)
     vector<uint8_t> packet;
 
     auto luaconfsLocal = g_luaconfs.getLocal();
+#ifdef HAVE_PROTOBUF
+    PBDNSMessage_DNSResponse protobufResponse;
+    if(luaconfsLocal->protobufServer) {
+      protobufLogQuery(luaconfsLocal->protobufServer, dc);
+    }
+#endif
 
     DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
 
@@ -881,6 +963,27 @@ void startDoResolve(void *p)
             }
          goto sendit; // need to jump over pw.commit
        }
+#ifdef HAVE_PROTOBUF
+        if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA)) {
+          PBDNSMessage_DNSResponse_DNSRR* pbRR = protobufResponse.add_rrs();
+          if(pbRR) {
+            pbRR->set_name(i->d_name.toString());
+            pbRR->set_type(i->d_type);
+            pbRR->set_class_(i->d_class);
+            pbRR->set_ttl(i->d_ttl);
+            if (i->d_type == QType::A) {
+              const ARecordContent& arc = dynamic_cast<const ARecordContent&>(*(i->d_content));
+              ComboAddress data = arc.getCA();
+              pbRR->set_rdata(&data.sin4.sin_addr.s_addr, sizeof(data.sin4.sin_addr.s_addr));
+            }
+            else if (i->d_type == QType::AAAA) {
+              const AAAARecordContent& arc = dynamic_cast<const AAAARecordContent&>(*(i->d_content));
+              ComboAddress data = arc.getCA();
+              pbRR->set_rdata(&data.sin6.sin6_addr.s6_addr, sizeof(data.sin6.sin6_addr.s6_addr));
+            }
+          }
+        }
+#endif
       }
       if(ret.size())
        pw.commit();
@@ -889,6 +992,12 @@ void startDoResolve(void *p)
 
     g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
     updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
+#ifdef HAVE_PROTOBUF
+    if (luaconfsLocal->protobufServer) {
+      protobufResponse.set_rcode(pw.getHeader()->rcode);
+      protobufLogResponse(luaconfsLocal->protobufServer, dc, packet.size(), protobufResponse);
+    }
+#endif
     if(!dc->d_tcp) {
       struct msghdr msgh;
       struct iovec iov;
@@ -1085,7 +1194,9 @@ void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       socklen_t len = dest.getSocklen();
       getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
       dc->setLocal(dest);
-
+#ifdef HAVE_PROTOBUF
+      dc->d_uuid = (*t_uuidGenerator)();
+#endif
       if(dc->d_mdp.d_header.qr) {
         delete dc;
         g_stats.ignoredCount++;
@@ -1288,6 +1399,10 @@ string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fr
   dc->setRemote(&fromaddr);
   dc->setLocal(destaddr);
   dc->d_tcp=false;
+#ifdef HAVE_PROTOBUF
+  dc->d_uuid = (*t_uuidGenerator)();
+#endif
+
   MT->makeThread(startDoResolve, (void*) dc); // deletes dc
   return 0;
 }
@@ -2516,6 +2631,9 @@ try
 
   t_packetCache = new RecursorPacketCache();
 
+#ifdef HAVE_PROTOBUF
+  t_uuidGenerator = new boost::uuids::random_generator();
+#endif
   L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
 
   t_pdl = new shared_ptr<RecursorLua4>();
index e05349f37bd2d6c5d85dbe1d7398fb2a88f9fa5c..7a95e4861d54d0c17ac1edbedee8b6fd4fb5ebef 100644 (file)
@@ -13,6 +13,7 @@
 #include "syncres.hh"
 #include "rpzloader.hh"
 #include "base64.hh"
+#include "remote_logger.hh"
 
 GlobalStateHolder<LuaConfigItems> g_luaconfs; 
 
@@ -215,6 +216,26 @@ void loadRecursorLuaConfig(const std::string& fname)
         lci.dsAnchors.clear();
     });
 
+#if HAVE_PROTOBUF
+  Lua.writeFunction("protobufServer", [&lci](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime) {
+      try {
+       ComboAddress server(server_);
+        if (!lci.protobufServer) {
+          lci.protobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1);
+        }
+        else {
+          theL()<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
+        }
+      }
+      catch(std::exception& e) {
+       theL()<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.what()<<endl;
+      }
+      catch(PDNSException& e) {
+        theL()<<Logger::Error<<"Error while starting protobuf logger to '"<<server_<<": "<<e.reason<<endl;
+      }
+    });
+#endif
+
   try {
     Lua.executeCode(ifs);
     g_luaconfs.setState(lci);
index 5d7ee964c5ec776b43075e6dc005ec10e530480b..a9349b1ab784cfe6e4684c10550e4a813611d1bb 100644 (file)
@@ -2,6 +2,7 @@
 #include "sholder.hh"
 #include "sortlist.hh"
 #include "filterpo.hh"
+#include "remote_logger.hh"
 
 class LuaConfigItems 
 {
@@ -10,6 +11,7 @@ public:
   SortList sortlist;
   DNSFilterEngine dfe;
   map<DNSName,DSRecordContent> dsAnchors;
+  std::shared_ptr<RemoteLogger> protobufServer{nullptr};
 };
 
 extern GlobalStateHolder<LuaConfigItems> g_luaconfs;
index 089a07f98a351577ae1e517f75ebeba3623a4dac..0652e808241e0bcfbfbaa1cb49454ea6d7fa5e40 100644 (file)
@@ -24,7 +24,6 @@ BUILT_SOURCES=htmlfiles.h
 htmlfiles.h: html/*
        ./incfiles > $@
 
-
 SUBDIRS=ext
 
 if LUA
@@ -40,6 +39,7 @@ EXTRA_DIST = \
        devpollmplexer.cc \
        dnslabeltext.cc \
        dnslabeltext.rl \
+       dnsmessage.proto \
        effective_tld_names.dat \
        epollmplexer.cc \
        kqueuemplexer.cc \
@@ -105,6 +105,7 @@ pdns_recursor_SOURCES = \
        recpacketcache.cc recpacketcache.hh \
        recursor_cache.cc recursor_cache.hh \
        reczones.cc \
+       remote_logger.cc remote_logger.hh \
        resolver.hh resolver.cc \
        responsestats.hh responsestats.cc \
        root-addresses.hh \
@@ -168,6 +169,15 @@ pdns_recursor_SOURCES += \
        portsmplexer.cc
 endif
 
+dnsmessage.pb.cc: dnsmessage.proto
+       protoc --cpp_out=./ $<
+
+if HAVE_PROTOBUF
+BUILT_SOURCES += dnsmessage.pb.cc
+pdns_recursor_LDADD += $(PROTOBUF_LIBS)
+nodist_pdns_recursor_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h
+endif
+
 rec_control_SOURCES = \
        arguments.cc arguments.hh \
        dnsname.hh dnsname.cc \
index 7166d1c0110f51984da853be84e5bd4c308f4807..4210807e7561e4ffa95378e0dfec8979fcc1fdc0 100644 (file)
@@ -115,6 +115,7 @@ AS_IF([test "x$enable_hardening" != "xno"], [
 
 PDNS_ENABLE_SANITIZERS
 PDNS_ENABLE_MALLOC_TRACE
+PDNS_ENABLE_PROTOBUF
 PDNS_CHECK_PANDOC
 
 AC_SUBST(LIBS)
diff --git a/pdns/recursordist/dnsmessage.proto b/pdns/recursordist/dnsmessage.proto
new file mode 120000 (symlink)
index 0000000..90efb51
--- /dev/null
@@ -0,0 +1 @@
+../dnsmessage.proto
\ No newline at end of file
diff --git a/pdns/recursordist/m4/pdns_enable_protobuf.m4 b/pdns/recursordist/m4/pdns_enable_protobuf.m4
new file mode 120000 (symlink)
index 0000000..31d73d8
--- /dev/null
@@ -0,0 +1 @@
+../../../m4/pdns_enable_protobuf.m4
\ No newline at end of file
diff --git a/pdns/recursordist/remote_logger.cc b/pdns/recursordist/remote_logger.cc
new file mode 120000 (symlink)
index 0000000..d1bf076
--- /dev/null
@@ -0,0 +1 @@
+../remote_logger.cc
\ No newline at end of file
diff --git a/pdns/recursordist/remote_logger.hh b/pdns/recursordist/remote_logger.hh
new file mode 120000 (symlink)
index 0000000..a6051d1
--- /dev/null
@@ -0,0 +1 @@
+../remote_logger.hh
\ No newline at end of file
diff --git a/pdns/remote_logger.cc b/pdns/remote_logger.cc
new file mode 100644 (file)
index 0000000..4a237e9
--- /dev/null
@@ -0,0 +1,102 @@
+#include <unistd.h>
+
+#include "remote_logger.hh"
+
+bool RemoteLogger::reconnect()
+{
+  if (d_socket >= 0) {
+    close(d_socket);
+  }
+  try {
+    d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+    SConnect(d_socket, d_remote);
+    setNonBlocking(d_socket);
+  }
+  catch(const std::exception& e) {
+    std::cerr<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
+    return false;
+  }
+  return true;
+}
+
+bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
+{
+  size_t pos = 0;
+  while(pos < bufferSize) {
+    ssize_t written = write(d_socket, buffer + pos, bufferSize - pos);
+    if (written == -1) {
+      int res = errno;
+      if (res == EWOULDBLOCK || res == EAGAIN) {
+        return false;
+      }
+      else if (res != EINTR) {
+        reconnect();
+        return false;
+      }
+    }
+    else if (written == 0) {
+      reconnect();
+      return false;
+    }
+    else {
+      pos += (size_t) written;
+    }
+  }
+
+  return true;
+}
+
+void RemoteLogger::worker()
+{
+  while(true) {
+    std::string data;
+    {
+      std::unique_lock<std::mutex> lock(d_writeMutex);
+      d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
+      if (d_exiting) {
+        return;
+      }
+      data = d_writeQueue.front();
+      d_writeQueue.pop();
+    }
+
+    try {
+      uint16_t len = data.length();
+      len = htons(len);
+      writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout);
+      writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout);
+    }
+    catch(const std::runtime_error& e) {
+      //vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+      while (!reconnect()) {
+        sleep(d_reconnectWaitTime);
+      }
+    }
+  }
+}
+
+void RemoteLogger::queueData(const std::string& data)
+{
+  {
+    std::unique_lock<std::mutex> lock(d_writeMutex);
+    if (d_writeQueue.size() >= d_maxQueuedEntries) {
+      d_writeQueue.pop();
+    }
+    d_writeQueue.push(data);
+  }
+  d_queueCond.notify_one();
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this)
+{
+  reconnect();
+}
+
+RemoteLogger::~RemoteLogger()
+{
+  d_exiting = true;
+  if (d_socket >= 0)
+    close(d_socket);
+  d_thread.join();
+}
+
diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh
new file mode 100644 (file)
index 0000000..02c95f8
--- /dev/null
@@ -0,0 +1,36 @@
+#pragma once
+#include "config.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <queue>
+#include <thread>
+
+#include "iputils.hh"
+
+class RemoteLogger
+{
+public:
+  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1);
+  ~RemoteLogger();
+  void queueData(const std::string& data);
+  std::string toString()
+  {
+    return d_remote.toStringWithPort();
+  }
+private:
+  bool reconnect();
+  bool sendData(const char* buffer, size_t bufferSize);
+  void worker();
+
+  std::queue<std::string> d_writeQueue;
+  std::mutex d_writeMutex;
+  std::condition_variable d_queueCond;
+  ComboAddress d_remote;
+  uint64_t d_maxQueuedEntries;
+  int d_socket{-1};
+  uint16_t d_timeout;
+  uint8_t d_reconnectWaitTime;
+  std::thread d_thread;
+  std::atomic<bool> d_exiting{false};
+};