From: Bert Hubert Date: Mon, 10 Jan 2011 13:48:17 +0000 (+0000) Subject: hypermodern bulk slave engine forward ported from 2.9.22.x. Does 5000 zones in 3... X-Git-Tag: auth-3.0~388 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=dbcb3066a1c1da733a432b78a71ef60084be7d63;p=pdns hypermodern bulk slave engine forward ported from 2.9.22.x. Does 5000 zones in 3 seconds or so. git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1859 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- diff --git a/pdns/common_startup.cc b/pdns/common_startup.cc index f8bffc4c4..1913edc39 100644 --- a/pdns/common_startup.cc +++ b/pdns/common_startup.cc @@ -49,6 +49,8 @@ void declareArguments() ::arg().set("query-local-address","Source IP address for sending queries")=""; ::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000"; ::arg().set("soa-serial-offset","Make sure that no SOA serial is less than this number")="0"; + + ::arg().set("retrieval-threads", "Number of AXFR-retrieval threads for slave operation")="2"; ::arg().setCmd("help","Provide a helpful message"); ::arg().setCmd("version","Output version and compilation date"); diff --git a/pdns/communicator.cc b/pdns/communicator.cc index fcdd2aa5c..e5f0dd096 100644 --- a/pdns/communicator.cc +++ b/pdns/communicator.cc @@ -32,8 +32,39 @@ #include "packetcache.hh" #include -#include "namespaces.hh" +// #include "namespaces.hh" +void CommunicatorClass::retrievalLoopThread(void) +{ + for(;;) { + d_suck_sem.wait(); + SuckRequest sr; + { + Lock l(&d_lock); + if(d_suckdomains.empty()) + continue; + + sr=d_suckdomains.front(); + d_suckdomains.pop_front(); + } + try { + suck(sr.domain,sr.master); + } + catch(AhuException& ae) { + cerr<<"Error: "< #include #include +#include +#include +#include +using namespace boost::multi_index; -#ifndef WIN32 +#ifndef WIN32 # include # include # include @@ -41,8 +45,22 @@ struct SuckRequest { string domain; string master; + bool operator<(const SuckRequest& b) const + { + return tie(domain, master) < tie(b.domain, b.master); + } }; +struct IDTag{}; + +typedef multi_index_container< + SuckRequest, + indexed_by< + sequenced<>, + ordered_unique, identity > + > +> UniQueue; + class NotificationQueue { public: @@ -126,28 +144,30 @@ public: { pthread_mutex_init(&d_lock,0); pthread_mutex_init(&d_holelock,0); -// sem_init(&d_suck_sem,0,0); -// sem_init(&d_any_sem,0,0); + d_tickinterval=60; d_masterschanged=d_slaveschanged=true; } time_t doNotifications(); - void go() - { - pthread_t tid; - pthread_create(&tid,0,&launchhelper,this); - } - + void go(); + + void drillHole(const string &domain, const string &ip); bool justNotified(const string &domain, const string &ip); void addSuckRequest(const string &domain, const string &master, bool priority=false); void notify(const string &domain, const string &ip); void mainloop(); + void retrievalLoopThread(); static void *launchhelper(void *p) { static_cast(p)->mainloop(); return 0; } + static void *retrieveLaunchhelper(void *p) + { + static_cast(p)->retrievalLoopThread(); + return 0; + } bool notifyDomain(const string &domain); private: void makeNotifySocket(); @@ -155,11 +175,14 @@ private: int d_nsock; map,time_t>d_holes; pthread_mutex_t d_holelock; + void launchRetrievalThreads(); void suck(const string &domain, const string &remote); void slaveRefresh(PacketHandler *P); void masterUpdateCheck(PacketHandler *P); pthread_mutex_t d_lock; - std::deque d_suckdomains; + + UniQueue d_suckdomains; + bool d_havepriosuckrequest; Semaphore d_suck_sem; Semaphore d_any_sem; diff --git a/pdns/resolver.hh b/pdns/resolver.hh index 58d4fdc8c..e5f21585d 100644 --- a/pdns/resolver.hh +++ b/pdns/resolver.hh @@ -1,6 +1,6 @@ /* PowerDNS Versatile Database Driven Nameserver - Copyright (C) 2002 - 2006 PowerDNS.COM BV + Copyright (C) 2002 - 2011 PowerDNS.COM BV This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2 diff --git a/pdns/slavecommunicator.cc b/pdns/slavecommunicator.cc index 7d9d0e765..e722762ff 100644 --- a/pdns/slavecommunicator.cc +++ b/pdns/slavecommunicator.cc @@ -46,16 +46,17 @@ void CommunicatorClass::addSuckRequest(const string &domain, const string &maste SuckRequest sr; sr.domain = domain; sr.master = master; - + pair res; if(priority) { - d_suckdomains.push_front(sr); - // d_havepriosuckrequest=true; + res=d_suckdomains.push_front(sr); + } + else { + res=d_suckdomains.push_back(sr); } - else - d_suckdomains.push_back(sr); + if(res.second) { d_suck_sem.post(); - d_any_sem.post(); + } } void CommunicatorClass::suck(const string &domain,const string &remote) @@ -200,7 +201,7 @@ struct SlaveSenderReceiver return 0; } - void deliverAnswer(DomainInfo& i, uint32_t serial, uint32_t usec) + void deliverAnswer(DomainInfo& i, uint32_t serial, unsigned int usec) { d_serials[i.id]=serial; } @@ -212,19 +213,44 @@ struct SlaveSenderReceiver void CommunicatorClass::slaveRefresh(PacketHandler *P) { UeberBackend *B=dynamic_cast(P->getBackend()); - vector sdomains; - B->getUnfreshSlaveInfos(&sdomains); + vector sdomains, rdomains; + B->getUnfreshSlaveInfos(&rdomains); + + { + Lock l(&d_lock); + typedef UniQueue::index::type domains_by_name_t; + domains_by_name_t& nameindex=boost::multi_index::get(d_suckdomains); + + + BOOST_FOREACH(DomainInfo& di, rdomains) { + SuckRequest sr; + sr.domain=di.zone; + if(di.masters.empty()) // slave domains w/o masters are ignored + continue; + // remove unfresh domains already queued for AXFR, no sense polling them again + sr.master=*di.masters.begin(); + if(nameindex.count(sr)) + continue; + sdomains.push_back(di); + } +// cerr<1 ? "s" : "")<<" need"<< (sdomains.size()>1 ? "" : "s")<< - " checking"<, SlaveSenderReceiver> ifl(sdomains, ssr); @@ -236,7 +262,7 @@ void CommunicatorClass::slaveRefresh(PacketHandler *P) ifl.run(); break; } - catch(exception& e) { + catch(std::exception& e) { L<