ComboAddress remote(remoteAddr, port);
connect(d_socket, (struct sockaddr*)&remote, remote.getSocklen());
d_oks = d_errors = d_nodatas = d_nxdomains = d_unknowns = 0;
- d_receiveds = d_receiveerrors = 0;
+ d_receiveds = d_receiveerrors = d_senderrors = 0;
for(unsigned int id =0 ; id < numeric_limits<uint16_t>::max(); ++id)
d_idqueue.push_back(id);
}
vector<uint8_t> packet;
DNSPacketWriter pw(packet, domain, QType::A);
-
+
+ if(d_idqueue.empty()) {
+ cerr<<"Exhausted ids!"<<endl;
+ exit(1);
+ }
pw.getHeader()->id = d_idqueue.front();
d_idqueue.pop_front();
pw.getHeader()->rd = 1;
pw.getHeader()->qr = 0;
- ::send(d_socket, &*packet.begin(), packet.size(), 0);
+ if(::send(d_socket, &*packet.begin(), packet.size(), 0) < 0)
+ d_senderrors++;
return pw.getHeader()->id;
}
}
id = mdp.d_header.id;
+ d_idqueue.push_back(id);
return 1;
}
}
}
unsigned int d_errors, d_nxdomains, d_nodatas, d_oks, d_unknowns;
- unsigned int d_receiveds, d_receiveerrors;
+ unsigned int d_receiveds, d_receiveerrors, d_senderrors;
};
Inflighter<vector<string>, SendReceive> inflighter(domains, sr);
inflighter.d_maxInFlight = 1000;
- inflighter.d_timeoutSeconds = 15;
+ inflighter.d_timeoutSeconds = 3;
string line;
pair<string, string> split;
cerr<<"Read "<<domains.size()<<" domains!"<<endl;
random_shuffle(domains.begin(), domains.end());
+ boost::format datafmt("%s %|20t|%+15s %|40t|%s %|60t|%+15s\n");
+
for(;;) {
try {
inflighter.run();
cerr<<"Caught exception: "<<e.what()<<endl;
}
}
- cerr<<"Results: "<<sr.d_errors<<" errors, "<<sr.d_oks<<" oks, "<<sr.d_nodatas<<" nodatas, "<<sr.d_nxdomains<<" nxdomains, "<<inflighter.getTimeouts()<<" timeouts"<<endl;
- cerr<<sr.d_unknowns<<" answers with an unknown status"<<endl;
-
- cerr<<domains.size() - (sr.d_errors + sr.d_oks + sr.d_nodatas + sr.d_nxdomains + inflighter.getTimeouts() + sr.d_unknowns)<<" status results missing"<<endl;
- cerr<<sr.d_receiveerrors<<" receive errors, "<<sr.d_receiveds<<" packets received correctly"<<endl;
- cerr<<inflighter.getUnexpecteds()<<" unexpected responses (probably seen as timeouts)"<<endl;
+ cerr<< datafmt % "Sending" % "" % "Receiving" % "";
+ cerr<< datafmt % " Queued " % domains.size() % " Received" % sr.d_receiveds;
+ cerr<< datafmt % " Error -/-" % sr.d_senderrors % " Timeouts" % inflighter.getTimeouts();
+ cerr<< datafmt % " " % "" % " Unexpected" % inflighter.getUnexpecteds();
+
+ cerr<< datafmt % " Sent" % (domains.size() - sr.d_senderrors) % " Total" % (sr.d_receiveds + inflighter.getTimeouts() + inflighter.getUnexpecteds());
+
+ cerr<<endl;
+ cerr<< datafmt % "DNS Status" % "" % "" % "";
+ cerr<< datafmt % " OK" % sr.d_oks % "" % "";
+ cerr<< datafmt % " Error" % sr.d_errors % "" % "";
+ cerr<< datafmt % " No Data" % sr.d_nodatas % "" % "";
+ cerr<< datafmt % " NXDOMAIN" % sr.d_nxdomains % "" % "";
+ cerr<< datafmt % " Unknowns" % sr.d_unknowns % "" % "";
+ cerr<< datafmt % "Answers" % (sr.d_oks + sr.d_errors + sr.d_nodatas + sr.d_nxdomains + sr.d_unknowns) % "" % "";
+ cerr<< datafmt % " Timeouts " % (inflighter.getTimeouts()) % "" % "";
+ cerr<< datafmt % "Total " % (sr.d_oks + sr.d_errors + sr.d_nodatas + sr.d_nxdomains + sr.d_unknowns + inflighter.getTimeouts()) % "" % "";
+
+ /*
+
+ cerr<<"Questions: "<<domains.size()<<", responses + network errors + timeouts: " <<
+ sr.d_receiveds <<" + " << sr.d_receiveerrors<<" + " << inflighter.getTimeouts()<< " = " <<
+ sr.d_receiveds + sr.d_receiveerrors + inflighter.getTimeouts() <<endl;
+
+ cerr<< "Unexpected responses "<< inflighter.getUnexpecteds() << endl;
+
+ cerr<<"DNS OK + DNS Error + NODATA + NXDOMAIN + Unknown: "<<
+ sr.d_oks << " + " << sr.d_errors << " + " << sr.d_nodatas << " + " << sr.d_nxdomains << " + " << sr.d_unknowns << " = " <<
+ sr.d_oks + sr.d_errors + sr.d_nodatas + sr.d_nxdomains + sr.d_unknowns << endl;
+
+ cerr<< "(" << domains.size() - (sr.d_errors + sr.d_oks + sr.d_nodatas + sr.d_nxdomains + inflighter.getTimeouts() + sr.d_unknowns)<<" status results missing)"<<endl;
+ */
+
}
d_iter = d_container.begin();
d_init=true;
}
- void run();
+
+ bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
unsigned int d_maxInFlight;
unsigned int d_timeoutSeconds;
uint64_t d_unexpectedResponse, d_timeouts;
};
-template<typename Container, typename SendReceive> void Inflighter<Container, SendReceive>::run()
+template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
{
if(!d_init)
init();
- // cout << "Have "<<d_container.size() << " things to do!"<<endl;
-
for(;;) {
int burst = 0;
+
+ // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
TTDItem ttdi;
ttdi.iter = d_iter++;
ttdi.id = d_sr.send(*ttdi.iter);
gettimeofday(&ttdi.ttd, 0);
ttdi.ttd.tv_sec += d_timeoutSeconds;
-
+ if(d_ttdWatch.count(ttdi.id)) {
+// cerr<<"DUPLICATE INSERT!"<<endl;
+ }
d_ttdWatch.insert(ttdi);
if(++burst == d_burst)
break;
}
int processed=0;
+
+
+ // if there are queries in flight, handle responses
if(!d_ttdWatch.empty()) {
// cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
typename SendReceive::Answer answer;
typename SendReceive::Identifier id;
+ // get as many answers as available - 'receive' should block for a short while to wait for an answer
while(d_sr.receive(id, answer)) {
- typename ttdwatch_t::iterator ival = d_ttdWatch.find(id);
- if(ival != d_ttdWatch.end()) {
+ typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
+
+ if(ival != d_ttdWatch.end()) { // found something!
++processed;
- // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl;
- d_sr.deliverAnswer(*ival->iter, answer);
+ d_sr.deliverAnswer(*ival->iter, answer); // deliver to sender/receiver
d_ttdWatch.erase(ival);
break; // we can send new questions!
}
}
- if(!processed) { // no new responses, time for some cleanup
+ if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
struct timeval now;
gettimeofday(&now, 0);
typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
+ // this provides a list of items sorted by age
for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
d_sr.deliverTimeout(valiter->id); // so backend can release id
d_timeouts++;
}
else
- break;
+ break; // if this one was too new, rest will be too
}
}
}
if(d_ttdWatch.empty() && d_iter == d_container.end())
break;
}
+ return false;
}
#if 0