]> granicus.if.org Git - pdns/commitdiff
hypermodern bulk slave engine forward ported from 2.9.22.x. Does 5000 zones in 3...
authorBert Hubert <bert.hubert@netherlabs.nl>
Mon, 10 Jan 2011 13:48:17 +0000 (13:48 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Mon, 10 Jan 2011 13:48:17 +0000 (13:48 +0000)
git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1859 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/common_startup.cc
pdns/communicator.cc
pdns/communicator.hh
pdns/resolver.hh
pdns/slavecommunicator.cc

index f8bffc4c4196eb8aacab17737911845afd713d7a..1913edc394cb36ad6f26139984efc18b827d6272 100644 (file)
@@ -49,6 +49,8 @@ void declareArguments()
   ::arg().set("query-local-address","Source IP address for sending queries")="";
   ::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000";
   ::arg().set("soa-serial-offset","Make sure that no SOA serial is less than this number")="0";
+  
+  ::arg().set("retrieval-threads", "Number of AXFR-retrieval threads for slave operation")="2";
 
   ::arg().setCmd("help","Provide a helpful message");
   ::arg().setCmd("version","Output version and compilation date");
index fcdd2aa5cbf12d5dc460dca02c8ad5052161965e..e5f0dd0969218045c3d1e8d7363f67c67a8afafa 100644 (file)
 #include "packetcache.hh"
 #include <boost/lexical_cast.hpp>
 
-#include "namespaces.hh"
+// #include "namespaces.hh"
 
+void CommunicatorClass::retrievalLoopThread(void)
+{
+  for(;;) {
+    d_suck_sem.wait();
+    SuckRequest sr;
+    {
+      Lock l(&d_lock);
+      if(d_suckdomains.empty()) 
+       continue;
+       
+      sr=d_suckdomains.front();
+      d_suckdomains.pop_front();
+    }
+    try {
+      suck(sr.domain,sr.master);
+    }
+    catch(AhuException& ae) {
+      cerr<<"Error: "<<ae.reason<<endl;
+    }
+  }
+}
+
+
+void CommunicatorClass::go()
+{
+  pthread_t tid;
+  pthread_create(&tid,0,&launchhelper,this);
+  for(int n=0; n < ::arg().asNum("retrieval-threads"); ++n)
+    pthread_create(&tid, 0, &retrieveLaunchhelper, this);
+
+}
 
 void CommunicatorClass::mainloop(void)
 {
@@ -67,15 +98,7 @@ void CommunicatorClass::mainloop(void)
         if(rc)
           Utility::sleep(1);
         else { 
-          if(!d_suck_sem.tryWait()) {
-            SuckRequest sr;
-            {
-              Lock l(&d_lock);
-              sr=d_suckdomains.front();
-              d_suckdomains.pop_front();
-            }
-            suck(sr.domain,sr.master);
-          }
+          
         }
         // this gets executed at least once every second
         doNotifications();
index 36846b3192b6c45b0748deb48bc4a170dca538c3..9e8a29655bc8866d076628a2a4938e0f2f561cc8 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002-2007  PowerDNS.COM BV
+    Copyright (C) 2002-2010  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
 #include <queue>
 #include <list>
 #include <limits>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/identity.hpp>
+#include <boost/multi_index/sequenced_index.hpp>
+using namespace boost::multi_index;
 
-#ifndef WIN32
+#ifndef WIN32 
 # include <unistd.h>
 # include <fcntl.h>
 # include <netdb.h>
@@ -41,8 +45,22 @@ struct SuckRequest
 {
   string domain;
   string master;
+  bool operator<(const SuckRequest& b) const
+  {
+    return tie(domain, master) < tie(b.domain, b.master);
+  }
 };
 
+struct IDTag{};
+
+typedef multi_index_container<
+  SuckRequest,
+  indexed_by<
+    sequenced<>,
+    ordered_unique<tag<IDTag>, identity<SuckRequest> >
+  >
+> UniQueue;
+
 class NotificationQueue
 {
 public:
@@ -126,28 +144,30 @@ public:
   {
     pthread_mutex_init(&d_lock,0);
     pthread_mutex_init(&d_holelock,0);
-//    sem_init(&d_suck_sem,0,0);
-//    sem_init(&d_any_sem,0,0);
+
     d_tickinterval=60;
     d_masterschanged=d_slaveschanged=true;
   }
   time_t doNotifications();    
-  void go()
-  {
-    pthread_t tid;
-    pthread_create(&tid,0,&launchhelper,this);
-  }
-
+  void go();
+  
+  
   void drillHole(const string &domain, const string &ip);
   bool justNotified(const string &domain, const string &ip);
   void addSuckRequest(const string &domain, const string &master, bool priority=false);
   void notify(const string &domain, const string &ip);
   void mainloop();
+  void retrievalLoopThread();
   static void *launchhelper(void *p)
   {
     static_cast<CommunicatorClass *>(p)->mainloop();
     return 0;
   }
+  static void *retrieveLaunchhelper(void *p)
+  {
+    static_cast<CommunicatorClass *>(p)->retrievalLoopThread();
+    return 0;
+  }
   bool notifyDomain(const string &domain);
 private:
   void makeNotifySocket();
@@ -155,11 +175,14 @@ private:
   int d_nsock;
   map<pair<string,string>,time_t>d_holes;
   pthread_mutex_t d_holelock;
+  void launchRetrievalThreads();
   void suck(const string &domain, const string &remote);
   void slaveRefresh(PacketHandler *P);
   void masterUpdateCheck(PacketHandler *P);
   pthread_mutex_t d_lock;
-  std::deque<SuckRequest> d_suckdomains;
+  
+  UniQueue d_suckdomains;
+  
   bool d_havepriosuckrequest;
   Semaphore d_suck_sem;
   Semaphore d_any_sem;
index 58d4fdc8cf896e4fd8bd4110efa4a2d6b3e76673..e5f21585dc749f3898566b23904ec64dc5e4068b 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2002 - 2006  PowerDNS.COM BV
+    Copyright (C) 2002 - 2011  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
index 7d9d0e7658674d7f37cd22a7c9596397b2be7118..e722762ff0ac9850c0dd68df761bb594dcaf37a0 100644 (file)
@@ -46,16 +46,17 @@ void CommunicatorClass::addSuckRequest(const string &domain, const string &maste
   SuckRequest sr;
   sr.domain = domain;
   sr.master = master;
-
+  pair<UniQueue::iterator, bool>  res;
   if(priority) {
-    d_suckdomains.push_front(sr);
-    //  d_havepriosuckrequest=true;
+    res=d_suckdomains.push_front(sr);
+  }
+  else {
+    res=d_suckdomains.push_back(sr);
   }
-  else 
-    d_suckdomains.push_back(sr);
   
+  if(res.second) {
   d_suck_sem.post();
-  d_any_sem.post();
+  }
 }
 
 void CommunicatorClass::suck(const string &domain,const string &remote)
@@ -200,7 +201,7 @@ struct SlaveSenderReceiver
     return 0;
   }
   
-  void deliverAnswer(DomainInfo& i, uint32_t serial, uint32_t usec)
+  void deliverAnswer(DomainInfo& i, uint32_t serial, unsigned int usec)
   {
     d_serials[i.id]=serial;
   }
@@ -212,19 +213,44 @@ struct SlaveSenderReceiver
 void CommunicatorClass::slaveRefresh(PacketHandler *P)
 {
   UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
-  vector<DomainInfo> sdomains;
-  B->getUnfreshSlaveInfos(&sdomains);
+  vector<DomainInfo> sdomains, rdomains;
+  B->getUnfreshSlaveInfos(&rdomains);
+  
+  {
+    Lock l(&d_lock);
+    typedef UniQueue::index<IDTag>::type domains_by_name_t;
+    domains_by_name_t& nameindex=boost::multi_index::get<IDTag>(d_suckdomains);
+
+    
+    BOOST_FOREACH(DomainInfo& di, rdomains) {
+      SuckRequest sr;
+      sr.domain=di.zone;
+      if(di.masters.empty()) // slave domains w/o masters are ignored
+        continue;
+      // remove unfresh domains already queued for AXFR, no sense polling them again
+      sr.master=*di.masters.begin();
+      if(nameindex.count(sr))
+        continue;
+      sdomains.push_back(di);
+    }
+//    cerr<<rdomains.size() - sdomains.size()<<" prevented"<<endl;  
+  }
+  
   if(sdomains.empty())
   {
-    if(d_slaveschanged)
-      L<<Logger::Warning<<"All slave domains are fresh"<<endl;
-    d_slaveschanged=false;
+    if(d_slaveschanged) {
+      Lock l(&d_lock);
+      L<<Logger::Warning<<"No new unfresh slave domains, "<<d_suckdomains.size()<<" queued for AXFR already"<<endl;
+    }
+    d_slaveschanged = !rdomains.empty();
     return;
   }
-  else 
+  else {
+    Lock l(&d_lock);
     L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
       (sdomains.size()>1 ? "" : "s")<<
-      " checking"<<endl;
+      " checking, "<<d_suckdomains.size()<<" queued for AXFR"<<endl;
+  }
       
   SlaveSenderReceiver ssr;
   Inflighter<vector<DomainInfo>, SlaveSenderReceiver> ifl(sdomains, ssr);
@@ -236,7 +262,7 @@ void CommunicatorClass::slaveRefresh(PacketHandler *P)
       ifl.run();
       break;
     }
-    catch(exception& e) {
+    catch(std::exception& e) {
       L<<Logger::Error<<"While checking domain freshness: " << e.what()<<endl;
     }
     catch(AhuException &re) {