From: Otto Moerbeek Date: Wed, 18 Sep 2019 10:01:01 +0000 (+0200) Subject: Allow multiple simulaneous incoming TCP queries over a connection. X-Git-Tag: dnsdist-1.4.0-rc4~8^2~6 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d5c6ec956fc9c127ad0779af804e2b0ecc6c912f;p=pdns Allow multiple simulaneous incoming TCP queries over a connection. Answers are sent out the moment the become available, so not necesarily in the same order as received. There's a limit on how many queries per TCP induced connection we may have in flight. --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 3637d11ae..e5a781134 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -746,7 +746,7 @@ static void writePid(void) } } -TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd) +TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_requestsInFlight(0), d_fd(fd) { ++s_currentConnections; (*t_tcpClientCounts)[d_remote]++; @@ -767,6 +767,8 @@ TCPConnection::~TCPConnection() --s_currentConnections; } +int TCPConnection::s_maxInFlight = 10; + AtomicCounter TCPConnection::s_currentConnections; static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var); @@ -1760,12 +1762,16 @@ static void startDoResolve(void *p) dc->d_socket = -1; } else { - dc->d_tcpConnection->state=TCPConnection::BYTE0; Utility::gettimeofday(&g_now, 0); // needs to be updated struct timeval ttd = g_now; - ttd.tv_sec += g_tcpTimeout; - - t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); + dc->d_tcpConnection->d_requestsInFlight--; + if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) { + //cerr << "Reenabling... " << dc->d_tcpConnection->d_requestsInFlight << ' ' << dc->d_socket << endl; + ttd.tv_sec += g_tcpTimeout; + t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); + } else { + t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout); + } } } } @@ -2002,7 +2008,11 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) } conn->bytesread+=(uint16_t)bytes; if(conn->bytesread==conn->qlen) { - t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read + conn->d_requestsInFlight++; + if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) { + //cerr << "Disabling... " << conn->d_requestsInFlight << ' ' << fd << endl; + t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read + } std::unique_ptr dc; try { @@ -2137,7 +2147,8 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) else { ++g_stats.qcounter; ++g_stats.tcpqcounter; - MT->makeThread(startDoResolve, dc.release()); // deletes dc, will set state to BYTE0 again + MT->makeThread(startDoResolve, dc.release()); // deletes dc + conn->state = TCPConnection::BYTE0; return; } } diff --git a/pdns/sdig.cc b/pdns/sdig.cc index a7e41de8e..fc0d36074 100644 --- a/pdns/sdig.cc +++ b/pdns/sdig.cc @@ -2,14 +2,14 @@ #include "config.h" #endif #include "dnsparser.hh" +#include "dnsrecords.hh" +#include "dnswriter.hh" #include "ednsoptions.hh" -#include "sstuff.hh" +#include "ednssubnet.hh" #include "misc.hh" -#include "dnswriter.hh" -#include "dnsrecords.hh" +#include "sstuff.hh" #include "statbag.hh" #include -#include "ednssubnet.hh" #ifdef HAVE_LIBCURL #include "minicurl.hh" @@ -17,60 +17,195 @@ StatBag S; -bool hidettl=false; +bool hidettl = false; string ttl(uint32_t ttl) { - if(hidettl) + if (hidettl) return "[ttl]"; else return std::to_string(ttl); } -void usage() { - cerr<<"sdig"<& packet, const string& q, const string& t, + bool dnssec, const boost::optional ednsnm, + bool recurse, uint16_t xpfcode, uint16_t xpfversion, + uint64_t xpfproto, char* xpfsrc, char* xpfdst) +{ + DNSPacketWriter pw(packet, DNSName(q), DNSRecordContent::TypeToNumber(t)); + + if (dnssec || ednsnm || getenv("SDIGBUFSIZE")) { + char* sbuf = getenv("SDIGBUFSIZE"); + int bufsize; + if (sbuf) + bufsize = atoi(sbuf); + else + bufsize = 2800; + DNSPacketWriter::optvect_t opts; + if (ednsnm) { + EDNSSubnetOpts eo; + eo.source = *ednsnm; + opts.push_back( + make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(eo))); + } + + pw.addOpt(bufsize, 0, dnssec ? EDNSOpts::DNSSECOK : 0, opts); + pw.commit(); + } + + if (xpfcode) { + ComboAddress src(xpfsrc), dst(xpfdst); + pw.startRecord(DNSName("."), xpfcode, 0, 1, DNSResourceRecord::ADDITIONAL); + // xpf->toPacket(pw); + pw.xfr8BitInt(xpfversion); + pw.xfr8BitInt(xpfproto); + pw.xfrCAWithoutPort(xpfversion, src); + pw.xfrCAWithoutPort(xpfversion, dst); + pw.xfrCAPort(src); + pw.xfrCAPort(dst); + pw.commit(); + } + + if (recurse) { + pw.getHeader()->rd = true; + } +} + +void printReply(const string& reply, bool showflags, bool hidesoadetails) { - bool dnssec=false; - bool recurse=false; - bool tcp=false; - bool showflags=false; - bool hidesoadetails=false; - bool doh=false; + MOADNSParser mdp(false, reply); + cout << "Reply to question for qname='" << mdp.d_qname.toString() + << "', qtype=" << DNSRecordContent::NumberToType(mdp.d_qtype) << endl; + cout << "Rcode: " << mdp.d_header.rcode << " (" + << RCode::to_s(mdp.d_header.rcode) << "), RD: " << mdp.d_header.rd + << ", QR: " << mdp.d_header.qr; + cout << ", TC: " << mdp.d_header.tc << ", AA: " << mdp.d_header.aa + << ", opcode: " << mdp.d_header.opcode << endl; + + for (MOADNSParser::answers_t::const_iterator i = mdp.d_answers.begin(); + i != mdp.d_answers.end(); ++i) { + cout << i->first.d_place - 1 << "\t" << i->first.d_name.toString() << "\t" + << nameForClass(i->first.d_class, i->first.d_type) << "\t" + << DNSRecordContent::NumberToType(i->first.d_type); + if (i->first.d_class == QClass::IN) { + if (i->first.d_type == QType::RRSIG) { + string zoneRep = i->first.d_content->getZoneRepresentation(); + vector parts; + stringtok(parts, zoneRep); + cout << "\t" << ttl(i->first.d_ttl) << "\t" << parts[0] << " " + << parts[1] << " " << parts[2] << " " << parts[3] + << " [expiry] [inception] [keytag] " << parts[7] << " ...\n"; + continue; + } + if (!showflags && i->first.d_type == QType::NSEC3) { + string zoneRep = i->first.d_content->getZoneRepresentation(); + vector parts; + stringtok(parts, zoneRep); + cout << "\t" << ttl(i->first.d_ttl) << "\t" << parts[0] << " [flags] " + << parts[2] << " " << parts[3] << " " << parts[4]; + for (vector::iterator iter = parts.begin() + 5; + iter != parts.end(); ++iter) + cout << " " << *iter; + cout << "\n"; + continue; + } + if (i->first.d_type == QType::DNSKEY) { + string zoneRep = i->first.d_content->getZoneRepresentation(); + vector parts; + stringtok(parts, zoneRep); + cout << "\t" << ttl(i->first.d_ttl) << "\t" << parts[0] << " " + << parts[1] << " " << parts[2] << " ...\n"; + continue; + } + if (i->first.d_type == QType::SOA && hidesoadetails) { + string zoneRep = i->first.d_content->getZoneRepresentation(); + vector parts; + stringtok(parts, zoneRep); + cout << "\t" << ttl(i->first.d_ttl) << "\t" << parts[0] << " " + << parts[1] << " [serial] " << parts[3] << " " << parts[4] << " " + << parts[5] << " " << parts[6] << "\n"; + continue; + } + } + cout << "\t" << ttl(i->first.d_ttl) << "\t" + << i->first.d_content->getZoneRepresentation() << "\n"; + } + + EDNSOpts edo; + if (getEDNSOpts(mdp, &edo)) { + // cerr<<"Have "<>::const_iterator iter = edo.d_options.begin(); + iter != edo.d_options.end(); ++iter) { + if (iter->first == EDNSOptionCode::ECS) { // 'EDNS subnet' + EDNSSubnetOpts reso; + if (getEDNSSubnetOptsFromString(iter->second, &reso)) { + cerr << "EDNS Subnet response: " << reso.source.toString() + << ", scope: " << reso.scope.toString() + << ", family = " << reso.scope.getNetwork().sin4.sin_family + << endl; + } + } else if (iter->first == EDNSOptionCode::PADDING) { + cerr << "EDNS Padding size: " << (iter->second.size()) << endl; + } else { + cerr << "Have unknown option " << (int)iter->first << endl; + } + } + } +} + +int main(int argc, char** argv) +try { + bool dnssec = false; + bool recurse = false; + bool tcp = false; + bool showflags = false; + bool hidesoadetails = false; + bool doh = false; boost::optional ednsnm; uint16_t xpfcode = 0, xpfversion = 0, xpfproto = 0; char *xpfsrc = NULL, *xpfdst = NULL; - for(int i=1; i 5) { - for(int i=5; i packet; - - DNSPacketWriter pw(packet, DNSName(argv[3]), DNSRecordContent::TypeToNumber(argv[4])); - - if(dnssec || ednsnm || getenv("SDIGBUFSIZE")) - { - char *sbuf=getenv("SDIGBUFSIZE"); - int bufsize; - if(sbuf) - bufsize=atoi(sbuf); - else - bufsize=2800; - DNSPacketWriter::optvect_t opts; - if(ednsnm) { - EDNSSubnetOpts eo; - eo.source = *ednsnm; - opts.push_back(make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(eo))); - } - - pw.addOpt(bufsize, 0, dnssec ? EDNSOpts::DNSSECOK : 0, opts); - pw.commit(); - } - - if(xpfcode) - { - ComboAddress src(xpfsrc), dst(xpfdst); - pw.startRecord(DNSName("."), xpfcode, 0, 1, DNSResourceRecord::ADDITIONAL); - // xpf->toPacket(pw); - pw.xfr8BitInt(xpfversion); - pw.xfr8BitInt(xpfproto); - pw.xfrCAWithoutPort(xpfversion, src); - pw.xfrCAWithoutPort(xpfversion, dst); - pw.xfrCAPort(src); - pw.xfrCAPort(dst); - pw.commit(); - } - - if(recurse) - { - pw.getHeader()->rd=true; - } - string reply; - string question(packet.begin(), packet.end()); ComboAddress dest; - if(*argv[1]=='h') { + if (*argv[1] == 'h') { doh = true; + } else { + dest = ComboAddress(argv[1] + (*argv[1] == '@'), atoi(argv[2])); } - else { - dest = ComboAddress(argv[1] + (*argv[1]=='@'), atoi(argv[2])); + + string name = string(argv[3]); + string type = string(argv[4]); + + vector> questions; + if (name == "-" && type == "-") { + if (!tcp) { + throw PDNSException("multi-query from stdin only supported for tcp"); + } + string line; + while (getline(std::cin, line)) { + auto fields = splitField(line, ' '); + + questions.push_back(make_pair(fields.first, fields.second)); + } + } else { + questions.push_back(make_pair(name, type)); } - if(doh) { + if (doh) { #ifdef HAVE_LIBCURL + vector packet; + fillPacket(packet, name, type, dnssec, ednsnm, recurse, xpfcode, xpfversion, + xpfproto, xpfsrc, xpfdst); MiniCurl mc; MiniCurl::MiniCurlHeaders mch; mch.insert(std::make_pair("Content-Type", "application/dns-message")); mch.insert(std::make_pair("Accept", "application/dns-message")); + string question(packet.begin(), packet.end()); reply = mc.postURL(argv[1], question, mch); + printReply(reply, showflags, hidesoadetails); #else throw PDNSException("please link sdig against libcurl for DoH support"); #endif - } - else if(tcp) { + } else if (tcp) { Socket sock(dest.sin4.sin_family, SOCK_STREAM); sock.connect(dest); - uint16_t len; - len = htons(packet.size()); - if(sock.write((char *) &len, 2) != 2) - throw PDNSException("tcp write failed"); - - sock.writen(question); - - if(sock.read((char *) &len, 2) != 2) - throw PDNSException("tcp read failed"); + for (const auto& it : questions) { + vector packet; + fillPacket(packet, it.first, it.second, dnssec, ednsnm, recurse, xpfcode, + xpfversion, xpfproto, xpfsrc, xpfdst); - len=ntohs(len); - std::unique_ptr creply(new char[len]); - int n=0; - int numread; - while(n packet; + fillPacket(packet, name, type, dnssec, ednsnm, recurse, xpfcode, xpfversion, + xpfproto, xpfsrc, xpfdst); + string question(packet.begin(), packet.end()); Socket sock(dest.sin4.sin_family, SOCK_DGRAM); sock.sendTo(question, dest); - int result=waitForData(sock.getHandle(), 10); - if(result < 0) - throw std::runtime_error("Error waiting for data: "+stringerror()); - if(!result) + int result = waitForData(sock.getHandle(), 10); + if (result < 0) + throw std::runtime_error("Error waiting for data: " + stringerror()); + if (!result) throw std::runtime_error("Timeout waiting for data"); sock.recvFrom(reply, dest); + printReply(reply, showflags, hidesoadetails); } - MOADNSParser mdp(false, reply); - cout<<"Reply to question for qname='"<first.d_place-1<<"\t"<first.d_name.toString()<<"\t"<first.d_class, i->first.d_type)<<"\t"<first.d_type); - if(i->first.d_class == QClass::IN) - { - if(i->first.d_type == QType::RRSIG) - { - string zoneRep = i->first.d_content->getZoneRepresentation(); - vector parts; - stringtok(parts, zoneRep); - cout<<"\t"<first.d_ttl)<<"\t"<< parts[0]<<" "<first.d_type == QType::NSEC3) - { - string zoneRep = i->first.d_content->getZoneRepresentation(); - vector parts; - stringtok(parts, zoneRep); - cout<<"\t"<first.d_ttl)<<"\t"<< parts[0]<<" [flags] "<::iterator iter = parts.begin()+5; iter != parts.end(); ++iter) - cout<<" "<<*iter; - cout<<"\n"; - continue; - } - if(i->first.d_type == QType::DNSKEY) - { - string zoneRep = i->first.d_content->getZoneRepresentation(); - vector parts; - stringtok(parts, zoneRep); - cout<<"\t"<first.d_ttl)<<"\t"<< parts[0]<<" "<first.d_type == QType::SOA && hidesoadetails) - { - string zoneRep = i->first.d_content->getZoneRepresentation(); - vector parts; - stringtok(parts, zoneRep); - cout<<"\t"<first.d_ttl)<<"\t"<first.d_ttl)<<"\t"<< i->first.d_content->getZoneRepresentation()<<"\n"; - } - - EDNSOpts edo; - if(getEDNSOpts(mdp, &edo)) { -// cerr<<"Have "< >::const_iterator iter = edo.d_options.begin(); - iter != edo.d_options.end(); - ++iter) { - if(iter->first == EDNSOptionCode::ECS) {// 'EDNS subnet' - EDNSSubnetOpts reso; - if(getEDNSSubnetOptsFromString(iter->second, &reso)) { - cerr<<"EDNS Subnet response: "<second.size())<first< d_requestsInFlight; static unsigned int getCurrentConnections() { return s_currentConnections; } + // The max number of concurent TCP queries we're willing to process + static int s_maxInFlight; private: const int d_fd; static AtomicCounter s_currentConnections; //!< total number of current TCP connections