From 8a5d505394eae5efa759728dc3ad6c0b34990408 Mon Sep 17 00:00:00 2001
From: bert hubert
Date: Wed, 1 Apr 2015 11:52:57 +0200
Subject: [PATCH] split out the TCP part of dnsdist to a separate file

---
 pdns/Makefile.am             |   1 +
 pdns/dnsdist-tcp.cc          | 213 +++++++++++++++++++++++++++++++++++
 pdns/dnsdist.cc              | 205 ---------------------------------
 pdns/dnsdist.hh              |  32 ++++++
 pdns/dnsdistdist/Makefile.am |   1 +
 pdns/dnsdistdist/populate    |   2 +-
 6 files changed, 248 insertions(+), 206 deletions(-)
 create mode 100644 pdns/dnsdist-tcp.cc

diff --git a/pdns/Makefile.am b/pdns/Makefile.am
index a95c00d6f..33dd7bba6 100644
--- a/pdns/Makefile.am
+++ b/pdns/Makefile.am
@@ -560,6 +560,7 @@ dnsdist_SOURCES = \
 	base64.hh \
 	dnsdist.cc \
 	dnsdist-lua.cc \
+	dnsdist-tcp.cc \
 	dnsdist-web.cc \
 	dnslabeltext.cc \
 	dnsname.cc dnsname.hh \
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
new file mode 100644
index 000000000..b7b59a189
--- /dev/null
+++ b/pdns/dnsdist-tcp.cc
@@ -0,0 +1,213 @@
+/*
+    PowerDNS Versatile Database Driven Nameserver
+    Copyright (C) 2013 - 2015 PowerDNS.COM BV
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License version 2
+    as published by the Free Software Foundation
+
+    Additionally, the license of this program contains a special
+    exception which allows to distribute the program in binary form when
+    it is linked against OpenSSL.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "dnsdist.hh"
+#include "dolog.hh"
+#include <thread>
+#include <atomic>
+
+using std::thread;
+using std::atomic;
+
+/* TCP: the grand design.
+   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
+   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
+   we will not go there.
+
+   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
+   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
+   to guarantee performance.
+
+   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
+   So whenever an answer comes in, we know where it needs to go.
+
+   Let's start naively.
+*/
+
+int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{
+  {
+    std::lock_guard<std::mutex> lock(g_luamutex);
+    *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get();
+  }
+
+  vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
+  int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
+  SConnect(sock, (*ds)->remote);
+  return sock;
+}
+
+bool getMsgLen(int fd, uint16_t* len)
+try
+{
+  uint16_t raw;
+  int ret = readn2(fd, &raw, 2);
+  if(ret != 2)
+    return false;
+  *len = ntohs(raw);
+  return true;
+}
+catch(...) {
+  return false;
+}
+
+bool putMsgLen(int fd, uint16_t len)
+try
+{
+  uint16_t raw = htons(len);
+  int ret = writen2(fd, &raw, 2);
+  return ret==2;
+}
+catch(...) {
+  return false;
+}
+
+struct ConnectionInfo
+{
+  int fd;
+  ComboAddress remote;
+};
+
+void* tcpClientThread(int pipefd);
+
+
+  // Should not be called simultaneously!
+void TCPClientCollection::addTCPClientThread()
+{
+  vinfolog("Adding TCP Client thread");
+
+  int pipefds[2];
+  if(pipe(pipefds) < 0)
+    unixDie("Creating pipe");
+
+  d_tcpclientthreads.push_back(pipefds[1]);
+  thread t1(tcpClientThread, pipefds[0]);
+  t1.detach();
+  ++d_numthreads;
+}
+
+TCPClientCollection g_tcpclientthreads;
+
+
+void* tcpClientThread(int pipefd)
+{
+  /* we get launched with a pipe on which we receive file descriptors from clients that we own
+     from that point on */
+  int dsock = -1;
+  DownstreamState *ds=0;
+
+  for(;;) {
+    ConnectionInfo* citmp, ci;
+
+    readn2(pipefd, &citmp, sizeof(citmp));
+    --g_tcpclientthreads.d_queued;
+    ci=*citmp;
+    delete citmp;
+
+    uint16_t qlen, rlen;
+    string pool; // empty for now
+    try {
+      auto localPolicy = g_policy.getLocal();
+      for(;;) {
+        if(!getMsgLen(ci.fd, &qlen))
+          break;
+
+        char query[qlen];
+        readn2(ci.fd, query, qlen);
+        uint16_t qtype;
+        DNSName qname(query, qlen, 12, false, &qtype);
+        struct dnsheader* dh =(dnsheader*)query;
+        if(dsock == -1) {
+          dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+        }
+        else {
+          vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
+        }
+        ds->queries++;
+        ds->outstanding++;
+
+        if(qtype == QType::AXFR) // XXX fixme we really need to do better
+          break;
+
+      retry:;
+        if(!putMsgLen(dsock, qlen)) {
+          vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
+          close(dsock);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+          goto retry;
+        }
+
+        writen2(dsock, query, qlen);
+
+        if(!getMsgLen(dsock, &rlen)) {
+          vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
+          close(dsock);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+          goto retry;
+        }
+
+        char answerbuffer[rlen];
+        readn2(dsock, answerbuffer, rlen);
+
+        putMsgLen(ci.fd, rlen);
+        writen2(ci.fd, answerbuffer, rlen);
+      }
+    }
+    catch(...){}
+
+    vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
+    close(ci.fd);
+    ci.fd=-1;
+    --ds->outstanding;
+  }
+  return 0;
+}
+
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
+   they will hand off to worker threads & spawn more of them if required
+*/
+void* tcpAcceptorThread(void* p)
+{
+  ClientState* cs = (ClientState*) p;
+
+  ComboAddress remote;
+  remote.sin4.sin_family = cs->local.sin4.sin_family;
+
+  g_tcpclientthreads.addTCPClientThread();
+
+  for(;;) {
+    try {
+      ConnectionInfo* ci = new ConnectionInfo;
+      ci->fd = SAccept(cs->tcpFD, remote);
+
+      vinfolog("Got connection from %s", remote.toStringWithPort());
+
+      ci->remote = remote;
+      writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
+    }
+    catch(...){}
+  }
+
+  return 0;
+}
diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc
index 23178c336..b0b179f7a 100644
--- a/pdns/dnsdist.cc
+++ b/pdns/dnsdist.cc
@@ -161,13 +161,6 @@ DownstreamState::DownstreamState(const ComboAddress& remote_)
 }
 
-struct ClientState
-{
-  ComboAddress local;
-  int udpFD;
-  int tcpFD;
-};
-
 std::mutex g_luamutex;
 LuaContext g_lua;
 
@@ -471,204 +464,6 @@ catch(...)
   return 0;
 }
 
-/* TCP: the grand design.
-   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
-   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
-   we will not go there.
-
-   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
-   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
-   to guarantee performance.
-
-   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
-   So whenever an answer comes in, we know where it needs to go.
-
-   Let's start naively.
-*/
-
-int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
-{
-  {
-    std::lock_guard<std::mutex> lock(g_luamutex);
-    *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get();
-  }
-
-  vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
-  int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
-  SConnect(sock, (*ds)->remote);
-  return sock;
-}
-
-bool getMsgLen(int fd, uint16_t* len)
-try
-{
-  uint16_t raw;
-  int ret = readn2(fd, &raw, 2);
-  if(ret != 2)
-    return false;
-  *len = ntohs(raw);
-  return true;
-}
-catch(...) {
-  return false;
-}
-
-bool putMsgLen(int fd, uint16_t len)
-try
-{
-  uint16_t raw = htons(len);
-  int ret = writen2(fd, &raw, 2);
-  return ret==2;
-}
-catch(...) {
-  return false;
-}
-
-struct ConnectionInfo
-{
-  int fd;
-  ComboAddress remote;
-};
-
-void* tcpClientThread(int pipefd);
-
-class TCPClientCollection {
-  vector<int> d_tcpclientthreads;
-  atomic<int> d_pos;
-public:
-  atomic<int> d_queued, d_numthreads;
-
-  TCPClientCollection()
-  {
-    d_tcpclientthreads.reserve(1024);
-  }
-
-  int getThread()
-  {
-    int pos = d_pos++;
-    ++d_queued;
-    return d_tcpclientthreads[pos % d_numthreads];
-  }
-
-  // Should not be called simultaneously!
-  void addTCPClientThread()
-  {
-    vinfolog("Adding TCP Client thread");
-
-    int pipefds[2];
-    if(pipe(pipefds) < 0)
-      unixDie("Creating pipe");
-
-    d_tcpclientthreads.push_back(pipefds[1]);
-    thread t1(tcpClientThread, pipefds[0]);
-    t1.detach();
-    ++d_numthreads;
-  }
-} g_tcpclientthreads;
-
-
-void* tcpClientThread(int pipefd)
-{
-  /* we get launched with a pipe on which we receive file descriptors from clients that we own
-     from that point on */
-  int dsock = -1;
-  DownstreamState *ds=0;
-
-  for(;;) {
-    ConnectionInfo* citmp, ci;
-
-    readn2(pipefd, &citmp, sizeof(citmp));
-    --g_tcpclientthreads.d_queued;
-    ci=*citmp;
-    delete citmp;
-
-    uint16_t qlen, rlen;
-    string pool; // empty for now
-    try {
-      auto localPolicy = g_policy.getLocal();
-      for(;;) {
-        if(!getMsgLen(ci.fd, &qlen))
-          break;
-
-        char query[qlen];
-        readn2(ci.fd, query, qlen);
-        uint16_t qtype;
-        DNSName qname(query, qlen, 12, false, &qtype);
-        struct dnsheader* dh =(dnsheader*)query;
-        if(dsock == -1) {
-          dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-        }
-        else {
-          vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
-        }
-        ds->queries++;
-        ds->outstanding++;
-
-        if(qtype == QType::AXFR) // XXX fixme we really need to do better
-          break;
-
-      retry:;
-        if(!putMsgLen(dsock, qlen)) {
-          vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
-          close(dsock);
-          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-          goto retry;
-        }
-
-        writen2(dsock, query, qlen);
-
-        if(!getMsgLen(dsock, &rlen)) {
-          vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
-          close(dsock);
-          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-          goto retry;
-        }
-
-        char answerbuffer[rlen];
-        readn2(dsock, answerbuffer, rlen);
-
-        putMsgLen(ci.fd, rlen);
-        writen2(ci.fd, answerbuffer, rlen);
-      }
-    }
-    catch(...){}
-
-    vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
-    close(ci.fd);
-    ci.fd=-1;
-    --ds->outstanding;
-  }
-  return 0;
-}
-
-
-/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
-   they will hand off to worker threads & spawn more of them if required
-*/
-void* tcpAcceptorThread(void* p)
-{
-  ClientState* cs = (ClientState*) p;
-
-  ComboAddress remote;
-  remote.sin4.sin_family = cs->local.sin4.sin_family;
-
-  g_tcpclientthreads.addTCPClientThread();
-
-  for(;;) {
-    try {
-      ConnectionInfo* ci = new ConnectionInfo;
-      ci->fd = SAccept(cs->tcpFD, remote);
-
-      vinfolog("Got connection from %s", remote.toStringWithPort());
-
-      ci->remote = remote;
-      writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
-    }
-    catch(...){}
-  }
-
-  return 0;
-}
 
 bool upCheck(const ComboAddress& remote)
 try
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index e465c1198..8c15abf7c 100644
--- a/pdns/dnsdist.hh
+++ b/pdns/dnsdist.hh
@@ -167,6 +167,35 @@ struct Rings {
 extern Rings g_rings; // XXX locking for this is still substandard, queryRing and clientRing need RW lock
 
+struct ClientState
+{
+  ComboAddress local;
+  int udpFD;
+  int tcpFD;
+};
+
+class TCPClientCollection {
+  std::vector<int> d_tcpclientthreads;
+  std::atomic<int> d_pos;
+public:
+  std::atomic<int> d_queued, d_numthreads;
+
+  TCPClientCollection()
+  {
+    d_tcpclientthreads.reserve(1024);
+  }
+
+  int getThread()
+  {
+    int pos = d_pos++;
+    ++d_queued;
+    return d_tcpclientthreads[pos % d_numthreads];
+  }
+
+  void addTCPClientThread();
+};
+
+extern TCPClientCollection g_tcpclientthreads;
+
 struct DownstreamState
 {
   DownstreamState(const ComboAddress& remote_);
@@ -278,3 +307,6 @@ std::unique_ptr<T> make_unique(Args&&... args)
   return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
 }
 void dnsdistWebserverThread(int sock, const ComboAddress& local, const string& password);
+bool getMsgLen(int fd, uint16_t* len);
+bool putMsgLen(int fd, uint16_t len);
+void* tcpAcceptorThread(void* p);
\ No newline at end of file
diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am
index 46ce8fa8b..840714f05 100644
--- a/pdns/dnsdistdist/Makefile.am
+++ b/pdns/dnsdistdist/Makefile.am
@@ -18,6 +18,7 @@ dnsdist_SOURCES = \
 	dns.hh \
 	dnsdist.cc dnsdist.hh \
 	dnsdist-lua.cc \
+	dnsdist-tcp.cc \
 	dnsdist-web.cc \
 	dnslabeltext.cc \
 	dnsname.cc dnsname.hh \
diff --git a/pdns/dnsdistdist/populate b/pdns/dnsdistdist/populate
index 05a64fb62..cbe0b1f5e 100755
--- a/pdns/dnsdistdist/populate
+++ b/pdns/dnsdistdist/populate
@@ -4,7 +4,7 @@ ln -fs ../base32.hh ../base64.hh ../dnsdist.cc ../dnsdist.hh ../dnsdist-lua.cc .
 ../dnslabeltext.rl ../dnsname.cc ../dnsname.hh ../dnsparser.hh ../dnsrulactions.hh ../dnswriter.cc ../dnswriter.hh \
 ../dolog.hh ../iputils.cc ../iputils.hh ../misc.cc ../misc.hh ../namespaces.hh \
 ../pdnsexception.hh ../qtype.cc ../qtype.hh ../sholder.hh ../sodcrypto.cc ../sodcrypto.hh \
-../dnsdist-web.cc ../sstuff.hh .
+../dnsdist-web.cc ../sstuff.hh ../dnsdist-tcp.cc .
 
 ln -fs ../README-dnsdist.md README.md
-- 
2.40.0
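
A note on the framing the patch relies on: DNS over TCP prefixes every message with a two-byte length field in network byte order (RFC 1035, section 4.2.2), and getMsgLen()/putMsgLen() above read and write exactly that prefix using the readn2()/writen2() helpers from the PowerDNS tree. The snippet below is a minimal, self-contained sketch of the same framing on plain POSIX sockets, for readers who do not have the dnsdist helpers at hand; the sendFramed()/recvFramed()/writeAll()/readAll() names are illustrative and not part of dnsdist, and error handling is reduced to a bool.

// framing-sketch.cc - standalone illustration of the 2-byte length prefix used
// for DNS over TCP. Not dnsdist code; all function names here are made up.
#include <arpa/inet.h>  // htons(), ntohs()
#include <unistd.h>     // read(), write(), ssize_t
#include <cstdint>
#include <cstddef>
#include <string>

// Keep writing until 'count' bytes are out, or fail; write() may be short on TCP.
static bool writeAll(int fd, const void* buf, size_t count)
{
  const char* p = static_cast<const char*>(buf);
  while(count) {
    ssize_t n = write(fd, p, count);
    if(n <= 0)
      return false;
    p += n;
    count -= static_cast<size_t>(n);
  }
  return true;
}

// Keep reading until 'count' bytes are in, or fail; read() may be short on TCP.
static bool readAll(int fd, void* buf, size_t count)
{
  char* p = static_cast<char*>(buf);
  while(count) {
    ssize_t n = read(fd, p, count);
    if(n <= 0)
      return false;
    p += n;
    count -= static_cast<size_t>(n);
  }
  return true;
}

// Send one DNS message: 16-bit length in network byte order, then the payload.
static bool sendFramed(int fd, const std::string& payload)
{
  uint16_t len = htons(static_cast<uint16_t>(payload.size()));
  return writeAll(fd, &len, sizeof(len)) && writeAll(fd, payload.data(), payload.size());
}

// Receive one DNS message: the length prefix first, then exactly that many bytes.
static bool recvFramed(int fd, std::string& payload)
{
  uint16_t len;
  if(!readAll(fd, &len, sizeof(len)))
    return false;
  payload.resize(ntohs(len));
  return payload.empty() || readAll(fd, &payload[0], payload.size());
}

In these terms, one pass of the tcpClientThread() loop is: receive a framed query from the client, send it framed to the chosen downstream, receive the framed answer, and send it framed back to the client; keeping one downstream connection per worker thread is what lets the code forward without queueing, at the cost of the head-of-line blocking the design comment mentions.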