]> granicus.if.org Git - pdns/commitdiff
split out the TCP part of dnsdist to a separate file
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 1 Apr 2015 09:52:57 +0000 (11:52 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 1 Apr 2015 09:52:57 +0000 (11:52 +0200)
pdns/Makefile.am
pdns/dnsdist-tcp.cc [new file with mode: 0644]
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/populate

index a95c00d6f8545686eff7bad8cd5c8e7e94a11e97..33dd7bba63ef03d7c21cf2ff79cf712e89ec199f 100644 (file)
@@ -560,6 +560,7 @@ dnsdist_SOURCES = \
        base64.hh \
        dnsdist.cc \
        dnsdist-lua.cc \
+       dnsdist-tcp.cc \
        dnsdist-web.cc \
        dnslabeltext.cc \
        dnsname.cc dnsname.hh \
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
new file mode 100644 (file)
index 0000000..b7b59a1
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+    PowerDNS Versatile Database Driven Nameserver
+    Copyright (C) 2013 - 2015  PowerDNS.COM BV
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License version 2
+    as published by the Free Software Foundation
+
+    Additionally, the license of this program contains a special
+    exception which allows to distribute the program in binary form when
+    it is linked against OpenSSL.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#include "dnsdist.hh"
+#include "dolog.hh"
+#include <thread>
+#include <atomic>
+
+using std::thread;
+using std::atomic;
+
+/* TCP: the grand design. 
+   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. 
+   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially 
+   we will not go there.
+
+   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
+   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
+   to guarantee performance.
+
+   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
+   So whenever an answer comes in, we know where it needs to go.
+
+   Let's start naively.
+*/
+
+int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{  
+  {
+    std::lock_guard<std::mutex> lock(g_luamutex);
+    *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get(); 
+  }
+  
+  vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
+  int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
+  SConnect(sock, (*ds)->remote);
+  return sock;
+}
+
+bool getMsgLen(int fd, uint16_t* len)
+try
+{
+  uint16_t raw;
+  int ret = readn2(fd, &raw, 2);
+  if(ret != 2)
+    return false;
+  *len = ntohs(raw);
+  return true;
+}
+catch(...) {
+   return false;
+}
+
+bool putMsgLen(int fd, uint16_t len)
+try
+{
+  uint16_t raw = htons(len);
+  int ret = writen2(fd, &raw, 2);
+  return ret==2;
+}
+catch(...) {
+  return false;
+}
+
+struct ConnectionInfo
+{
+  int fd;
+  ComboAddress remote;
+};
+
+void* tcpClientThread(int pipefd);
+
+
+  // Should not be called simultaneously!
+void TCPClientCollection::addTCPClientThread()
+{  
+  vinfolog("Adding TCP Client thread");
+
+  int pipefds[2];
+  if(pipe(pipefds) < 0)
+    unixDie("Creating pipe");
+
+  d_tcpclientthreads.push_back(pipefds[1]);    
+  thread t1(tcpClientThread, pipefds[0]);
+  t1.detach();
+  ++d_numthreads;
+}
+
+TCPClientCollection g_tcpclientthreads;
+
+
+
+void* tcpClientThread(int pipefd)
+{
+  /* we get launched with a pipe on which we receive file descriptors from clients that we own
+     from that point on */
+  int dsock = -1;
+  DownstreamState *ds=0;
+  
+  for(;;) {
+    ConnectionInfo* citmp, ci;
+
+    readn2(pipefd, &citmp, sizeof(citmp));
+    --g_tcpclientthreads.d_queued;
+    ci=*citmp;
+    delete citmp;
+    
+    uint16_t qlen, rlen;
+    string pool; // empty for now
+    try {
+      auto localPolicy = g_policy.getLocal();
+      for(;;) {      
+        if(!getMsgLen(ci.fd, &qlen))
+          break;
+        
+        char query[qlen];
+        readn2(ci.fd, query, qlen);
+       uint16_t qtype;
+       DNSName qname(query, qlen, 12, false, &qtype);
+       struct dnsheader* dh =(dnsheader*)query;
+       if(dsock == -1) {
+         dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+       }
+       else {
+         vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
+       }
+        ds->queries++;
+        ds->outstanding++;
+
+       if(qtype == QType::AXFR)  // XXX fixme we really need to do better
+         break;
+
+      retry:; 
+        if(!putMsgLen(dsock, qlen)) {
+         vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
+          close(dsock);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+          goto retry;
+        }
+      
+        writen2(dsock, query, qlen);
+      
+        if(!getMsgLen(dsock, &rlen)) {
+         vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
+          close(dsock);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
+          goto retry;
+        }
+
+        char answerbuffer[rlen];
+        readn2(dsock, answerbuffer, rlen);
+      
+        putMsgLen(ci.fd, rlen);
+        writen2(ci.fd, answerbuffer, rlen);
+      }
+    }
+    catch(...){}
+    
+    vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
+    close(ci.fd); 
+    ci.fd=-1;
+    --ds->outstanding;
+  }
+  return 0;
+}
+
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and 
+   they will hand off to worker threads & spawn more of them if required
+*/
+void* tcpAcceptorThread(void* p)
+{
+  ClientState* cs = (ClientState*) p;
+
+  ComboAddress remote;
+  remote.sin4.sin_family = cs->local.sin4.sin_family;
+  
+  g_tcpclientthreads.addTCPClientThread();
+
+  for(;;) {
+    try {
+      ConnectionInfo* ci = new ConnectionInfo;      
+      ci->fd = SAccept(cs->tcpFD, remote);
+
+      vinfolog("Got connection from %s", remote.toStringWithPort());
+      
+      ci->remote = remote;
+      writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
+    }
+    catch(...){}
+  }
+
+  return 0;
+}
index 23178c3364ca35368695f474199bf39c67e228b4..b0b179f7abfa498a5fadee2423b7bfd6469a3759 100644 (file)
@@ -161,13 +161,6 @@ DownstreamState::DownstreamState(const ComboAddress& remote_)
 }
 
 
-struct ClientState
-{
-  ComboAddress local;
-  int udpFD;
-  int tcpFD;
-};
-
 
 std::mutex g_luamutex;
 LuaContext g_lua;
@@ -471,204 +464,6 @@ catch(...)
   return 0;
 }
 
-/* TCP: the grand design. 
-   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. 
-   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially 
-   we will not go there.
-
-   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
-   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
-   to guarantee performance.
-
-   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
-   So whenever an answer comes in, we know where it needs to go.
-
-   Let's start naively.
-*/
-
-int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
-{  
-  {
-    std::lock_guard<std::mutex> lock(g_luamutex);
-    *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get(); 
-  }
-  
-  vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
-  int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
-  SConnect(sock, (*ds)->remote);
-  return sock;
-}
-
-bool getMsgLen(int fd, uint16_t* len)
-try
-{
-  uint16_t raw;
-  int ret = readn2(fd, &raw, 2);
-  if(ret != 2)
-    return false;
-  *len = ntohs(raw);
-  return true;
-}
-catch(...) {
-   return false;
-}
-
-bool putMsgLen(int fd, uint16_t len)
-try
-{
-  uint16_t raw = htons(len);
-  int ret = writen2(fd, &raw, 2);
-  return ret==2;
-}
-catch(...) {
-  return false;
-}
-
-struct ConnectionInfo
-{
-  int fd;
-  ComboAddress remote;
-};
-
-void* tcpClientThread(int pipefd);
-
-class TCPClientCollection {
-  vector<int> d_tcpclientthreads;
-  atomic<uint64_t> d_pos;
-public:
-  atomic<uint64_t> d_queued, d_numthreads;
-
-  TCPClientCollection()
-  {
-    d_tcpclientthreads.reserve(1024);
-  }
-
-  int getThread() 
-  {
-    int pos = d_pos++;
-    ++d_queued;
-    return d_tcpclientthreads[pos % d_numthreads];
-  }
-
-  // Should not be called simultaneously!
-  void addTCPClientThread()
-  {  
-    vinfolog("Adding TCP Client thread");
-
-    int pipefds[2];
-    if(pipe(pipefds) < 0)
-      unixDie("Creating pipe");
-
-    d_tcpclientthreads.push_back(pipefds[1]);    
-    thread t1(tcpClientThread, pipefds[0]);
-    t1.detach();
-    ++d_numthreads;
-  }
-} g_tcpclientthreads;
-
-
-void* tcpClientThread(int pipefd)
-{
-  /* we get launched with a pipe on which we receive file descriptors from clients that we own
-     from that point on */
-  int dsock = -1;
-  DownstreamState *ds=0;
-  
-  for(;;) {
-    ConnectionInfo* citmp, ci;
-
-    readn2(pipefd, &citmp, sizeof(citmp));
-    --g_tcpclientthreads.d_queued;
-    ci=*citmp;
-    delete citmp;
-    
-    uint16_t qlen, rlen;
-    string pool; // empty for now
-    try {
-      auto localPolicy = g_policy.getLocal();
-      for(;;) {      
-        if(!getMsgLen(ci.fd, &qlen))
-          break;
-        
-        char query[qlen];
-        readn2(ci.fd, query, qlen);
-       uint16_t qtype;
-       DNSName qname(query, qlen, 12, false, &qtype);
-       struct dnsheader* dh =(dnsheader*)query;
-       if(dsock == -1) {
-         dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-       }
-       else {
-         vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
-       }
-        ds->queries++;
-        ds->outstanding++;
-
-       if(qtype == QType::AXFR)  // XXX fixme we really need to do better
-         break;
-
-      retry:; 
-        if(!putMsgLen(dsock, qlen)) {
-         vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
-          close(dsock);
-          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-          goto retry;
-        }
-      
-        writen2(dsock, query, qlen);
-      
-        if(!getMsgLen(dsock, &rlen)) {
-         vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
-          close(dsock);
-          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
-          goto retry;
-        }
-
-        char answerbuffer[rlen];
-        readn2(dsock, answerbuffer, rlen);
-      
-        putMsgLen(ci.fd, rlen);
-        writen2(ci.fd, answerbuffer, rlen);
-      }
-    }
-    catch(...){}
-    
-    vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
-    close(ci.fd); 
-    ci.fd=-1;
-    --ds->outstanding;
-  }
-  return 0;
-}
-
-
-/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and 
-   they will hand off to worker threads & spawn more of them if required
-*/
-void* tcpAcceptorThread(void* p)
-{
-  ClientState* cs = (ClientState*) p;
-
-  ComboAddress remote;
-  remote.sin4.sin_family = cs->local.sin4.sin_family;
-  
-  g_tcpclientthreads.addTCPClientThread();
-
-  for(;;) {
-    try {
-      ConnectionInfo* ci = new ConnectionInfo;      
-      ci->fd = SAccept(cs->tcpFD, remote);
-
-      vinfolog("Got connection from %s", remote.toStringWithPort());
-      
-      ci->remote = remote;
-      writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
-    }
-    catch(...){}
-  }
-
-  return 0;
-}
 
 bool upCheck(const ComboAddress& remote)
 try
index e465c1198c3db7a4f17e52386134f7cd36f2cf7f..8c15abf7c1bf58da75c289973ef5a79366881f6f 100644 (file)
@@ -167,6 +167,35 @@ struct Rings {
 
 extern Rings g_rings; // XXX locking for this is still substandard, queryRing and clientRing need RW lock
 
+struct ClientState
+{
+  ComboAddress local;
+  int udpFD;
+  int tcpFD;
+};
+
+class TCPClientCollection {
+  std::vector<int> d_tcpclientthreads;
+  std::atomic<uint64_t> d_pos;
+public:
+  std::atomic<uint64_t> d_queued, d_numthreads;
+
+  TCPClientCollection()
+  {
+    d_tcpclientthreads.reserve(1024);
+  }
+
+  int getThread() 
+  {
+    int pos = d_pos++;
+    ++d_queued;
+    return d_tcpclientthreads[pos % d_numthreads];
+  }
+  void addTCPClientThread();
+};
+
+extern TCPClientCollection g_tcpclientthreads;
+
 struct DownstreamState
 {
   DownstreamState(const ComboAddress& remote_);
@@ -278,3 +307,6 @@ std::unique_ptr<T> make_unique(Args&&... args)
     return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
 }
 void dnsdistWebserverThread(int sock, const ComboAddress& local, const string& password);
+bool getMsgLen(int fd, uint16_t* len);
+bool putMsgLen(int fd, uint16_t len);
+void* tcpAcceptorThread(void* p);
\ No newline at end of file
index 46ce8fa8b8b784f2da65d47eb8284bbc2d9fd8b2..840714f0578afd1e2b4a2028eee1c126d8c38b0d 100644 (file)
@@ -18,6 +18,7 @@ dnsdist_SOURCES = \
        dns.hh \
        dnsdist.cc dnsdist.hh \
        dnsdist-lua.cc \
+       dnsdist-tcp.cc \
        dnsdist-web.cc \
        dnslabeltext.cc \
        dnsname.cc dnsname.hh \
index 05a64fb62cab1f624f969fe4afb2f6fe1fa4c2ff..cbe0b1f5e5101e14b11ba7a538657abbf3760fd4 100755 (executable)
@@ -4,7 +4,7 @@ ln -fs ../base32.hh ../base64.hh ../dnsdist.cc ../dnsdist.hh ../dnsdist-lua.cc .
 ../dnslabeltext.rl ../dnsname.cc ../dnsname.hh ../dnsparser.hh ../dnsrulactions.hh ../dnswriter.cc ../dnswriter.hh \
 ../dolog.hh ../iputils.cc ../iputils.hh ../misc.cc ../misc.hh ../namespaces.hh \
 ../pdnsexception.hh ../qtype.cc ../qtype.hh ../sholder.hh ../sodcrypto.cc ../sodcrypto.hh \
-../dnsdist-web.cc ../sstuff.hh .
+../dnsdist-web.cc ../sstuff.hh ../dnsdist-tcp.cc .
 
 ln -fs ../README-dnsdist.md README.md