#include "rec-lua-conf.hh"
#include "ednsoptions.hh"
+#ifdef HAVE_PROTOBUF
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include "dnsmessage.pb.h"
+#endif
+
#ifndef RECURSOR
#include "statbag.hh"
StatBag S;
__thread boost::circular_buffer<pair<DNSName, uint16_t> >* t_queryring, *t_servfailqueryring;
__thread shared_ptr<Regex>* t_traceRegex;
+#ifdef HAVE_PROTOBUF
+__thread boost::uuids::random_generator* t_uuidGenerator;
+#endif
+
NetmaskGroup g_ednssubnets;
SuffixMatchNode g_ednsdomains;
struct timeval d_now;
ComboAddress d_remote, d_local;
+#ifdef HAVE_PROTOBUF
+ boost::uuids::uuid d_uuid;
+#endif
bool d_tcp;
int d_socket;
int d_tag{0};
return "Exception making error message for exception";
}
+#ifdef HAVE_PROTOBUF
+static void protobufFillMessageFromDC(PBDNSMessage& message, const DNSComboWriter* dc)
+{
+ message.set_messageid(boost::uuids::to_string(dc->d_uuid));
+ message.set_socketfamily(dc->d_remote.sin4.sin_family == AF_INET ? PBDNSMessage_SocketFamily_INET : PBDNSMessage_SocketFamily_INET6);
+ message.set_socketprotocol(dc->d_tcp ? PBDNSMessage_SocketProtocol_TCP : PBDNSMessage_SocketProtocol_UDP);
+ if (dc->d_local.sin4.sin_family == AF_INET) {
+ message.set_to(&dc->d_local.sin4.sin_addr.s_addr, sizeof(dc->d_local.sin4.sin_addr.s_addr));
+ }
+ else if (dc->d_local.sin4.sin_family == AF_INET6) {
+ message.set_to(&dc->d_local.sin6.sin6_addr.s6_addr, sizeof(dc->d_local.sin6.sin6_addr.s6_addr));
+ }
+ if (dc->d_remote.sin4.sin_family == AF_INET) {
+ message.set_from(&dc->d_remote.sin4.sin_addr.s_addr, sizeof(dc->d_remote.sin4.sin_addr.s_addr));
+ }
+ else if (dc->d_remote.sin4.sin_family == AF_INET6) {
+ message.set_from(&dc->d_remote.sin6.sin6_addr.s6_addr, sizeof(dc->d_remote.sin6.sin6_addr.s6_addr));
+ }
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ message.set_timesec(ts.tv_sec);
+ message.set_timeusec(ts.tv_nsec / 1000);
+ message.set_id(ntohs(dc->d_mdp.d_header.id));
+}
+
+static void protobufLogQuery(const std::shared_ptr<RemoteLogger>& logger, const DNSComboWriter* dc)
+{
+ PBDNSMessage message;
+ message.set_type(PBDNSMessage_Type_DNSQueryType);
+ message.set_inbytes(dc->d_query.length());
+ protobufFillMessageFromDC(message, dc);
+
+ PBDNSMessage_DNSQuestion question;
+ question.set_qname(dc->d_mdp.d_qname.toString());
+ question.set_qtype(dc->d_mdp.d_qtype);
+ question.set_qclass(dc->d_mdp.d_qclass);
+ message.set_allocated_question(&question);
+
+ cerr <<message.DebugString()<<endl;
+ std::string str;
+ message.SerializeToString(&str);
+ logger->queueData(str);
+ message.release_question();
+}
+
+static void protobufLogResponse(const std::shared_ptr<RemoteLogger>& logger, const DNSComboWriter* dc, size_t responseSize, PBDNSMessage_DNSResponse& protobufResponse)
+{
+ PBDNSMessage message;
+ message.set_type(PBDNSMessage_Type_DNSResponseType);
+ message.set_inbytes(responseSize);
+ protobufFillMessageFromDC(message, dc);
+
+ message.set_allocated_response(&protobufResponse);
+
+ cerr <<message.DebugString()<<endl;
+ std::string str;
+ message.SerializeToString(&str);
+ logger->queueData(str);
+ message.release_response();
+}
+#endif
+
void startDoResolve(void *p)
{
DNSComboWriter* dc=(DNSComboWriter *)p;
vector<uint8_t> packet;
auto luaconfsLocal = g_luaconfs.getLocal();
+#ifdef HAVE_PROTOBUF
+ PBDNSMessage_DNSResponse protobufResponse;
+ if(luaconfsLocal->protobufServer) {
+ protobufLogQuery(luaconfsLocal->protobufServer, dc);
+ }
+#endif
DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
}
goto sendit; // need to jump over pw.commit
}
+#ifdef HAVE_PROTOBUF
+ if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA)) {
+ PBDNSMessage_DNSResponse_DNSRR* pbRR = protobufResponse.add_rrs();
+ if(pbRR) {
+ pbRR->set_name(i->d_name.toString());
+ pbRR->set_type(i->d_type);
+ pbRR->set_class_(i->d_class);
+ pbRR->set_ttl(i->d_ttl);
+ if (i->d_type == QType::A) {
+ const ARecordContent& arc = dynamic_cast<const ARecordContent&>(*(i->d_content));
+ ComboAddress data = arc.getCA();
+ pbRR->set_rdata(&data.sin4.sin_addr.s_addr, sizeof(data.sin4.sin_addr.s_addr));
+ }
+ else if (i->d_type == QType::AAAA) {
+ const AAAARecordContent& arc = dynamic_cast<const AAAARecordContent&>(*(i->d_content));
+ ComboAddress data = arc.getCA();
+ pbRR->set_rdata(&data.sin6.sin6_addr.s6_addr, sizeof(data.sin6.sin6_addr.s6_addr));
+ }
+ }
+ }
+#endif
}
if(ret.size())
pw.commit();
g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
updateResponseStats(res, dc->d_remote, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
+#ifdef HAVE_PROTOBUF
+ if (luaconfsLocal->protobufServer) {
+ protobufResponse.set_rcode(pw.getHeader()->rcode);
+ protobufLogResponse(luaconfsLocal->protobufServer, dc, packet.size(), protobufResponse);
+ }
+#endif
if(!dc->d_tcp) {
struct msghdr msgh;
struct iovec iov;
socklen_t len = dest.getSocklen();
getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
dc->setLocal(dest);
-
+#ifdef HAVE_PROTOBUF
+ dc->d_uuid = (*t_uuidGenerator)();
+#endif
if(dc->d_mdp.d_header.qr) {
delete dc;
g_stats.ignoredCount++;
dc->setRemote(&fromaddr);
dc->setLocal(destaddr);
dc->d_tcp=false;
+#ifdef HAVE_PROTOBUF
+ dc->d_uuid = (*t_uuidGenerator)();
+#endif
+
MT->makeThread(startDoResolve, (void*) dc); // deletes dc
return 0;
}
t_packetCache = new RecursorPacketCache();
+#ifdef HAVE_PROTOBUF
+ t_uuidGenerator = new boost::uuids::random_generator();
+#endif
L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
t_pdl = new shared_ptr<RecursorLua4>();
--- /dev/null
+#include <unistd.h>
+
+#include "remote_logger.hh"
+
+bool RemoteLogger::reconnect()
+{
+ if (d_socket >= 0) {
+ close(d_socket);
+ }
+ try {
+ d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+ SConnect(d_socket, d_remote);
+ setNonBlocking(d_socket);
+ }
+ catch(const std::exception& e) {
+ std::cerr<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
+ return false;
+ }
+ return true;
+}
+
+bool RemoteLogger::sendData(const char* buffer, size_t bufferSize)
+{
+ size_t pos = 0;
+ while(pos < bufferSize) {
+ ssize_t written = write(d_socket, buffer + pos, bufferSize - pos);
+ if (written == -1) {
+ int res = errno;
+ if (res == EWOULDBLOCK || res == EAGAIN) {
+ return false;
+ }
+ else if (res != EINTR) {
+ reconnect();
+ return false;
+ }
+ }
+ else if (written == 0) {
+ reconnect();
+ return false;
+ }
+ else {
+ pos += (size_t) written;
+ }
+ }
+
+ return true;
+}
+
+void RemoteLogger::worker()
+{
+ while(true) {
+ std::string data;
+ {
+ std::unique_lock<std::mutex> lock(d_writeMutex);
+ d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
+ if (d_exiting) {
+ return;
+ }
+ data = d_writeQueue.front();
+ d_writeQueue.pop();
+ }
+
+ try {
+ uint16_t len = data.length();
+ len = htons(len);
+ writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout);
+ writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout);
+ }
+ catch(const std::runtime_error& e) {
+ //vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
+ while (!reconnect()) {
+ sleep(d_reconnectWaitTime);
+ }
+ }
+ }
+}
+
+void RemoteLogger::queueData(const std::string& data)
+{
+ {
+ std::unique_lock<std::mutex> lock(d_writeMutex);
+ if (d_writeQueue.size() >= d_maxQueuedEntries) {
+ d_writeQueue.pop();
+ }
+ d_writeQueue.push(data);
+ }
+ d_queueCond.notify_one();
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this)
+{
+ reconnect();
+}
+
+RemoteLogger::~RemoteLogger()
+{
+ d_exiting = true;
+ if (d_socket >= 0)
+ close(d_socket);
+ d_thread.join();
+}
+