]> granicus.if.org Git - pdns/commitdiff
split up master & slave communicator, enable 'Inflighter' which in tests does 3200...
authorBert Hubert <bert.hubert@netherlabs.nl>
Sun, 6 Dec 2009 14:51:08 +0000 (14:51 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Sun, 6 Dec 2009 14:51:08 +0000 (14:51 +0000)
Clean up resolver class, plus add to it the tools needed for many many simultaneous serial verifications

git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1449 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/Makefile.am
pdns/communicator.cc
pdns/mastercommunicator.cc [new file with mode: 0644]
pdns/resolver.cc
pdns/resolver.hh
pdns/slavecommunicator.cc [new file with mode: 0644]

index 24a66e8c091d2ab3f3c423f87ec1c046b185e2d7..e4a8001346457e6f6bc2de0d9218bbe13f6c43aa 100644 (file)
@@ -19,7 +19,8 @@ bin_PROGRAMS = pdns_control
 endif
 
 
-EXTRA_PROGRAMS=pdns_recursor sdig speedtest dnspbench pdns_control dnsscope dnsgram dnsdemog dnswasher dnsreplay dnsscan dnslog nproxy notify
+EXTRA_PROGRAMS=pdns_recursor sdig speedtest dnspbench pdns_control dnsscope dnsgram \
+ dnsdemog dnswasher dnsreplay dnsscan dnslog nproxy notify
 
 pdns_server_SOURCES=dnspacket.cc nameserver.cc tcpreceiver.hh \
 qtype.cc logger.cc arguments.cc packethandler.cc tcpreceiver.cc \
@@ -29,7 +30,7 @@ nameserver.hh packetcache.hh packethandler.hh qtype.hh statbag.hh \
 ueberbackend.hh pdns.conf-dist ws.hh ws.cc webserver.cc webserver.hh \
 session.cc session.hh misc.cc misc.hh receiver.cc ueberbackend.cc \
 dynlistener.cc dynlistener.hh  dynhandler.cc dynhandler.hh  \
-resolver.hh resolver.cc slavecommunicator.cc mastercommunicator.cc communicator.hh dnsproxy.cc \
+resolver.hh resolver.cc slavecommunicator.cc mastercommunicator.cc communicator.cc communicator.hh dnsproxy.cc \
 dnsproxy.hh randombackend.cc unix_utility.cc common_startup.cc \
 utility.hh iputils.hh common_startup.hh unix_semaphore.cc \
 backends/bind/bindbackend2.cc  \
@@ -45,7 +46,7 @@ randomhelper.cc
 
 #
 pdns_server_LDFLAGS= @moduleobjects@ @modulelibs@ @DYNLINKFLAGS@ @LIBDL@ @THREADFLAGS@
-pdns_server_INCLUDES=
+pdns_server_INCLUDES=inflighter.cc
 
 sdig_SOURCES=sdig.cc sstuff.hh dnsparser.cc dnsparser.hh dnsrecords.cc dnswriter.cc dnswriter.hh \
        misc.cc misc.hh rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh unix_utility.cc \
@@ -64,6 +65,10 @@ dnswasher_SOURCES=dnswasher.cc misc.cc unix_utility.cc qtype.cc \
 
 dnswasher_LDFLAGS= @DYNLINKFLAGS@ @THREADFLAGS@
 
+#inflighter_SOURCES=inflighter.cc misc.cc unix_utility.cc qtype.cc logger.cc statbag.cc
+#inflighter_LDFLAGS= @DYNLINKFLAGS@ @THREADFLAGS@
+
+
 # unix_utility.cc
 
 dnsscan_SOURCES=dnsscan.cc misc.cc  qtype.cc anadns.hh \
index 8fb488d95a1832373d23f6b57c1f3efa4f51867f..89247dac230571facbe1523f619b23e919d6fc6f 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002-2008  PowerDNS.COM BV
+    Copyright (C) 2002-2009  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2 as 
 #include "packetcache.hh"
 #include <boost/lexical_cast.hpp>
 
-using namespace boost;
+#include "namespaces.hh"
 
-void CommunicatorClass::addSuckRequest(const string &domain, const string &master, bool priority)
-{
-  Lock l(&d_lock);
-  
-  SuckRequest sr;
-  sr.domain = domain;
-  sr.master = master;
-
-  if(priority) {
-    d_suckdomains.push_front(sr);
-    //  d_havepriosuckrequest=true;
-  }
-  else 
-    d_suckdomains.push_back(sr);
-  
-  d_suck_sem.post();
-  d_any_sem.post();
-}
-
-void CommunicatorClass::suck(const string &domain,const string &remote)
-{
-  L<<Logger::Error<<"Initiating transfer of '"<<domain<<"' from remote '"<<remote<<"'"<<endl;
-  uint32_t domain_id;
-  PacketHandler P;
-
-  DomainInfo di;
-  di.backend=0;
-  bool first=true;    
-  try {
-    Resolver resolver;
-    resolver.axfr(remote, domain.c_str());
-
-    UeberBackend *B=dynamic_cast<UeberBackend *>(P.getBackend());
-
-    if(!B->getDomainInfo(domain, di) || !di.backend) {
-      L<<Logger::Error<<"Can't determine backend for domain '"<<domain<<"'"<<endl;
-      return;
-    }
-    domain_id=di.id;
-
-    Resolver::res_t recs;
-
-    while(resolver.axfrChunk(recs)) {
-      if(first) {
-       L<<Logger::Error<<"AXFR started for '"<<domain<<"', transaction started"<<endl;
-       di.backend->startTransaction(domain, domain_id);
-       first=false;
-      }
-      for(Resolver::res_t::iterator i=recs.begin();i!=recs.end();++i) {
-       if(!endsOn(i->qname, domain)) { 
-         L<<Logger::Error<<"Remote "<<remote<<" tried to sneak in out-of-zone data '"<<i->qname<<"' during AXFR of zone '"<<domain<<"', ignoring"<<endl;
-         continue;
-       }
-       i->domain_id=domain_id;
-       if(i->qtype.getCode()>=1024)
-         throw DBException("Database can't store unknown record type "+lexical_cast<string>(i->qtype.getCode()-1024));
-
-       di.backend->feedRecord(*i);
-      }
-    }
-    di.backend->commitTransaction();
-    di.backend->setFresh(domain_id);
-    L<<Logger::Error<<"AXFR done for '"<<domain<<"', zone committed"<<endl;
-  }
-  catch(DBException &re) {
-    L<<Logger::Error<<"Unable to feed record during incoming AXFR of '"+domain+"': "<<re.reason<<endl;
-    if(di.backend && !first) {
-      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
-      di.backend->abortTransaction();
-    }
-  }
-  catch(ResolverException &re) {
-    L<<Logger::Error<<"Unable to AXFR zone '"+domain+"' from remote '"<<remote<<"': "<<re.reason<<endl;
-    if(di.backend && !first) {
-      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
-      di.backend->abortTransaction();
-    }
-  }
-}
-
-class FindNS
-{
-public:
-  vector<string>lookup(const string &name, DNSBackend *B)
-  {
-    vector<string>addresses;
-    struct hostent *h;
-    h=gethostbyname(name.c_str());
-
-    if(h) {
-      for(char **h_addr_list=h->h_addr_list;*h_addr_list;++h_addr_list) {
-       ostringstream os;
-       unsigned char *p=reinterpret_cast<unsigned char *>(*h_addr_list);
-       os<<(int)*p++<<".";
-       os<<(int)*p++<<".";
-       os<<(int)*p++<<".";
-       os<<(int)*p++;
-
-       addresses.push_back(os.str());
-      }
-    }
-
-    B->lookup(QType(QType::A),name);
-    DNSResourceRecord rr;
-    while(B->get(rr)) 
-      addresses.push_back(rr.content);   // SOL if you have a CNAME for an NS
-
-    return addresses;
-  }
-}d_fns;
-
-void CommunicatorClass::queueNotifyDomain(const string &domain, DNSBackend *B)
-{
-  set<string> ips;
-  
-  DNSResourceRecord rr;
-  set<string>nsset;
-
-  B->lookup(QType(QType::NS),domain);
-  while(B->get(rr)) 
-    nsset.insert(rr.content);
-  
-  for(set<string>::const_iterator j=nsset.begin();j!=nsset.end();++j) {
-    vector<string>nsips=d_fns.lookup(*j, B);
-    if(nsips.empty())
-      L<<Logger::Warning<<"Unable to queue notification of domain '"<<domain<<"': nameservers do not resolve!"<<endl;
-    for(vector<string>::const_iterator k=nsips.begin();k!=nsips.end();++k)
-      ips.insert(*k);
-  }
-  
-  // make calls to d_nq.add(domain, ip);
-  for(set<string>::const_iterator j=ips.begin();j!=ips.end();++j) {
-    L<<Logger::Warning<<"Queued notification of domain '"<<domain<<"' to "<<*j<<endl;
-    d_nq.add(domain,*j);
-  }
-  
-  set<string>alsoNotify;
-  B->alsoNotifies(domain, &alsoNotify);
-  
-  for(set<string>::const_iterator j=alsoNotify.begin();j!=alsoNotify.end();++j) {
-    L<<Logger::Warning<<"Queued also-notification of domain '"<<domain<<"' to "<<*j<<endl;
-    d_nq.add(domain,*j);
-  }
-}
-
-bool CommunicatorClass::notifyDomain(const string &domain)
-{
-  DomainInfo di;
-  PacketHandler P;
-  if(!P.getBackend()->getDomainInfo(domain, di)) {
-    L<<Logger::Error<<"No such domain '"<<domain<<"' in our database"<<endl;
-    return false;
-  }
-  queueNotifyDomain(domain, P.getBackend());
-  // call backend and tell them we sent out the notification - even though that is premature    
-  di.backend->setNotified(di.id, di.serial);
-
-  return true; 
-}
-
-
-void CommunicatorClass::masterUpdateCheck(PacketHandler *P)
-{
-  if(!::arg().mustDo("master"))
-    return; 
-
-  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
-  vector<DomainInfo> cmdomains;
-  B->getUpdatedMasters(&cmdomains);
-  
-  if(cmdomains.empty()) {
-    if(d_masterschanged)
-      L<<Logger::Warning<<"No master domains need notifications"<<endl;
-    d_masterschanged=false;
-  }
-  else {
-    d_masterschanged=true;
-    L<<Logger::Error<<cmdomains.size()<<" domain"<<(cmdomains.size()>1 ? "s" : "")<<" for which we are master need"<<
-      (cmdomains.size()>1 ? "" : "s")<<
-      " notifications"<<endl;
-  }
-
-  // figure out A records of everybody needing notification
-  // do this via the FindNS class, d_fns
-  
-  for(vector<DomainInfo>::const_iterator i=cmdomains.begin();i!=cmdomains.end();++i) {
-    extern PacketCache PC;
-    vector<string> topurge;
-    topurge.push_back(i->zone);
-    PC.purge(topurge); // fixes cvstrac ticket #30
-    queueNotifyDomain(i->zone,P->getBackend());
-    i->backend->setNotified(i->id,i->serial); 
-  }
-}
-
-void CommunicatorClass::slaveRefresh(PacketHandler *P)
-{
-  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
-  vector<DomainInfo> sdomains;
-  B->getUnfreshSlaveInfos(&sdomains);
-  
-  if(sdomains.empty())
-  {
-    if(d_slaveschanged)
-      L<<Logger::Warning<<"All slave domains are fresh"<<endl;
-    d_slaveschanged=false;
-    return;
-  }
-  else 
-    L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
-      (sdomains.size()>1 ? "" : "s")<<
-      " checking"<<endl;
-  map<string, int> skipMasters;
-  for(vector<DomainInfo>::iterator i=sdomains.begin();i!=sdomains.end();++i) {
-    Resolver resolver;   
-    resolver.makeUDPSocket();  
-    d_slaveschanged=true;
-    uint32_t ourserial=i->serial, theirserial=0;
-    
-    if(d_havepriosuckrequest) {
-      d_havepriosuckrequest=false;
-      break;
-    }
-
-    random_shuffle(i->masters.begin(), i->masters.end());
-    for(vector<string>::const_iterator iter = i->masters.begin(); iter != i->masters.end(); ++iter) {
-      try {
-       if(skipMasters[*iter] > 5)
-         throw AhuException("Skipping query to '"+*iter+"' because of previous timeouts in this cycle");
-       
-       resolver.getSoaSerial(*iter, i->zone, &theirserial);
-       skipMasters[*iter]=0;   
-       if(theirserial<i->serial) {
-         L<<Logger::Error<<"Domain "<<i->zone<<" more recent than master, our serial "<<ourserial<<" > their serial "<<theirserial<<endl;
-         i->backend->setFresh(i->id);
-       }
-       else if(theirserial==i->serial) {
-         L<<Logger::Warning<<"Domain "<<i->zone<<" is fresh"<<endl;
-         i->backend->setFresh(i->id);
-       }
-       else {
-         L<<Logger::Warning<<"Domain "<<i->zone<<" is stale, master serial "<<theirserial<<", our serial "<<i->serial<<endl;
-         addSuckRequest(i->zone, *iter);
-       }
-       break;
-      }
-      catch(ResolverException &re) {
-       if(re.reason.find("Timeout") != string::npos)
-         skipMasters[*iter]++;
-
-       L<<Logger::Error<<"Error trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl;
-       if(next(iter) != i->masters.end()) 
-         L<<Logger::Error<<"Trying next master '"<<*next(iter)<<"' for '"+i->zone+"'"<<endl;
-      }
-      catch(AhuException &re) {
-       L<<Logger::Error<<"Error trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl;
-       if(next(iter) != i->masters.end()) 
-         L<<Logger::Error<<"Trying next master '"<<*next(iter)<<"' for '"+i->zone+"'"<<endl;
-      }
-    }
-  }
-}  
-
-time_t CommunicatorClass::doNotifications()
-{
-  ComboAddress from;
-  Utility::socklen_t fromlen=sizeof(from);
-  char buffer[1500];
-  int size;
-  static Resolver d_nresolver;
-  // receive incoming notifications on the nonblocking socket and take them off the list
-
-  while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) {
-    DNSPacket p;
-
-    p.setRemote(&from);
-
-    if(p.parse(buffer,size)<0) {
-      L<<Logger::Warning<<"Unable to parse SOA notification answer from "<<p.getRemote()<<endl;
-      continue;
-    }
-
-    if(p.d.rcode)
-      L<<Logger::Warning<<"Received unsuccesful notification report for '"<<p.qdomain<<"' from "<<p.getRemote()<<", rcode: "<<p.d.rcode<<endl;      
-    
-    if(d_nq.removeIf(p.getRemote(), p.d.id, p.qdomain))
-      L<<Logger::Warning<<"Removed from notification list: '"<<p.qdomain<<"' to "<<p.getRemote()<< (p.d.rcode ? "" : " (was acknowledged)")<<endl;      
-    else
-      L<<Logger::Warning<<"Received spurious notify answer for '"<<p.qdomain<<"' from "<<p.getRemote()<<endl;      
-  }
-
-  // send out possible new notifications
-  string domain, ip;
-  uint16_t id;
-
-  bool purged;
-  while(d_nq.getOne(domain, ip, &id, purged)) {
-    if(!purged) {
-      try {
-       d_nresolver.notify(d_nsock, domain, ip, id);
-       drillHole(domain, ip);
-      }
-      catch(ResolverException &re) {
-       L<<Logger::Error<<"Error trying to resolve '"+ip+"' for notifying '"+domain+"' to server: "+re.reason<<endl;
-      }
-    }
-    else
-      L<<Logger::Error<<Logger::NTLog<<"Notification for "<<domain<<" to "<<ip<<" failed after retries"<<endl;
-  }
-
-  return d_nq.earliest();
-}
-
-void CommunicatorClass::drillHole(const string &domain, const string &ip)
-{
-  Lock l(&d_holelock);
-  d_holes[make_pair(domain,ip)]=time(0);
-}
-
-bool CommunicatorClass::justNotified(const string &domain, const string &ip)
-{
-  Lock l(&d_holelock);
-  if(d_holes.find(make_pair(domain,ip))==d_holes.end()) // no hole
-    return false;
-
-  if(d_holes[make_pair(domain,ip)]>time(0)-900)    // recent hole
-    return true;
-
-  // do we want to purge this? XXX FIXME 
-  return false;
-}
-
-void CommunicatorClass::makeNotifySocket()
-{
-  if((d_nsock=socket(AF_INET, SOCK_DGRAM,0))<0)
-    throw AhuException(string("notification socket: ")+strerror(errno));
-
-  struct sockaddr_in sin;
-  memset((char *)&sin,0, sizeof(sin));
-  
-  sin.sin_family = AF_INET;
-
-  // Bind to a specific IP (query-local-address) if specified
-  string querylocaladdress(::arg()["query-local-address"]);
-  if (querylocaladdress=="") {
-    sin.sin_addr.s_addr = INADDR_ANY;
-  }
-  else
-  {
-    struct hostent *h=0;
-    h=gethostbyname(querylocaladdress.c_str());
-    if(!h) {
-      Utility::closesocket(d_nsock);
-      d_nsock=-1;      
-      throw AhuException("Unable to resolve query local address");
-    }
-
-    sin.sin_addr.s_addr = *(int*)h->h_addr;
-  }
-  
-  int n=0;
-  for(;n<10;n++) {
-    sin.sin_port = htons(10000+(Utility::random()%50000));
-    
-    if(::bind(d_nsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
-      break;
-  }
-  if(n==10) {
-    Utility::closesocket(d_nsock);
-    d_nsock=-1;
-    throw AhuException(string("binding notify socket: ")+strerror(errno));
-  }
-  if( !Utility::setNonBlocking( d_nsock ))
-    throw AhuException(string("error getting or setting notify socket non-blocking: ")+strerror(errno));
-
-}
-
-void CommunicatorClass::notify(const string &domain, const string &ip)
-{
-  d_nq.add(domain, ip);
-
-  d_any_sem.post();
-}
 
 void CommunicatorClass::mainloop(void)
 {
@@ -437,9 +54,9 @@ void CommunicatorClass::mainloop(void)
     for(;;) {
       slaveRefresh(&P);
       masterUpdateCheck(&P);
-
-      tick=min(doNotifications(),
-              d_tickinterval);
+      tick=doNotifications();
+      
+      tick = min (tick, d_tickinterval); 
 
       //      L<<Logger::Error<<"tick = "<<tick<<", d_tickinterval = "<<d_tickinterval<<endl;
       next=time(0)+tick;
diff --git a/pdns/mastercommunicator.cc b/pdns/mastercommunicator.cc
new file mode 100644 (file)
index 0000000..b99c898
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+    PowerDNS Versatile Database Driven Nameserver
+    Copyright (C) 2002-2009  PowerDNS.COM BV
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License version 2 as 
+    published by the Free Software Foundation; 
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+#include "packetcache.hh"
+#include "utility.hh"
+#include <errno.h>
+#include "communicator.hh"
+#include <set>
+#include <boost/utility.hpp>
+#include "dnsbackend.hh"
+#include "ueberbackend.hh"
+#include "packethandler.hh"
+#include "resolver.hh"
+#include "logger.hh"
+#include "dns.hh"
+#include "arguments.hh"
+#include "session.hh"
+#include "packetcache.hh"
+#include <boost/lexical_cast.hpp>
+
+#include "namespaces.hh"
+
+// class that one day might be more than a function to help you get IP addresses for a nameserver
+class FindNS
+{
+public:
+  vector<string>lookup(const string &name, DNSBackend *B)
+  {
+    vector<string>addresses;
+    struct hostent *h;
+    h=gethostbyname(name.c_str());
+
+    if(h) {
+      for(char **h_addr_list=h->h_addr_list;*h_addr_list;++h_addr_list) {
+       ostringstream os;
+       unsigned char *p=reinterpret_cast<unsigned char *>(*h_addr_list);
+       os<<(int)*p++<<".";
+       os<<(int)*p++<<".";
+       os<<(int)*p++<<".";
+       os<<(int)*p++;
+
+       addresses.push_back(os.str());
+      }
+    }
+
+    B->lookup(QType(QType::A),name);
+    DNSResourceRecord rr;
+    while(B->get(rr)) 
+      addresses.push_back(rr.content);   // SOL if you have a CNAME for an NS
+
+    return addresses;
+  }
+}d_fns;
+
+void CommunicatorClass::queueNotifyDomain(const string &domain, DNSBackend *B)
+{
+  set<string> ips;
+  
+  DNSResourceRecord rr;
+  set<string>nsset;
+
+  B->lookup(QType(QType::NS),domain);
+  while(B->get(rr)) 
+    nsset.insert(rr.content);
+  
+  for(set<string>::const_iterator j=nsset.begin();j!=nsset.end();++j) {
+    vector<string>nsips=d_fns.lookup(*j, B);
+    if(nsips.empty())
+      L<<Logger::Warning<<"Unable to queue notification of domain '"<<domain<<"': nameservers do not resolve!"<<endl;
+    for(vector<string>::const_iterator k=nsips.begin();k!=nsips.end();++k)
+      ips.insert(*k);
+  }
+  
+  // make calls to d_nq.add(domain, ip);
+  for(set<string>::const_iterator j=ips.begin();j!=ips.end();++j) {
+    L<<Logger::Warning<<"Queued notification of domain '"<<domain<<"' to "<<*j<<endl;
+    d_nq.add(domain,*j);
+  }
+  
+  set<string>alsoNotify;
+  B->alsoNotifies(domain, &alsoNotify);
+  
+  for(set<string>::const_iterator j=alsoNotify.begin();j!=alsoNotify.end();++j) {
+    L<<Logger::Warning<<"Queued also-notification of domain '"<<domain<<"' to "<<*j<<endl;
+    d_nq.add(domain,*j);
+  }
+}
+
+bool CommunicatorClass::notifyDomain(const string &domain)
+{
+  DomainInfo di;
+  PacketHandler P;
+  if(!P.getBackend()->getDomainInfo(domain, di)) {
+    L<<Logger::Error<<"No such domain '"<<domain<<"' in our database"<<endl;
+    return false;
+  }
+  queueNotifyDomain(domain, P.getBackend());
+  // call backend and tell them we sent out the notification - even though that is premature    
+  di.backend->setNotified(di.id, di.serial);
+
+  return true; 
+}
+
+
+void CommunicatorClass::masterUpdateCheck(PacketHandler *P)
+{
+  if(!::arg().mustDo("master"))
+    return; 
+
+  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
+  vector<DomainInfo> cmdomains;
+  B->getUpdatedMasters(&cmdomains);
+  
+  if(cmdomains.empty()) {
+    if(d_masterschanged)
+      L<<Logger::Warning<<"No master domains need notifications"<<endl;
+    d_masterschanged=false;
+  }
+  else {
+    d_masterschanged=true;
+    L<<Logger::Error<<cmdomains.size()<<" domain"<<(cmdomains.size()>1 ? "s" : "")<<" for which we are master need"<<
+      (cmdomains.size()>1 ? "" : "s")<<
+      " notifications"<<endl;
+  }
+
+  // figure out A records of everybody needing notification
+  // do this via the FindNS class, d_fns
+  
+  for(vector<DomainInfo>::const_iterator i=cmdomains.begin();i!=cmdomains.end();++i) {
+    extern PacketCache PC;
+    vector<string> topurge;
+    topurge.push_back(i->zone);
+    PC.purge(topurge); // fixes cvstrac ticket #30
+    queueNotifyDomain(i->zone,P->getBackend());
+    i->backend->setNotified(i->id,i->serial); 
+  }
+}
+
+time_t CommunicatorClass::doNotifications()
+{
+  ComboAddress from;
+  Utility::socklen_t fromlen=sizeof(from);
+  char buffer[1500];
+  int size;
+  static Resolver d_nresolver;
+  // receive incoming notifications on the nonblocking socket and take them off the list
+
+  while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) {
+    DNSPacket p;
+
+    p.setRemote(&from);
+
+    if(p.parse(buffer,size)<0) {
+      L<<Logger::Warning<<"Unable to parse SOA notification answer from "<<p.getRemote()<<endl;
+      continue;
+    }
+
+    if(p.d.rcode)
+      L<<Logger::Warning<<"Received unsuccesful notification report for '"<<p.qdomain<<"' from "<<p.getRemote()<<", rcode: "<<p.d.rcode<<endl;      
+    
+    if(d_nq.removeIf(p.getRemote(), p.d.id, p.qdomain))
+      L<<Logger::Warning<<"Removed from notification list: '"<<p.qdomain<<"' to "<<p.getRemote()<< (p.d.rcode ? "" : " (was acknowledged)")<<endl;      
+    else
+      L<<Logger::Warning<<"Received spurious notify answer for '"<<p.qdomain<<"' from "<<p.getRemote()<<endl;      
+  }
+
+  // send out possible new notifications
+  string domain, ip;
+  uint16_t id;
+
+  bool purged;
+  while(d_nq.getOne(domain, ip, &id, purged)) {
+    if(!purged) {
+      try {
+       d_nresolver.notify(d_nsock, domain, ip, id);
+       drillHole(domain, ip);
+      }
+      catch(ResolverException &re) {
+       L<<Logger::Error<<"Error trying to resolve '"+ip+"' for notifying '"+domain+"' to server: "+re.reason<<endl;
+      }
+    }
+    else
+      L<<Logger::Error<<Logger::NTLog<<"Notification for "<<domain<<" to "<<ip<<" failed after retries"<<endl;
+  }
+
+  return d_nq.earliest();
+}
+
+void CommunicatorClass::drillHole(const string &domain, const string &ip)
+{
+  Lock l(&d_holelock);
+  d_holes[make_pair(domain,ip)]=time(0);
+}
+
+bool CommunicatorClass::justNotified(const string &domain, const string &ip)
+{
+  Lock l(&d_holelock);
+  if(d_holes.find(make_pair(domain,ip))==d_holes.end()) // no hole
+    return false;
+
+  if(d_holes[make_pair(domain,ip)]>time(0)-900)    // recent hole
+    return true;
+
+  // do we want to purge this? XXX FIXME 
+  return false;
+}
+
+void CommunicatorClass::makeNotifySocket()
+{
+  if((d_nsock=socket(AF_INET, SOCK_DGRAM,0))<0)
+    throw AhuException(string("notification socket: ")+strerror(errno));
+
+  struct sockaddr_in sin;
+  memset((char *)&sin,0, sizeof(sin));
+  
+  sin.sin_family = AF_INET;
+
+  // Bind to a specific IP (query-local-address) if specified
+  string querylocaladdress(::arg()["query-local-address"]);
+  if (querylocaladdress=="") {
+    sin.sin_addr.s_addr = INADDR_ANY;
+  }
+  else
+  {
+    struct hostent *h=0;
+    h=gethostbyname(querylocaladdress.c_str());
+    if(!h) {
+      Utility::closesocket(d_nsock);
+      d_nsock=-1;      
+      throw AhuException("Unable to resolve query local address");
+    }
+
+    sin.sin_addr.s_addr = *(int*)h->h_addr;
+  }
+  
+  int n=0;
+  for(;n<10;n++) {
+    sin.sin_port = htons(10000+(Utility::random()%50000));
+    
+    if(::bind(d_nsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
+      break;
+  }
+  if(n==10) {
+    Utility::closesocket(d_nsock);
+    d_nsock=-1;
+    throw AhuException(string("binding notify socket: ")+strerror(errno));
+  }
+  if( !Utility::setNonBlocking( d_nsock ))
+    throw AhuException(string("error getting or setting notify socket non-blocking: ")+strerror(errno));
+
+}
+
+void CommunicatorClass::notify(const string &domain, const string &ip)
+{
+  d_nq.add(domain, ip);
+
+  d_any_sem.post();
+}
+
index 098a04c0ed7e5897ebc3cbbdb772b9cfc85a615c..e6b6d45b9bd9a31a95b9a6ba42298495f3f23fb5 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002 - 2008 PowerDNS.COM BV
+    Copyright (C) 2002 - 2009 PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2 as 
@@ -25,6 +25,7 @@
 #include "misc.hh"
 #include <algorithm>
 #include <sstream>
+#include "dnsrecords.hh"
 #include <cstring>
 #include <string>
 #include <vector>
@@ -113,30 +114,6 @@ void Resolver::timeoutReadn(char *buffer, int bytes)
   }
 }
 
-char* Resolver::sendReceive(const string &ip, uint16_t remotePort, const char *packet, int length, unsigned int *replen)
-{
-  makeTCPSocket(ip, remotePort);
-
-  if(sendData(packet,length,d_sock)<0) 
-    throw ResolverException("Unable to send packet to remote nameserver "+ip+": "+stringerror());
-
-  int plen=getLength();
-  if(plen<0)
-    throw ResolverException("EOF trying to get length of answer from remote TCP server");
-
-  char *answer=new char[plen];
-  try {
-    timeoutReadn(answer,plen);
-    *replen=plen;
-    return answer;
-  }
-  catch(...) {
-    delete answer;
-    throw; // whop!
-  }
-  return 0;
-}
-
 int Resolver::notify(int sock, const string &domain, const string &ip, uint16_t id)
 {
   vector<uint8_t> packet;
@@ -151,7 +128,7 @@ int Resolver::notify(int sock, const string &domain, const string &ip, uint16_t
   return true;
 }
 
-void Resolver::sendResolve(const string &ip, const char *domain, int type)
+uint16_t Resolver::sendResolve(const string &ip, const char *domain, int type)
 {
   vector<uint8_t> packet;
   DNSPacketWriter pw(packet, domain, type);
@@ -175,20 +152,49 @@ void Resolver::sendResolve(const string &ip, const char *domain, int type)
   if(sendto(d_sock, &packet[0], packet.size(), 0, (struct sockaddr*)(&remote), remote.getSocklen()) < 0) {
     throw ResolverException("Unable to ask query of "+st.host+":"+itoa(st.port)+": "+stringerror());
   }
+  return d_randomid;
 }
 
-int Resolver::receiveResolve(struct sockaddr* fromaddr, Utility::socklen_t addrlen)
+bool Resolver::tryGetSOASerial(string* domain, uint32_t *theirSerial, uint16_t* id)
 {
-  fd_set rd;
-  FD_ZERO(&rd);
-  FD_SET(d_sock, &rd);
+  Utility::setNonBlocking( d_sock );
+  
+  if(waitForData(d_sock, 0, 500000) == 0)
+    return false;
+  
+  int err;
+  ComboAddress fromaddr;
+  socklen_t addrlen=fromaddr.getSocklen();
+  err = recvfrom(d_sock, reinterpret_cast< char * >( d_buf ), 512, 0,(struct sockaddr*)(&fromaddr), &addrlen);
+  if(err < 0) {
+    if(errno == EAGAIN)
+      return false;
+    
+    throw ResolverException("recvfrom error waiting for answer: "+stringerror());
+  }
+  
+  MOADNSParser mdp((char*)d_buf, err);
+  *id=mdp.d_header.id;
+  *domain = stripDot(mdp.d_qname);
+  
+  if(mdp.d_answers.empty())
+    throw ResolverException("Query to '" + fromaddr.toString() + "' for SOA of '" + *domain + "' produced no results");
+  
+  if(mdp.d_qtype != QType::SOA || mdp.d_answers.begin()->first.d_type != QType::SOA) 
+    throw ResolverException("Query to '" + fromaddr.toString() + "' for SOA of '" + *domain + "' returned wrong record type");
 
-  struct timeval timeout;
-  timeout.tv_sec=0;
-  timeout.tv_usec=750000;
+  shared_ptr<SOARecordContent> rrc=boost::dynamic_pointer_cast<SOARecordContent>(mdp.d_answers.begin()->first.d_content);
 
-  int res=select(d_sock+1,&rd,0,0,&timeout);
+  *theirSerial=rrc->d_st.serial;
+  
+  
+  return true;
+}
 
+int Resolver::receiveResolve(struct sockaddr* fromaddr, Utility::socklen_t addrlen)
+{
+  int res=waitForData(d_sock, 0, 7500000); 
+  
   if(!res) {
     throw ResolverException("Timeout waiting for answer");
   }
@@ -256,36 +262,27 @@ void Resolver::makeTCPSocket(const string &ip, uint16_t port)
   if(!err)
     goto done;
 
-  fd_set rset,wset;
-  struct timeval tval;
-
-  FD_ZERO(&rset);
-  FD_SET(d_sock, &rset);
-  wset=rset;
-  tval.tv_sec=10;
-  tval.tv_usec=0;
-
-  if(!select(d_sock+1,&rset,&wset,0,tval.tv_sec ? &tval : 0)) {
+  err=waitForRWData(d_sock, false, 10, 0); // wait for writeability
+  
+  if(!err) {
     Utility::closesocket(d_sock); // timeout
     d_sock=-1;
     errno=ETIMEDOUT;
     
     throw ResolverException("Timeout connecting to server");
   }
-  
-  if(FD_ISSET(d_sock, &rset) || FD_ISSET(d_sock, &wset))
-    {
+  else if(err < 0) {
+    throw ResolverException("Error connecting: "+string(strerror(err)));
+  }
+  else {
     Utility::socklen_t len=sizeof(err);
-      if(getsockopt(d_sock, SOL_SOCKET,SO_ERROR,(char *)&err,&len)<0)
-       throw ResolverException("Error connecting: "+stringerror()); // Solaris
-
-      if(err)
-       throw ResolverException("Error connecting: "+string(strerror(err)));
-
-    }
-  else
-    throw ResolverException("nonblocking connect failed");
+    if(getsockopt(d_sock, SOL_SOCKET,SO_ERROR,(char *)&err,&len)<0)
+      throw ResolverException("Error connecting: "+stringerror()); // Solaris
 
+    if(err)
+      throw ResolverException("Error connecting: "+string(strerror(err)));
+  }
+  
  done:
   Utility::setBlocking( d_sock );
   // d_sock now connected
@@ -315,15 +312,8 @@ int Resolver::axfr(const string &ip, const char *domain)
   if(ret<0)
     throw ResolverException("Error sending question to "+ip+": "+stringerror());
 
-  fd_set rd;
-  FD_ZERO(&rd);
-  FD_SET(d_sock, &rd);
-
-  struct timeval timeout;
-  timeout.tv_sec=10;
-  timeout.tv_usec=0;
-
-  int res=select(d_sock+1,&rd,0,0,&timeout);
+  int res = waitForData(d_sock, 10, 0);
+  
   if(!res)
     throw ResolverException("Timeout waiting for answer from "+ip+" during AXFR");
   if(res<0)
index 8e1c71714abc7582f33b2fa47e7e16f1af01ad55..58d4fdc8cf896e4fd8bd4110efa4a2d6b3e76673 100644 (file)
@@ -57,10 +57,11 @@ public:
   void makeTCPSocket(const string &ip, uint16_t port=53);
   int notify(int sock, const string &domain, const string &ip, uint16_t id);
   int resolve(const string &ip, const char *domain, int type);
-  void sendResolve(const string &ip, const char *domain, int type);
+  uint16_t sendResolve(const string &ip, const char *domain, int type);
+  bool tryGetSOASerial(string* theirDomain, uint32_t* theirSerial, uint16_t* id);
 
   int receiveResolve(struct sockaddr* fromaddr, Utility::socklen_t addrlen);
-  char* sendReceive(const string &ip, uint16_t remotePort, const char *packet, int length, unsigned int *replylen);
+  
   void getSoaSerial(const string &, const string &, uint32_t *);
   int axfrChunk(Resolver::res_t &res);
   vector<DNSResourceRecord> result();
diff --git a/pdns/slavecommunicator.cc b/pdns/slavecommunicator.cc
new file mode 100644 (file)
index 0000000..541efdc
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+    PowerDNS Versatile Database Driven Nameserver
+    Copyright (C) 2002-2009  PowerDNS.COM BV
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License version 2 as 
+    published by the Free Software Foundation; 
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+#include "packetcache.hh"
+#include "utility.hh"
+#include <errno.h>
+#include "communicator.hh"
+#include <set>
+#include <boost/utility.hpp>
+#include "dnsbackend.hh"
+#include "ueberbackend.hh"
+#include "packethandler.hh"
+#include "resolver.hh"
+#include "logger.hh"
+#include "dns.hh"
+#include "arguments.hh"
+#include "session.hh"
+#include "packetcache.hh"
+#include <boost/foreach.hpp>
+#include <boost/lexical_cast.hpp>
+#include "inflighter.cc"
+
+#include "namespaces.hh"
+
+void CommunicatorClass::addSuckRequest(const string &domain, const string &master, bool priority)
+{
+  Lock l(&d_lock);
+  
+  SuckRequest sr;
+  sr.domain = domain;
+  sr.master = master;
+
+  if(priority) {
+    d_suckdomains.push_front(sr);
+    //  d_havepriosuckrequest=true;
+  }
+  else 
+    d_suckdomains.push_back(sr);
+  
+  d_suck_sem.post();
+  d_any_sem.post();
+}
+
+void CommunicatorClass::suck(const string &domain,const string &remote)
+{
+  L<<Logger::Error<<"Initiating transfer of '"<<domain<<"' from remote '"<<remote<<"'"<<endl;
+  uint32_t domain_id;
+  PacketHandler P;
+
+  DomainInfo di;
+  di.backend=0;
+  bool first=true;    
+  try {
+    Resolver resolver;
+    resolver.axfr(remote, domain.c_str());
+
+    UeberBackend *B=dynamic_cast<UeberBackend *>(P.getBackend());
+
+    if(!B->getDomainInfo(domain, di) || !di.backend) {
+      L<<Logger::Error<<"Can't determine backend for domain '"<<domain<<"'"<<endl;
+      return;
+    }
+    domain_id=di.id;
+
+    Resolver::res_t recs;
+
+    while(resolver.axfrChunk(recs)) {
+      if(first) {
+       L<<Logger::Error<<"AXFR started for '"<<domain<<"', transaction started"<<endl;
+       di.backend->startTransaction(domain, domain_id);
+       first=false;
+      }
+      for(Resolver::res_t::iterator i=recs.begin();i!=recs.end();++i) {
+       if(!endsOn(i->qname, domain)) { 
+         L<<Logger::Error<<"Remote "<<remote<<" tried to sneak in out-of-zone data '"<<i->qname<<"' during AXFR of zone '"<<domain<<"', ignoring"<<endl;
+         continue;
+       }
+       i->domain_id=domain_id;
+       if(i->qtype.getCode()>=1024)
+         throw DBException("Database can't store unknown record type "+lexical_cast<string>(i->qtype.getCode()-1024));
+
+       di.backend->feedRecord(*i);
+      }
+    }
+    di.backend->commitTransaction();
+    di.backend->setFresh(domain_id);
+    L<<Logger::Error<<"AXFR done for '"<<domain<<"', zone committed"<<endl;
+  }
+  catch(DBException &re) {
+    L<<Logger::Error<<"Unable to feed record during incoming AXFR of '"+domain+"': "<<re.reason<<endl;
+    if(di.backend && !first) {
+      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
+      di.backend->abortTransaction();
+    }
+  }
+  catch(ResolverException &re) {
+    L<<Logger::Error<<"Unable to AXFR zone '"+domain+"' from remote '"<<remote<<"': "<<re.reason<<endl;
+    if(di.backend && !first) {
+      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
+      di.backend->abortTransaction();
+    }
+  }
+}
+struct QueryInfo
+  {
+    struct timeval query_ttd;
+    uint16_t id;
+  };
+
+struct SlaveSenderReceiver
+{
+  typedef pair<string, uint16_t> Identifier;
+  typedef uint32_t Answer;
+  
+  map<uint32_t, uint32_t> d_serials;
+  
+  SlaveSenderReceiver()
+  {
+    d_resolver.makeUDPSocket();
+  }
+  
+  Identifier send(DomainInfo& di)
+  {
+    random_shuffle(di.masters.begin(), di.masters.end());
+    return make_pair(di.zone, d_resolver.sendResolve(*di.masters.begin(), di.zone.c_str(), QType::SOA));
+  }
+  
+  bool receive(Identifier& id, Answer& a)
+  {
+    if(d_resolver.tryGetSOASerial(&id.first, &a, &id.second)) {
+      return 1;
+    }
+    return 0;
+  }
+  
+  void deliverAnswer(DomainInfo& i, uint32_t serial)
+  {
+    d_serials[i.id]=serial;
+    //cerr<<"Got a serial of "<<serial<<" for "<<i.zone<<endl;
+  }
+  
+  Resolver d_resolver;
+
+};
+
+void CommunicatorClass::slaveRefresh(PacketHandler *P)
+{
+  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
+  vector<DomainInfo> sdomains;
+  B->getUnfreshSlaveInfos(&sdomains);
+  if(sdomains.empty())
+  {
+    if(d_slaveschanged)
+      L<<Logger::Warning<<"All slave domains are fresh"<<endl;
+    d_slaveschanged=false;
+    return;
+  }
+  else 
+    L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
+      (sdomains.size()>1 ? "" : "s")<<
+      " checking"<<endl;
+      
+  SlaveSenderReceiver ssr;
+  Inflighter<vector<DomainInfo>, SlaveSenderReceiver> ifl(sdomains, ssr);
+  
+  ifl.d_maxInFlight = 200;
+
+  for(;;) {
+    try {
+      ifl.run();
+      break;
+    }
+    catch(exception& e) {
+      L<<Logger::Error<<"While checking domain freshness: " << e.what()<<endl;
+    }
+    catch(AhuException &re) {  
+      L<<Logger::Error<<"While checking domain freshness: " << re.reason<<endl;
+    }
+  }
+  L<<Logger::Warning<<"Received serial number updates for "<<ssr.d_serials.size()<<" zones"<<endl;
+  int suckRequests=0;
+  BOOST_FOREACH(DomainInfo& di, sdomains) {
+    if(!ssr.d_serials.count(di.id)) 
+      continue;
+    uint32_t theirserial = ssr.d_serials[di.id], ourserial = di.serial;
+    
+    if(theirserial < ourserial) {
+      L<<Logger::Error<<"Domain "<<di.zone<<" more recent than master, our serial " << ourserial << " > their serial "<< theirserial << endl;
+      di.backend->setFresh(di.id);
+    }
+    else if(theirserial == ourserial) {
+      L<<Logger::Warning<<"Domain "<< di.zone<<" is fresh"<<endl;
+      di.backend->setFresh(di.id);
+    }
+    else {
+      L<<Logger::Warning<<"Domain "<< di.zone<<" is stale, master serial "<<theirserial<<", our serial "<< ourserial <<endl;
+      addSuckRequest(di.zone, *di.masters.begin());
+      suckRequests++;
+    }
+  }
+  L<<Logger::Warning<<"Attempted "<<suckRequests<<" zone transfers"<<endl;
+}  
+