]> granicus.if.org Git - pdns/commitdiff
first shot at working TCP resolving code on receipt of packet with TC bit (f.e.,...
authorBert Hubert <bert.hubert@netherlabs.nl>
Tue, 5 Jul 2005 22:33:29 +0000 (22:33 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Tue, 5 Jul 2005 22:33:29 +0000 (22:33 +0000)
only use if desperate, very fresh code

git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@433 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/lwres.cc
pdns/lwres.hh
pdns/pdns_recursor.cc
pdns/sstuff.hh
pdns/syncres.cc
pdns/syncres.hh

index a50415cb2e0ca997403a1de6802283b5cda79c09..cf77c93f7e0e12de09176c434d9b4e33ee97f11a 100644 (file)
@@ -35,7 +35,8 @@
 #include "ahuexception.hh"
 #include "statbag.hh"
 #include "arguments.hh"
-
+#include "sstuff.hh"
+#include "syncres.hh"
 
 LWRes::LWRes()
 {
@@ -55,7 +56,7 @@ LWRes::~LWRes()
 
 //! returns -1 for permanent error, 0 for timeout, 1 for success
 /** Never throws! */
-int LWRes::asyncresolve(const string &ip, const char *domain, int type)
+int LWRes::asyncresolve(const string &ip, const char *domain, int type, bool doTCP)
 {
   DNSPacket p;
   p.setQuestion(Opcode::Query,domain,type);
@@ -69,6 +70,7 @@ int LWRes::asyncresolve(const string &ip, const char *domain, int type)
 
   struct sockaddr_in toaddr;
   struct in_addr inp;
+  Utility::socklen_t addrlen=sizeof(toaddr);
   Utility::inet_aton(ip.c_str(),&inp);
   toaddr.sin_addr.s_addr=inp.s_addr;
 
@@ -79,16 +81,53 @@ int LWRes::asyncresolve(const string &ip, const char *domain, int type)
 
   DTime dt;
   dt.set();
-  if(asendto(p.getData(), p.len, 0, (struct sockaddr*)(&toaddr), sizeof(toaddr),p.d.id)<0) {
-    return -1;
+
+  if(!doTCP) {
+    if(asendto(p.getData(), p.len, 0, (struct sockaddr*)(&toaddr), sizeof(toaddr),p.d.id)<0) {
+      return -1;
+    }
+  
+    // sleep until we see an answer to this, interface to mtasker
+    
+    ret=arecvfrom(reinterpret_cast<char *>(d_buf), d_bufsize-1,0,(struct sockaddr*)(&toaddr), &addrlen, &d_len, p.d.id);
   }
+  else {
+    Socket s(InterNetwork, Stream);
+    IPEndpoint ie(ip, 53);
+    s.setNonBlocking();
+    s.connect(ie);
+
+    int len=htons(p.len);
+    char *lenP=(char*)&len;
+    const char *msgP=p.getData();
+    string packet=string(lenP, lenP+2)+string(msgP, msgP+p.len);
+
+    if(asendtcp(packet, &s) == 0) {
+      cerr<<"asendtcp: timeout"<<endl;
+      return -1;
+    }
     
-  Utility::socklen_t addrlen=sizeof(toaddr);
-  
-  // sleep until we see an answer to this, interface to mtasker
-  
-  ret=arecvfrom(reinterpret_cast<char *>(d_buf), d_bufsize-1,0,(struct sockaddr*)(&toaddr), &addrlen, &d_len, p.d.id);
-    d_usec=dt.udiff();
+    packet.clear();
+    if(arecvtcp(packet,2, &s)==0) {
+      cerr<<"arecvtcp: timeout"<<endl;
+      return -1;
+    }
+
+    memcpy(&len, packet.c_str(), 2);
+    len=ntohs(len);
+
+    //    cerr<<"Now reading "<<len<<" bytes"<<endl;
+
+    if(arecvtcp(packet, len, &s)==0) {
+      cerr<<"arecvtcp: timeout"<<endl;
+      return -1;
+    }
+
+    memcpy(d_buf, packet.c_str(), len);
+    d_len=len;
+    ret=1;
+  }
+  d_usec=dt.udiff();
     
   return ret;
 }
@@ -102,6 +141,7 @@ LWRes::res_t LWRes::result()
     if(p.parse((char *)d_buf, d_len)<0)
       throw LWResException("resolver: unable to parse packet of "+itoa(d_len)+" bytes");
     d_aabit=p.d.aa;
+    d_aabit=p.d.tc;
     d_rcode=p.d.rcode;
     return p.getAnswers();
   }
index 132d0c9d0a7bd7664558e85e30bf79701e9f955b..3f3afe8b329b379bd39d6e2428d5b233d07799d1 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002  PowerDNS.COM BV
+    Copyright (C) 2002 - 2005 PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published by
@@ -61,10 +61,10 @@ public:
 
   typedef vector<DNSResourceRecord> res_t;
 
-  int asyncresolve(const string &ip, const char *domain, int type);
+  int asyncresolve(const string &ip, const char *domain, int type, bool doTCP);
   vector<DNSResourceRecord> result();
   int d_rcode;
-  bool d_aabit;
+  bool d_aabit, d_tcbit;
   u_int32_t d_usec;
 private:
   int d_sock;
@@ -78,8 +78,6 @@ private:
   u_int32_t d_ip;
   bool d_inaxfr;
   int d_bufsize;
-
-
 };
 
 #endif // PDNS_LWRES_HH
index 5bfe6293a03f2914147a37a62957a18da912b31d..7c89a86d0111e68855b6b4f5fc6103e0092bbc33 100644 (file)
 #include "syncres.hh"
 #include <fcntl.h>
 #include <fstream>
+#include "sstuff.hh"
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
+
+using namespace boost;
+
 #include "recursor_cache.hh"
 
 #ifdef __FreeBSD__           // see cvstrac ticket #26
@@ -74,29 +80,74 @@ static int d_clientsock;
 static vector<int> d_udpserversocks;
 static vector<int> d_tcpserversocks;
 
+
 struct PacketID
 {
-  u_int16_t id;
-  struct sockaddr_in remote;
+  PacketID() : sock(0), inNeeded(0), outPos(0)
+  {}
+
+  u_int16_t id;  // wait for a specific id/remote paie
+  struct sockaddr_in remote;  // this is the remote
+
+  Socket* sock;  // or wait for an event on a TCP fd
+  int inNeeded; // if this is set, we'll read until inNeeded bytes are read
+  string inMSG; // they'll go here
+
+  string outMSG; // the outgoing message that needs to be sent
+  int outPos;    // how far we are along in the outMSG
+
+  bool operator<(const PacketID& b) const
+  {
+    int ourSock= sock ? sock->getHandle() : 0;
+    int bSock = b.sock ? b.sock->getHandle() : 0;
+    return 
+      tie(id, remote.sin_addr.s_addr, remote.sin_port, ourSock) <
+      tie(b.id, b.remote.sin_addr.s_addr, b.remote.sin_port, bSock);
+  }
 };
 
-bool operator<(const PacketID& a, const PacketID& b)
+static map<int,PacketID> d_tcpclientreadsocks, d_tcpclientwritesocks;
+
+MTasker<PacketID,string>* MT;
+
+int asendtcp(const string& data, Socket* sock) 
+{
+  PacketID pident;
+  pident.sock=sock;
+  pident.outMSG=data;
+  string packet;
+
+  //  cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
+  d_tcpclientwritesocks[sock->getHandle()]=pident;
+
+  if(!MT->waitEvent(pident,&packet,1)) { // timeout
+    d_tcpclientwritesocks.erase(sock->getHandle());
+    return 0; 
+  }
+  //  cerr<<"asendtcp happy"<<endl;
+  return 1;
+}
+
+int arecvtcp(string& data, int len, Socket* sock) 
 {
-  if(a.id<b.id)
-    return true;
-
-  if(a.id==b.id) {
-    if(a.remote.sin_addr.s_addr < b.remote.sin_addr.s_addr)
-      return true;
-    if(a.remote.sin_addr.s_addr == b.remote.sin_addr.s_addr)
-      if(a.remote.sin_port < b.remote.sin_port)
-       return true;
+  data="";
+  PacketID pident;
+  pident.sock=sock;
+  pident.inNeeded=len;
+
+  // cerr<<"arecvtcp called for "<<len<<" bytes"<<endl;
+  // cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
+  d_tcpclientreadsocks[sock->getHandle()]=pident;
+
+  if(!MT->waitEvent(pident,&data,1)) { // timeout
+    d_tcpclientreadsocks.erase(sock->getHandle());
+    return 0; 
   }
 
-  return false;
+  // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
+  return 1;
 }
 
-MTasker<PacketID,string>* MT;
 
 /* these two functions are used by LWRes */
 int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
@@ -205,7 +256,7 @@ void startDoResolve(void *p)
     if(!quiet) {
       L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
       L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
-       sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, rcode="<<res<<endl;
+       sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
     }
     
     sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
@@ -361,7 +412,7 @@ void doStats(void)
     L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
     L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
      <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
-    L<<Logger::Error<<"stats: "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
+    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
   }
   else if(statsWanted) 
     L<<Logger::Error<<"stats: no stats yet!"<<endl;
@@ -526,8 +577,9 @@ int main(int argc, char **argv)
       tv.tv_sec=0;
       tv.tv_usec=500000;
       
-      fd_set readfds;
+      fd_set readfds, writefds;
       FD_ZERO( &readfds );
+      FD_ZERO( &writefds );
       FD_SET( d_clientsock, &readfds );
       int fdmax=d_clientsock;
 
@@ -543,8 +595,19 @@ int main(int argc, char **argv)
        FD_SET( *i, &readfds );
        fdmax=max(fdmax,*i);
       }
+      for(map<int,PacketID>::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) {
+       // cerr<<"Adding TCP socket "<<i->first<<" to read select set"<<endl;
+       FD_SET( i->first, &readfds );
+       fdmax=max(fdmax,i->first);
+      }
+
+      for(map<int,PacketID>::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) {
+       // cerr<<"Adding TCP socket "<<i->first<<" to write select set"<<endl;
+       FD_SET( i->first, &writefds );
+       fdmax=max(fdmax,i->first);
+      }
 
-      int selret = select(  fdmax + 1, &readfds, NULL, NULL, &tv );
+      int selret = select(  fdmax + 1, &readfds, &writefds, NULL, &tv );
       if(selret<=0) 
        if (selret == -1 && errno!=EINTR) 
          throw AhuException("Select returned: "+stringerror());
@@ -562,7 +625,6 @@ int main(int argc, char **argv)
        }
        else { 
          if(P.d.qr) {
-
            pident.remote=fromaddr;
            pident.id=P.d.id;
            string packet;
@@ -614,6 +676,65 @@ int main(int argc, char **argv)
        }
       }
 
+      for(map<int,PacketID>::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) { 
+       if(FD_ISSET(i->first, &readfds)) { // can we receive
+         // cerr<<"Something happened on our socket when we wanted to read "<<i->first<<endl;
+         // cerr<<"inMSG.size(): "<<i->second.inMSG.size()<<endl;
+         char buffer[i->second.inNeeded];
+         int ret=read(i->first, buffer, min(i->second.inNeeded,200));
+         // cerr<<"Read returned "<<ret<<endl;
+         if(ret > 0) {
+           i->second.inMSG.append(buffer, buffer+ret);
+           i->second.inNeeded-=ret;
+           if(!i->second.inNeeded) {
+             // cerr<<"Got entire load of "<<i->second.inMSG.size()<<" bytes"<<endl;
+             PacketID pid=i->second;
+             string msg=i->second.inMSG;
+             
+             d_tcpclientreadsocks.erase((i++));
+             MT->sendEvent(pid, &msg);   // XXX DODGY
+           }
+           else {
+             // cerr<<"Still have "<<i->second.inNeeded<<" left to go"<<endl;
+             ++i;
+           }
+         }
+         else {
+           cerr<<"when reading ret="<<ret<<endl;
+           ++i;
+         }
+       }
+       else
+         ++i;
+
+      }
+
+      for(map<int,PacketID>::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) { 
+       if(FD_ISSET(i->first, &writefds)) { // can we send over TCP
+         // cerr<<"Socket "<<i->first<<" available for writing"<<endl;
+         int ret=write(i->first, i->second.outMSG.c_str(), i->second.outMSG.size() - i->second.outPos);
+         if(ret > 0) {
+           i->second.outPos+=ret;
+           if(i->second.outPos==i->second.outMSG.size()) {
+             // cerr<<"Sent out entire load of "<<i->second.outMSG.size()<<" bytes"<<endl;
+             PacketID pid=i->second;
+             d_tcpclientwritesocks.erase((i++));
+             MT->sendEvent(pid, 0);
+             // cerr<<"Sent event too"<<endl;
+           }
+           else
+             ++i;
+         }
+         else { 
+           ++i;
+           cerr<<"ret="<<ret<<" when writing"<<endl;
+         }
+       }
+       else
+         ++i;
+      }
+
+
       for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
        if(FD_ISSET(i->fd, &readfds)) {
          if(i->state==TCPConnection::BYTE0) {
index eb75e91ad020c237c27698b0d214ef806d707232..35546d84da9439f4a7510097f1753ffc5dd4d496 100755 (executable)
@@ -162,7 +162,7 @@ public:
     remote.sin_addr.s_addr=ep.address.byte;
     remote.sin_port=htons(ep.port);
     
-    if(::connect(d_socket,(struct sockaddr *)&remote,sizeof(remote))<0)
+    if(::connect(d_socket,(struct sockaddr *)&remote,sizeof(remote)) < 0 && errno != EINPROGRESS)
       throw NetworkError(strerror(errno));
   }
 
index 769beaedcd7ac20c12fc8d4919ebc81a8e66eaa3..d7384a1dd33b595dac766bac5cec2ea9e83dc814 100644 (file)
@@ -39,6 +39,7 @@ map<string,NegCacheEntry> SyncRes::s_negcache;
 unsigned int SyncRes::s_queries;
 unsigned int SyncRes::s_outgoingtimeouts;
 unsigned int SyncRes::s_outqueries;
+unsigned int SyncRes::s_tcpoutqueries;
 unsigned int SyncRes::s_throttledqueries;
 unsigned int SyncRes::s_nodelegated;
 bool SyncRes::s_log;
@@ -89,7 +90,6 @@ int SyncRes::doResolve(const string &qname, const QType &qtype, vector<DNSResour
     }
   }
 
-
   if(!(res=doResolveAt(nsset,subdomain,qname,qtype,ret,depth, beenthere)))
     return 0;
   
@@ -338,8 +338,6 @@ inline vector<string> SyncRes::shuffle(set<string> &nameservers, const string &p
     }
     L<<endl;
   }
-  
-      
 
   return rnameservers;
 }
@@ -377,6 +375,8 @@ int SyncRes::doResolveAt(set<string> nameservers, string auth, const string &qna
       }
       LOG<<prefix<<qname<<": Resolved '"+auth+"' NS "<<*tns<<" to "<<remoteIP<<", asking '"<<qname<<"|"<<qtype.getName()<<"'"<<endl;
 
+      bool doTCP=false;
+
       if(s_throttle.shouldThrottle(d_now, remoteIP+"|"+qname+"|"+qtype.getName())) {
        LOG<<prefix<<qname<<": query throttled "<<endl;
        s_throttledqueries++;
@@ -386,8 +386,14 @@ int SyncRes::doResolveAt(set<string> nameservers, string auth, const string &qna
       else {
        s_outqueries++;
        d_outqueries++;
-       int ret=d_lwr.asyncresolve(remoteIP,qname.c_str(),qtype.getCode());
-       if(ret != 1) { // <- we go out on the wire!
+      TryTCP:
+       if(doTCP) {
+         s_tcpoutqueries++;
+         d_tcpoutqueries++;
+       }
+
+       int ret=d_lwr.asyncresolve(remoteIP, qname.c_str(), qtype.getCode(), doTCP);    // <- we go out on the wire!
+       if(ret != 1) {
          if(ret==0) {
            LOG<<prefix<<qname<<": timeout resolving"<<endl;
            d_timeouts++;
@@ -482,7 +488,6 @@ int SyncRes::doResolveAt(set<string> nameservers, string auth, const string &qna
        // for ANY answers we *must* have an authoritive answer
        else if(i->d_place==DNSResourceRecord::ANSWER && toLower(i->qname)==toLower(qname) && 
                (((i->qtype==qtype) || (i->qtype.getCode()>1024 && i->qtype.getCode()-1024==qtype.getCode())) || ( qtype==QType(QType::ANY) && 
-
                                                                                                                   d_lwr.d_aabit)))  {
          if(i->qtype.getCode() < 1024) {
            LOG<<prefix<<qname<<": answer is in: resolved to '"<< i->content<<"|"<<i->qtype.getName()<<"'"<<endl;
@@ -531,6 +536,15 @@ int SyncRes::doResolveAt(set<string> nameservers, string auth, const string &qna
        return doResolve(newtarget, qtype, ret,0,beenthere2);
       }
       if(nsset.empty() && !d_lwr.d_rcode) {
+       if(!negindic && d_lwr.d_tcbit) {
+         if(!doTCP) {
+           doTCP=true;
+           LOG<<prefix<<qname<<": status=noerror, truncated bit set, no negative SOA, retrying via TCP"<<endl;
+           goto TryTCP;
+         }
+         LOG<<prefix<<qname<<": status=noerror, truncated bit set, over TCP?"<<endl;
+         continue;
+       }
        LOG<<prefix<<qname<<": status=noerror, other types may exist, but we are done "<<(negindic ? "(have negative SOA)" : "")<<endl;
        return 0;
       }
index 750951272b15fe7f8e137143e8d0943c7c3c6b95..156a105702cb3969a18194ed3fbcc397e1247c15 100644 (file)
@@ -116,7 +116,7 @@ private:
 class SyncRes
 {
 public:
-  SyncRes() : d_outqueries(0), d_throttledqueries(0), d_timeouts(0), d_cacheonly(false), d_nocache(false), d_now(time(0)) {}
+  SyncRes() : d_outqueries(0), d_tcpoutqueries(0), d_throttledqueries(0), d_timeouts(0), d_cacheonly(false), d_nocache(false), d_now(time(0)) {}
   int beginResolve(const string &qname, const QType &qtype, vector<DNSResourceRecord>&ret);
   void setId(int id)
   {
@@ -138,8 +138,10 @@ public:
   static unsigned int s_outgoingtimeouts;
   static unsigned int s_throttledqueries;
   static unsigned int s_outqueries;
+  static unsigned int s_tcpoutqueries;
   static unsigned int s_nodelegated;
   unsigned int d_outqueries;
+  unsigned int d_tcpoutqueries;
   unsigned int d_throttledqueries;
   unsigned int d_timeouts;
   static map<string,NegCacheEntry> s_negcache;    
@@ -185,4 +187,7 @@ private:
   };
 
 };
+class Socket;
+int asendtcp(const string& data, Socket* sock);
+int arecvtcp(string& data, int len, Socket* sock);
 #endif