::arg().set("query-local-address","Source IP address for sending queries")="";
::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000";
::arg().set("soa-serial-offset","Make sure that no SOA serial is less than this number")="0";
+
+ ::arg().set("retrieval-threads", "Number of AXFR-retrieval threads for slave operation")="2";
::arg().setCmd("help","Provide a helpful message");
::arg().setCmd("version","Output version and compilation date");
#include "packetcache.hh"
#include <boost/lexical_cast.hpp>
-#include "namespaces.hh"
+// #include "namespaces.hh"
+void CommunicatorClass::retrievalLoopThread(void)
+{
+ for(;;) {
+ d_suck_sem.wait();
+ SuckRequest sr;
+ {
+ Lock l(&d_lock);
+ if(d_suckdomains.empty())
+ continue;
+
+ sr=d_suckdomains.front();
+ d_suckdomains.pop_front();
+ }
+ try {
+ suck(sr.domain,sr.master);
+ }
+ catch(AhuException& ae) {
+ cerr<<"Error: "<<ae.reason<<endl;
+ }
+ }
+}
+
+
+void CommunicatorClass::go()
+{
+ pthread_t tid;
+ pthread_create(&tid,0,&launchhelper,this);
+ for(int n=0; n < ::arg().asNum("retrieval-threads"); ++n)
+ pthread_create(&tid, 0, &retrieveLaunchhelper, this);
+
+}
void CommunicatorClass::mainloop(void)
{
if(rc)
Utility::sleep(1);
else {
- if(!d_suck_sem.tryWait()) {
- SuckRequest sr;
- {
- Lock l(&d_lock);
- sr=d_suckdomains.front();
- d_suckdomains.pop_front();
- }
- suck(sr.domain,sr.master);
- }
+
}
// this gets executed at least once every second
doNotifications();
/*
PowerDNS Versatile Database Driven Nameserver
- Copyright (C) 2002-2007 PowerDNS.COM BV
+ Copyright (C) 2002-2010 PowerDNS.COM BV
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2
#include <queue>
#include <list>
#include <limits>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/identity.hpp>
+#include <boost/multi_index/sequenced_index.hpp>
+using namespace boost::multi_index;
-#ifndef WIN32
+#ifndef WIN32
# include <unistd.h>
# include <fcntl.h>
# include <netdb.h>
{
string domain;
string master;
+ bool operator<(const SuckRequest& b) const
+ {
+ return tie(domain, master) < tie(b.domain, b.master);
+ }
};
+struct IDTag{};
+
+typedef multi_index_container<
+ SuckRequest,
+ indexed_by<
+ sequenced<>,
+ ordered_unique<tag<IDTag>, identity<SuckRequest> >
+ >
+> UniQueue;
+
class NotificationQueue
{
public:
{
pthread_mutex_init(&d_lock,0);
pthread_mutex_init(&d_holelock,0);
-// sem_init(&d_suck_sem,0,0);
-// sem_init(&d_any_sem,0,0);
+
d_tickinterval=60;
d_masterschanged=d_slaveschanged=true;
}
time_t doNotifications();
- void go()
- {
- pthread_t tid;
- pthread_create(&tid,0,&launchhelper,this);
- }
-
+ void go();
+
+
void drillHole(const string &domain, const string &ip);
bool justNotified(const string &domain, const string &ip);
void addSuckRequest(const string &domain, const string &master, bool priority=false);
void notify(const string &domain, const string &ip);
void mainloop();
+ void retrievalLoopThread();
static void *launchhelper(void *p)
{
static_cast<CommunicatorClass *>(p)->mainloop();
return 0;
}
+ static void *retrieveLaunchhelper(void *p)
+ {
+ static_cast<CommunicatorClass *>(p)->retrievalLoopThread();
+ return 0;
+ }
bool notifyDomain(const string &domain);
private:
void makeNotifySocket();
int d_nsock;
map<pair<string,string>,time_t>d_holes;
pthread_mutex_t d_holelock;
+ void launchRetrievalThreads();
void suck(const string &domain, const string &remote);
void slaveRefresh(PacketHandler *P);
void masterUpdateCheck(PacketHandler *P);
pthread_mutex_t d_lock;
- std::deque<SuckRequest> d_suckdomains;
+
+ UniQueue d_suckdomains;
+
bool d_havepriosuckrequest;
Semaphore d_suck_sem;
Semaphore d_any_sem;
/*
PowerDNS Versatile Database Driven Nameserver
- Copyright (C) 2002 - 2006 PowerDNS.COM BV
+ Copyright (C) 2002 - 2011 PowerDNS.COM BV
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2
SuckRequest sr;
sr.domain = domain;
sr.master = master;
-
+ pair<UniQueue::iterator, bool> res;
if(priority) {
- d_suckdomains.push_front(sr);
- // d_havepriosuckrequest=true;
+ res=d_suckdomains.push_front(sr);
+ }
+ else {
+ res=d_suckdomains.push_back(sr);
}
- else
- d_suckdomains.push_back(sr);
+ if(res.second) {
d_suck_sem.post();
- d_any_sem.post();
+ }
}
void CommunicatorClass::suck(const string &domain,const string &remote)
return 0;
}
- void deliverAnswer(DomainInfo& i, uint32_t serial, uint32_t usec)
+ void deliverAnswer(DomainInfo& i, uint32_t serial, unsigned int usec)
{
d_serials[i.id]=serial;
}
void CommunicatorClass::slaveRefresh(PacketHandler *P)
{
UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
- vector<DomainInfo> sdomains;
- B->getUnfreshSlaveInfos(&sdomains);
+ vector<DomainInfo> sdomains, rdomains;
+ B->getUnfreshSlaveInfos(&rdomains);
+
+ {
+ Lock l(&d_lock);
+ typedef UniQueue::index<IDTag>::type domains_by_name_t;
+ domains_by_name_t& nameindex=boost::multi_index::get<IDTag>(d_suckdomains);
+
+
+ BOOST_FOREACH(DomainInfo& di, rdomains) {
+ SuckRequest sr;
+ sr.domain=di.zone;
+ if(di.masters.empty()) // slave domains w/o masters are ignored
+ continue;
+ // remove unfresh domains already queued for AXFR, no sense polling them again
+ sr.master=*di.masters.begin();
+ if(nameindex.count(sr))
+ continue;
+ sdomains.push_back(di);
+ }
+// cerr<<rdomains.size() - sdomains.size()<<" prevented"<<endl;
+ }
+
if(sdomains.empty())
{
- if(d_slaveschanged)
- L<<Logger::Warning<<"All slave domains are fresh"<<endl;
- d_slaveschanged=false;
+ if(d_slaveschanged) {
+ Lock l(&d_lock);
+ L<<Logger::Warning<<"No new unfresh slave domains, "<<d_suckdomains.size()<<" queued for AXFR already"<<endl;
+ }
+ d_slaveschanged = !rdomains.empty();
return;
}
- else
+ else {
+ Lock l(&d_lock);
L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
(sdomains.size()>1 ? "" : "s")<<
- " checking"<<endl;
+ " checking, "<<d_suckdomains.size()<<" queued for AXFR"<<endl;
+ }
SlaveSenderReceiver ssr;
Inflighter<vector<DomainInfo>, SlaveSenderReceiver> ifl(sdomains, ssr);
ifl.run();
break;
}
- catch(exception& e) {
+ catch(std::exception& e) {
L<<Logger::Error<<"While checking domain freshness: " << e.what()<<endl;
}
catch(AhuException &re) {