static thread_local unsigned int t_id;
static thread_local std::shared_ptr<Regex> t_traceRegex;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+#ifdef HAVE_PROTOBUF
+static thread_local std::shared_ptr<RemoteLogger> t_protobufServer{nullptr};
+static thread_local std::shared_ptr<RemoteLogger> t_outgoingProtobufServer{nullptr};
+#endif /* HAVE_PROTOBUF */
thread_local std::unique_ptr<MT_t> MT; // the big MTasker
thread_local std::unique_ptr<MemRecursorCache> t_RC;
return true;
}
+#ifdef HAVE_PROTOBUF
+static std::shared_ptr<RemoteLogger> startProtobufServer(const ProtobufExportConfig& config, uint64_t generation)
+{
+ std::shared_ptr<RemoteLogger> result = nullptr;
+ try {
+ result = std::make_shared<RemoteLogger>(config.server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect);
+ result->setGeneration(generation);
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.what()<<endl;
+ }
+ catch(const PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<config.server<<": "<<e.reason<<endl;
+ }
+
+ return result;
+}
+
+static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+ if (!luaconfsLocal->protobufExportConfig.enabled) {
+ if (t_protobufServer != nullptr) {
+ t_protobufServer->stop();
+ t_protobufServer = nullptr;
+ }
+
+ return false;
+ }
+
+ /* if the server was not running, or if it was running according to a
+ previous configuration */
+ if (t_protobufServer == nullptr ||
+ t_protobufServer->getGeneration() < luaconfsLocal->generation) {
+
+ if (t_protobufServer) {
+ t_protobufServer->stop();
+ }
+
+ t_protobufServer = startProtobufServer(luaconfsLocal->protobufExportConfig, luaconfsLocal->generation);
+ }
+
+ return true;
+}
+
+static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+ if (!luaconfsLocal->outgoingProtobufExportConfig.enabled) {
+ if (t_outgoingProtobufServer != nullptr) {
+ t_outgoingProtobufServer->stop();
+ t_outgoingProtobufServer = nullptr;
+ }
+
+ return false;
+ }
+
+ /* if the server was not running, or if it was running according to a
+ previous configuration */
+ if (t_outgoingProtobufServer == nullptr ||
+ t_outgoingProtobufServer->getGeneration() < luaconfsLocal->generation) {
+
+ if (t_outgoingProtobufServer) {
+ t_outgoingProtobufServer->stop();
+ }
+
+ t_outgoingProtobufServer = startProtobufServer(luaconfsLocal->outgoingProtobufExportConfig, luaconfsLocal->generation);
+ }
+
+ return true;
+}
+#endif /* HAVE_PROTOBUF */
+
static void startDoResolve(void *p)
{
DNSComboWriter* dc=(DNSComboWriter *)p;
bool wantsRPZ(true);
boost::optional<RecProtoBufMessage> pbMessage(boost::none);
#ifdef HAVE_PROTOBUF
- if (luaconfsLocal->protobufServer) {
+ if (checkProtobufExport(luaconfsLocal)) {
Netmask requestorNM(dc->d_source, dc->d_source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
pbMessage = RecProtoBufMessage(RecProtoBufMessage::Response);
#ifdef HAVE_PROTOBUF
sr.setInitialRequestId(dc->d_uuid);
+ sr.setOutgoingProtobufServer(t_outgoingProtobufServer);
#endif
sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
needCommit = true;
#ifdef HAVE_PROTOBUF
- if(luaconfsLocal->protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
+ if(t_protobufServer && (i->d_type == QType::A || i->d_type == QType::AAAA || i->d_type == QType::CNAME)) {
pbMessage->addRR(*i);
}
#endif
g_rs.submitResponse(dc->d_mdp.d_qtype, packet.size(), !dc->d_tcp);
updateResponseStats(res, dc->d_source, packet.size(), &dc->d_mdp.d_qname, dc->d_mdp.d_qtype);
#ifdef HAVE_PROTOBUF
- if (luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
+ if (t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || (appliedPolicy.d_name && !appliedPolicy.d_name->empty()) || !dc->d_policyTags.empty())) {
pbMessage->setBytes(packet.size());
pbMessage->setResponseCode(pw.getHeader()->rcode);
if (appliedPolicy.d_name) {
pbMessage->setQueryTime(dc->d_now.tv_sec, dc->d_now.tv_usec);
pbMessage->setRequestorId(dq.requestorId);
pbMessage->setDeviceId(dq.deviceId);
- protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage);
+ protobufLogResponse(t_protobufServer, *pbMessage);
}
#endif
if(!dc->d_tcp) {
string deviceId;
#ifdef HAVE_PROTOBUF
auto luaconfsLocal = g_luaconfs.getLocal();
- if (luaconfsLocal->protobufServer) {
+ if (checkProtobufExport(luaconfsLocal)) {
needECS = true;
}
#endif
}
}
#ifdef HAVE_PROTOBUF
- if(luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
+ if(t_protobufServer || t_outgoingProtobufServer) {
dc->d_requestorId = requestorId;
dc->d_deviceId = deviceId;
dc->d_uuid = (*t_uuidGenerator)();
}
- if(luaconfsLocal->protobufServer) {
+ if(t_protobufServer) {
try {
const struct dnsheader* dh = (const struct dnsheader*) conn->data;
if (!luaconfsLocal->protobufTaggedOnly) {
- protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
+ protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId);
}
}
catch(std::exception& e) {
#ifdef HAVE_PROTOBUF
boost::uuids::uuid uniqueId;
auto luaconfsLocal = g_luaconfs.getLocal();
- if (luaconfsLocal->protobufServer) {
+ if (checkProtobufExport(luaconfsLocal)) {
uniqueId = (*t_uuidGenerator)();
needECS = true;
- } else if (luaconfsLocal->outgoingProtobufServer) {
+ } else if (checkOutgoingProtobufExport(luaconfsLocal)) {
uniqueId = (*t_uuidGenerator)();
}
#endif
bool cacheHit = false;
boost::optional<RecProtoBufMessage> pbMessage(boost::none);
#ifdef HAVE_PROTOBUF
- if(luaconfsLocal->protobufServer) {
+ if(t_protobufServer) {
pbMessage = RecProtoBufMessage(DNSProtoBufMessage::DNSProtoBufMessageType::Response);
if (!luaconfsLocal->protobufTaggedOnly || !policyTags.empty()) {
- protobufLogQuery(luaconfsLocal->protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
+ protobufLogQuery(t_protobufServer, luaconfsLocal->protobufMaskV4, luaconfsLocal->protobufMaskV6, uniqueId, source, destination, ednssubnet.source, false, dh->id, question.size(), qname, qtype, qclass, policyTags, requestorId, deviceId);
}
}
#endif /* HAVE_PROTOBUF */
if (cacheHit) {
#ifdef HAVE_PROTOBUF
- if(luaconfsLocal->protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
+ if(t_protobufServer && (!luaconfsLocal->protobufTaggedOnly || !pbMessage->getAppliedPolicy().empty() || !pbMessage->getPolicyTags().empty())) {
Netmask requestorNM(source, source.sin4.sin_family == AF_INET ? luaconfsLocal->protobufMaskV4 : luaconfsLocal->protobufMaskV6);
const ComboAddress& requestor = requestorNM.getMaskedNetwork();
pbMessage->update(uniqueId, &requestor, &destination, false, dh->id);
pbMessage->setQueryTime(g_now.tv_sec, g_now.tv_usec);
pbMessage->setRequestorId(requestorId);
pbMessage->setDeviceId(deviceId);
- protobufLogResponse(luaconfsLocal->protobufServer, *pbMessage);
+ protobufLogResponse(t_protobufServer, *pbMessage);
}
#endif /* HAVE_PROTOBUF */
if(!g_quiet)
dc->d_ttlCap = ttlCap;
dc->d_variable = variable;
#ifdef HAVE_PROTOBUF
- if (luaconfsLocal->protobufServer || luaconfsLocal->outgoingProtobufServer) {
+ if (t_protobufServer || t_outgoingProtobufServer) {
dc->d_uuid = uniqueId;
}
dc->d_requestorId = requestorId;
MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+#ifdef HAVE_PROTOBUF
+ /* start protobuf export threads if needed */
+ auto luaconfsLocal = g_luaconfs.getLocal();
+ checkProtobufExport(luaconfsLocal);
+ checkOutgoingProtobufExport(luaconfsLocal);
+#endif /* HAVE_PROTOBUF */
+
PacketID pident;
t_fdm=getMultiplexer();
if(!ifs)
throw PDNSException("Cannot open file '"+fname+"': "+strerror(errno));
+ auto luaconfsLocal = g_luaconfs.getLocal();
+ lci.generation = luaconfsLocal->generation + 1;
+
Lua.writeFunction("clearSortlist", [&lci]() { lci.sortlist.clear(); });
/* we can get: "1.2.3.4"
Lua.writeFunction("protobufServer", [&lci, checkOnly](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, const boost::optional<uint8_t> maskV4, boost::optional<uint8_t> maskV6, boost::optional<bool> asyncConnect, boost::optional<bool> taggedOnly) {
try {
ComboAddress server(server_);
- if (!lci.protobufServer) {
+ if (!lci.protobufExportConfig.enabled) {
+
+ lci.protobufExportConfig.enabled = true;
+
if (!checkOnly) {
- lci.protobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
+ lci.protobufExportConfig.server = server;
+
+ if (timeout) {
+ lci.protobufExportConfig.timeout = *timeout;
+ }
+
+ if (maxQueuedEntries) {
+ lci.protobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
+ }
+
+ if (reconnectWaitTime) {
+ lci.protobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
+ }
+
+ if (asyncConnect) {
+ lci.protobufExportConfig.asyncConnect = *asyncConnect;
+ }
}
if (maskV4) {
}
}
else {
- g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
+ g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufExportConfig.server.toString()<<endl;
}
}
catch(std::exception& e) {
Lua.writeFunction("outgoingProtobufServer", [&lci, checkOnly](const string& server_, const boost::optional<uint16_t> timeout, const boost::optional<uint64_t> maxQueuedEntries, const boost::optional<uint8_t> reconnectWaitTime, boost::optional<bool> asyncConnect) {
try {
ComboAddress server(server_);
- if (!lci.outgoingProtobufServer) {
+ if (!lci.outgoingProtobufExportConfig.enabled) {
+
+ lci.outgoingProtobufExportConfig.enabled = true;
+
if (!checkOnly) {
- lci.outgoingProtobufServer = std::make_shared<RemoteLogger>(server, timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, asyncConnect ? *asyncConnect : false);
+ lci.outgoingProtobufExportConfig.server = server;
+
+ if (timeout) {
+ lci.outgoingProtobufExportConfig.timeout = *timeout;
+ }
+
+ if (maxQueuedEntries) {
+ lci.outgoingProtobufExportConfig.maxQueuedEntries = *maxQueuedEntries;
+ }
+
+ if (reconnectWaitTime) {
+ lci.outgoingProtobufExportConfig.reconnectWaitTime = *reconnectWaitTime;
+ }
+
+ if (asyncConnect) {
+ lci.outgoingProtobufExportConfig.asyncConnect = *asyncConnect;
+ }
}
}
else {
- g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.protobufServer->toString()<<endl;
+ g_log<<Logger::Error<<"Only one protobuf server can be configured, we already have "<<lci.outgoingProtobufExportConfig.server.toString()<<endl;
}
}
catch(std::exception& e) {