From aa7929a38bbd745705acead4a3ac6d1edd6b6b8a Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Wed, 30 Mar 2016 13:24:35 +0200 Subject: [PATCH] rec: Add protobuf support --- pdns/dnsmessage.proto | 3 +- pdns/pdns_recursor.cc | 120 ++++++++++++++++++- pdns/rec-lua-conf.cc | 21 ++++ pdns/rec-lua-conf.hh | 2 + pdns/recursordist/Makefile.am | 12 +- pdns/recursordist/configure.ac | 1 + pdns/recursordist/dnsmessage.proto | 1 + pdns/recursordist/m4/pdns_enable_protobuf.m4 | 1 + pdns/recursordist/remote_logger.cc | 1 + pdns/recursordist/remote_logger.hh | 1 + pdns/remote_logger.cc | 102 ++++++++++++++++ pdns/remote_logger.hh | 36 ++++++ 12 files changed, 298 insertions(+), 3 deletions(-) create mode 120000 pdns/recursordist/dnsmessage.proto create mode 120000 pdns/recursordist/m4/pdns_enable_protobuf.m4 create mode 120000 pdns/recursordist/remote_logger.cc create mode 120000 pdns/recursordist/remote_logger.hh create mode 100644 pdns/remote_logger.cc create mode 100644 pdns/remote_logger.hh diff --git a/pdns/dnsmessage.proto b/pdns/dnsmessage.proto index 356d07a3b..dde51d0eb 100644 --- a/pdns/dnsmessage.proto +++ b/pdns/dnsmessage.proto @@ -38,9 +38,10 @@ message PBDNSMessage { optional uint32 class = 3; optional uint32 ttl = 4; optional bytes rdata = 5; - } + } optional uint32 rcode = 1; repeated DNSRR rrs = 2; + optional string appliedPolicy = 3; } optional DNSResponse response = 13; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 39adbfd06..cab442fc1 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -82,6 +82,13 @@ extern SortList g_sortlist; #include "rec-lua-conf.hh" #include "ednsoptions.hh" +#ifdef HAVE_PROTOBUF +#include +#include +#include +#include "dnsmessage.pb.h" +#endif + #ifndef RECURSOR #include "statbag.hh" StatBag S; @@ -102,6 +109,10 @@ __thread addrringbuf_t* t_remotes, *t_servfailremotes, *t_largeanswerremotes; __thread boost::circular_buffer >* t_queryring, *t_servfailqueryring; __thread shared_ptr* t_traceRegex; +#ifdef HAVE_PROTOBUF +__thread boost::uuids::random_generator* t_uuidGenerator; +#endif + NetmaskGroup g_ednssubnets; SuffixMatchNode g_ednsdomains; @@ -186,6 +197,9 @@ struct DNSComboWriter { struct timeval d_now; ComboAddress d_remote, d_local; +#ifdef HAVE_PROTOBUF + boost::uuids::uuid d_uuid; +#endif bool d_tcp; int d_socket; int d_tag{0}; @@ -597,6 +611,68 @@ catch(...) return "Exception making error message for exception"; } +#ifdef HAVE_PROTOBUF +static void protobufFillMessageFromDC(PBDNSMessage& message, const DNSComboWriter* dc) +{ + message.set_messageid(boost::uuids::to_string(dc->d_uuid)); + message.set_socketfamily(dc->d_remote.sin4.sin_family == AF_INET ? PBDNSMessage_SocketFamily_INET : PBDNSMessage_SocketFamily_INET6); + message.set_socketprotocol(dc->d_tcp ? PBDNSMessage_SocketProtocol_TCP : PBDNSMessage_SocketProtocol_UDP); + if (dc->d_local.sin4.sin_family == AF_INET) { + message.set_to(&dc->d_local.sin4.sin_addr.s_addr, sizeof(dc->d_local.sin4.sin_addr.s_addr)); + } + else if (dc->d_local.sin4.sin_family == AF_INET6) { + message.set_to(&dc->d_local.sin6.sin6_addr.s6_addr, sizeof(dc->d_local.sin6.sin6_addr.s6_addr)); + } + if (dc->d_remote.sin4.sin_family == AF_INET) { + message.set_from(&dc->d_remote.sin4.sin_addr.s_addr, sizeof(dc->d_remote.sin4.sin_addr.s_addr)); + } + else if (dc->d_remote.sin4.sin_family == AF_INET6) { + message.set_from(&dc->d_remote.sin6.sin6_addr.s6_addr, sizeof(dc->d_remote.sin6.sin6_addr.s6_addr)); + } + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + message.set_timesec(ts.tv_sec); + message.set_timeusec(ts.tv_nsec / 1000); + message.set_id(ntohs(dc->d_mdp.d_header.id)); +} + +static void protobufLogQuery(const std::shared_ptr& logger, const DNSComboWriter* dc) +{ + PBDNSMessage message; + message.set_type(PBDNSMessage_Type_DNSQueryType); + message.set_inbytes(dc->d_query.length()); + protobufFillMessageFromDC(message, dc); + + PBDNSMessage_DNSQuestion question; + question.set_qname(dc->d_mdp.d_qname.toString()); + question.set_qtype(dc->d_mdp.d_qtype); + question.set_qclass(dc->d_mdp.d_qclass); + message.set_allocated_question(&question); + + cerr <queueData(str); + message.release_question(); +} + +static void protobufLogResponse(const std::shared_ptr& logger, const DNSComboWriter* dc, size_t responseSize, PBDNSMessage_DNSResponse& protobufResponse) +{ + PBDNSMessage message; + message.set_type(PBDNSMessage_Type_DNSResponseType); + message.set_inbytes(responseSize); + protobufFillMessageFromDC(message, dc); + + message.set_allocated_response(&protobufResponse); + + cerr <queueData(str); + message.release_response(); +} +#endif + void startDoResolve(void *p) { DNSComboWriter* dc=(DNSComboWriter *)p; @@ -616,6 +692,12 @@ void startDoResolve(void *p) vector packet; auto luaconfsLocal = g_luaconfs.getLocal(); +#ifdef HAVE_PROTOBUF + PBDNSMessage_DNSResponse protobufResponse; + if(luaconfsLocal->protobufServer) { + protobufLogQuery(luaconfsLocal->protobufServer, dc); + } +#endif DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); @@ -881,6 +963,27 @@ void startDoResolve(void *p) } goto sendit; // need to jump over pw.commit } +#ifdef HAVE_PROTOBUF + if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA)) { + PBDNSMessage_DNSResponse_DNSRR* pbRR = protobufResponse.add_rrs(); + if(pbRR) { + pbRR->set_name(i->d_name.toString()); + pbRR->set_type(i->d_type); + pbRR->set_class_(i->d_class); + pbRR->set_ttl(i->d_ttl); + if (i->d_type == QType::A) { + const ARecordContent& arc = dynamic_cast(*(i->d_content)); + ComboAddress data = arc.getCA(); + pbRR->set_rdata(&data.sin4.sin_addr.s_addr, sizeof(data.sin4.sin_addr.s_addr)); + } + else if (i->d_type == QType::AAAA) { + const AAAARecordContent& arc = dynamic_cast(*(i->d_content)); + ComboAddress data = arc.getCA(); + pbRR->set_rdata(&data.sin6.sin6_addr.s6_addr, sizeof(data.sin6.sin6_addr.s6_addr)); + } + } + } +#endif } if(ret.size()) pw.commit(); @@ -889,6 +992,12 @@ void startDoResolve(void *p) g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp); updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype); +#ifdef HAVE_PROTOBUF + if (luaconfsLocal->protobufServer) { + protobufResponse.set_rcode(pw.getHeader()->rcode); + protobufLogResponse(luaconfsLocal->protobufServer, dc, packet.size(), protobufResponse); + } +#endif if(!dc->d_tcp) { struct msghdr msgh; struct iovec iov; @@ -1085,7 +1194,9 @@ void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) socklen_t len = dest.getSocklen(); getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it dc->setLocal(dest); - +#ifdef HAVE_PROTOBUF + dc->d_uuid = (*t_uuidGenerator)(); +#endif if(dc->d_mdp.d_header.qr) { delete dc; g_stats.ignoredCount++; @@ -1288,6 +1399,10 @@ string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fr dc->setRemote(&fromaddr); dc->setLocal(destaddr); dc->d_tcp=false; +#ifdef HAVE_PROTOBUF + dc->d_uuid = (*t_uuidGenerator)(); +#endif + MT->makeThread(startDoResolve, (void*) dc); // deletes dc return 0; } @@ -2516,6 +2631,9 @@ try t_packetCache = new RecursorPacketCache(); +#ifdef HAVE_PROTOBUF + t_uuidGenerator = new boost::uuids::random_generator(); +#endif L<(); diff --git a/pdns/rec-lua-conf.cc b/pdns/rec-lua-conf.cc index e05349f37..7a95e4861 100644 --- a/pdns/rec-lua-conf.cc +++ b/pdns/rec-lua-conf.cc @@ -13,6 +13,7 @@ #include "syncres.hh" #include "rpzloader.hh" #include "base64.hh" +#include "remote_logger.hh" GlobalStateHolder g_luaconfs; @@ -215,6 +216,26 @@ void loadRecursorLuaConfig(const std::string& fname) lci.dsAnchors.clear(); }); +#if HAVE_PROTOBUF + Lua.writeFunction("protobufServer", [&lci](const string& server_, const boost::optional timeout, const boost::optional maxQueuedEntries, const boost::optional reconnectWaitTime) { + try { + ComboAddress server(server_); + if (!lci.protobufServer) { + lci.protobufServer = std::make_shared(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1); + } + else { + theL()<toString()< dsAnchors; + std::shared_ptr protobufServer{nullptr}; }; extern GlobalStateHolder g_luaconfs; diff --git a/pdns/recursordist/Makefile.am b/pdns/recursordist/Makefile.am index 089a07f98..0652e8082 100644 --- a/pdns/recursordist/Makefile.am +++ b/pdns/recursordist/Makefile.am @@ -24,7 +24,6 @@ BUILT_SOURCES=htmlfiles.h htmlfiles.h: html/* ./incfiles > $@ - SUBDIRS=ext if LUA @@ -40,6 +39,7 @@ EXTRA_DIST = \ devpollmplexer.cc \ dnslabeltext.cc \ dnslabeltext.rl \ + dnsmessage.proto \ effective_tld_names.dat \ epollmplexer.cc \ kqueuemplexer.cc \ @@ -105,6 +105,7 @@ pdns_recursor_SOURCES = \ recpacketcache.cc recpacketcache.hh \ recursor_cache.cc recursor_cache.hh \ reczones.cc \ + remote_logger.cc remote_logger.hh \ resolver.hh resolver.cc \ responsestats.hh responsestats.cc \ root-addresses.hh \ @@ -168,6 +169,15 @@ pdns_recursor_SOURCES += \ portsmplexer.cc endif +dnsmessage.pb.cc: dnsmessage.proto + protoc --cpp_out=./ $< + +if HAVE_PROTOBUF +BUILT_SOURCES += dnsmessage.pb.cc +pdns_recursor_LDADD += $(PROTOBUF_LIBS) +nodist_pdns_recursor_SOURCES = dnsmessage.pb.cc dnsmessage.pb.h +endif + rec_control_SOURCES = \ arguments.cc arguments.hh \ dnsname.hh dnsname.cc \ diff --git a/pdns/recursordist/configure.ac b/pdns/recursordist/configure.ac index 7166d1c01..4210807e7 100644 --- a/pdns/recursordist/configure.ac +++ b/pdns/recursordist/configure.ac @@ -115,6 +115,7 @@ AS_IF([test "x$enable_hardening" != "xno"], [ PDNS_ENABLE_SANITIZERS PDNS_ENABLE_MALLOC_TRACE +PDNS_ENABLE_PROTOBUF PDNS_CHECK_PANDOC AC_SUBST(LIBS) diff --git a/pdns/recursordist/dnsmessage.proto b/pdns/recursordist/dnsmessage.proto new file mode 120000 index 000000000..90efb5124 --- /dev/null +++ b/pdns/recursordist/dnsmessage.proto @@ -0,0 +1 @@ +../dnsmessage.proto \ No newline at end of file diff --git a/pdns/recursordist/m4/pdns_enable_protobuf.m4 b/pdns/recursordist/m4/pdns_enable_protobuf.m4 new file mode 120000 index 000000000..31d73d873 --- /dev/null +++ b/pdns/recursordist/m4/pdns_enable_protobuf.m4 @@ -0,0 +1 @@ +../../../m4/pdns_enable_protobuf.m4 \ No newline at end of file diff --git a/pdns/recursordist/remote_logger.cc b/pdns/recursordist/remote_logger.cc new file mode 120000 index 000000000..d1bf076c7 --- /dev/null +++ b/pdns/recursordist/remote_logger.cc @@ -0,0 +1 @@ +../remote_logger.cc \ No newline at end of file diff --git a/pdns/recursordist/remote_logger.hh b/pdns/recursordist/remote_logger.hh new file mode 120000 index 000000000..a6051d1de --- /dev/null +++ b/pdns/recursordist/remote_logger.hh @@ -0,0 +1 @@ +../remote_logger.hh \ No newline at end of file diff --git a/pdns/remote_logger.cc b/pdns/remote_logger.cc new file mode 100644 index 000000000..4a237e98a --- /dev/null +++ b/pdns/remote_logger.cc @@ -0,0 +1,102 @@ +#include + +#include "remote_logger.hh" + +bool RemoteLogger::reconnect() +{ + if (d_socket >= 0) { + close(d_socket); + } + try { + d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0); + SConnect(d_socket, d_remote); + setNonBlocking(d_socket); + } + catch(const std::exception& e) { + std::cerr<<"Error connecting to remote logger "< lock(d_writeMutex); + d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;}); + if (d_exiting) { + return; + } + data = d_writeQueue.front(); + d_writeQueue.pop(); + } + + try { + uint16_t len = data.length(); + len = htons(len); + writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout); + writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout); + } + catch(const std::runtime_error& e) { + //vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what()); + while (!reconnect()) { + sleep(d_reconnectWaitTime); + } + } + } +} + +void RemoteLogger::queueData(const std::string& data) +{ + { + std::unique_lock lock(d_writeMutex); + if (d_writeQueue.size() >= d_maxQueuedEntries) { + d_writeQueue.pop(); + } + d_writeQueue.push(data); + } + d_queueCond.notify_one(); +} + +RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this) +{ + reconnect(); +} + +RemoteLogger::~RemoteLogger() +{ + d_exiting = true; + if (d_socket >= 0) + close(d_socket); + d_thread.join(); +} + diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh new file mode 100644 index 000000000..02c95f86f --- /dev/null +++ b/pdns/remote_logger.hh @@ -0,0 +1,36 @@ +#pragma once +#include "config.h" + +#include +#include +#include +#include + +#include "iputils.hh" + +class RemoteLogger +{ +public: + RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1); + ~RemoteLogger(); + void queueData(const std::string& data); + std::string toString() + { + return d_remote.toStringWithPort(); + } +private: + bool reconnect(); + bool sendData(const char* buffer, size_t bufferSize); + void worker(); + + std::queue d_writeQueue; + std::mutex d_writeMutex; + std::condition_variable d_queueCond; + ComboAddress d_remote; + uint64_t d_maxQueuedEntries; + int d_socket{-1}; + uint16_t d_timeout; + uint8_t d_reconnectWaitTime; + std::thread d_thread; + std::atomic d_exiting{false}; +}; -- 2.40.0