From: Pieter Lexis Date: Thu, 12 Apr 2018 13:26:27 +0000 (+0200) Subject: ixfrdist: Use workers to handle TCP connections X-Git-Tag: dnsdist-1.3.1~148^2~7 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=a29695955c96667e62d5ae97c663f7cf84d0a749;p=pdns ixfrdist: Use workers to handle TCP connections --- diff --git a/pdns/ixfrdist.cc b/pdns/ixfrdist.cc index d50fc5dfd..6a76b0e68 100644 --- a/pdns/ixfrdist.cc +++ b/pdns/ixfrdist.cc @@ -31,6 +31,8 @@ #include #include #include +#include +#include #include "ixfr.hh" #include "ixfrutils.hh" #include "resolver.hh" @@ -63,6 +65,11 @@ set g_domains; std::map g_soas; std::mutex g_soas_mutex; +// Condition variable for TCP handling +std::condition_variable g_tcpHandlerCV; +std::queue> g_tcpRequestFDs; +std::mutex g_tcpRequestFDsMutex; + using namespace boost::multi_index; namespace po = boost::program_options; @@ -625,91 +632,130 @@ void handleTCPRequest(int fd, boost::any&) { return; } - char buf[4096]; - ssize_t res; - try { - uint16_t toRead; - readn2(cfd, &toRead, sizeof(toRead)); - toRead = std::min(ntohs(toRead), static_cast(sizeof(buf))); - res = readn2WithTimeout(cfd, &buf, toRead, 2); - } catch (runtime_error &e) { - cerr<<"[WARNING] Could not read message from "< lg(g_tcpRequestFDsMutex); + g_tcpRequestFDs.push({cfd, saddr}); } + g_tcpHandlerCV.notify_one(); +} - try { - MOADNSParser mdp(true, string(buf, res)); +/* Thread to handle TCP traffic + */ +void tcpWorker(int tid) { + string prefix = "TCP Worker " + std::to_string(tid) + ": "; + + while(true) { + if (g_debug) { + cerr<<"[DEBUG] "< lk(g_tcpRequestFDsMutex); + g_tcpHandlerCV.wait(lk, []{return g_tcpRequestFDs.size() || g_exiting ;}); + if (g_exiting) { + if (g_debug) { + cerr<<"[DEBUG] "<(sizeof(buf))); + res = readn2WithTimeout(cfd, &buf, toRead, 2); + } catch (runtime_error &e) { + cerr<<"[WARNING] "<> packets; - if (mdp.d_qtype == QType::SOA) { - vector packet; - bool ret = makeSOAPacket(mdp, packet); - if (!ret) { + try { + MOADNSParser mdp(true, string(buf, res)); + + if (!checkQuery(mdp, saddr, false)) { close(cfd); - return; + continue; } - packets.push_back(packet); - } - if (mdp.d_qtype == QType::AXFR) { - if (!makeAXFRPackets(mdp, packets)) { - close(cfd); - return; + vector> packets; + if (mdp.d_qtype == QType::SOA) { + vector packet; + bool ret = makeSOAPacket(mdp, packet); + if (!ret) { + close(cfd); + continue; + } + packets.push_back(packet); } - } - if (mdp.d_qtype == QType::IXFR) { - /* RFC 1995 section 3: - * The IXFR query packet format is the same as that of a normal DNS - * query, but with the query type being IXFR and the authority section - * containing the SOA record of client's version of the zone. - */ - shared_ptr clientSOA; - for (auto &answer : mdp.d_answers) { - // from dnsparser.hh: - // typedef vector > answers_t; - if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) { - clientSOA = getRR(answer.first); - if (clientSOA != nullptr) { - break; + if (mdp.d_qtype == QType::AXFR) { + if (!makeAXFRPackets(mdp, packets)) { + close(cfd); + continue; + } + } + + if (mdp.d_qtype == QType::IXFR) { + /* RFC 1995 section 3: + * The IXFR query packet format is the same as that of a normal DNS + * query, but with the query type being IXFR and the authority section + * containing the SOA record of client's version of the zone. + */ + shared_ptr clientSOA; + for (auto &answer : mdp.d_answers) { + // from dnsparser.hh: + // typedef vector > answers_t; + if (answer.first.d_type == QType::SOA && answer.first.d_place == DNSResourceRecord::AUTHORITY) { + clientSOA = getRR(answer.first); + if (clientSOA != nullptr) { + break; + } } + } /* for (auto const &answer : mdp.d_answers) */ + + if (clientSOA == nullptr) { + cerr<<"[WARNING] "<()->default_value("127.0.0.1:5300"), "server address") ("work-dir", po::value()->default_value("."), "Directory for storing AXFR and IXFR data") ("keep", po::value()->default_value(KEEP_DEFAULT), "Number of old zone versions to retain") - ("axfr-timeout", po::value()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an AXFR to complete") + ("axfr-timeout", po::value()->default_value(AXFRTIMEOUT_DEFAULT), "Timeout in seconds for an inbound AXFR to complete") + ("tcp-out-threads", po::value()->default_value(10), "Number of maximum simultaneous outbound TCP connections") ; po::options_description alloptions; po::options_description hidden("hidden options"); @@ -920,6 +967,7 @@ int main(int argc, char** argv) { signal(SIGTERM, handleSignal); signal(SIGINT, handleSignal); signal(SIGSTOP, handleSignal); + signal(SIGPIPE, SIG_IGN); // Init the things we need reportAllTypes(); @@ -930,6 +978,10 @@ int main(int argc, char** argv) { cout<<"[INFO] IXFR distributor starting up!"<()]; + for (int i=0; i(); i++) { + tcpHandlers[i] = std::thread(tcpWorker, i); + } struct timeval now; for(;;) { @@ -950,6 +1002,10 @@ int main(int argc, char** argv) { } } ut.join(); + g_tcpHandlerCV.notify_all(); + for (int i=0; i(); i++) { + tcpHandlers[i].join(); + } if (g_verbose) { cerr<<"[INFO] IXFR distributor stopped"<