#include "ahuexception.hh"
#include "statbag.hh"
#include "arguments.hh"
-
+#include "sstuff.hh"
+#include "syncres.hh"
LWRes::LWRes()
{
//! returns -1 for permanent error, 0 for timeout, 1 for success
/** Never throws! */
-int LWRes::asyncresolve(const string &ip, const char *domain, int type)
+int LWRes::asyncresolve(const string &ip, const char *domain, int type, bool doTCP)
{
DNSPacket p;
p.setQuestion(Opcode::Query,domain,type);
struct sockaddr_in toaddr;
struct in_addr inp;
+ Utility::socklen_t addrlen=sizeof(toaddr);
Utility::inet_aton(ip.c_str(),&inp);
toaddr.sin_addr.s_addr=inp.s_addr;
DTime dt;
dt.set();
- if(asendto(p.getData(), p.len, 0, (struct sockaddr*)(&toaddr), sizeof(toaddr),p.d.id)<0) {
- return -1;
+
+  // Two transports: UDP through asendto()/arecvfrom(), or TCP when the
+  // caller passes doTCP. Either way 'ret' ends up as the status to return.
+  if(!doTCP) {
+    if(asendto(p.getData(), p.len, 0, (struct sockaddr*)(&toaddr), sizeof(toaddr),p.d.id)<0) {
+      return -1;
+    }
+
+    // sleep until we see an answer to this, interface to mtasker
+
+    ret=arecvfrom(reinterpret_cast<char *>(d_buf), d_bufsize-1,0,(struct sockaddr*)(&toaddr), &addrlen, &d_len, p.d.id);
}
+  else {
+    Socket s(InterNetwork, Stream);
+    IPEndpoint ie(ip, 53);
+    s.setNonBlocking();
+    s.connect(ie);
+
+    // DNS over TCP prefixes each message with its length as a two-byte
+    // network-order integer. Keep it in a 16-bit variable so exactly those
+    // two bytes are copied, whatever the host byte order is.
+    const u_int16_t wireLen=htons(p.len);
+    const char *lenP=reinterpret_cast<const char*>(&wireLen);
+    const char *msgP=p.getData();
+    string packet=string(lenP, lenP+sizeof(wireLen))+string(msgP, msgP+p.len);
+
+    if(asendtcp(packet, &s) == 0) {
+      cerr<<"asendtcp: timeout"<<endl;
+      return -1;
+    }
- Utility::socklen_t addrlen=sizeof(toaddr);
-
- // sleep until we see an answer to this, interface to mtasker
-
- ret=arecvfrom(reinterpret_cast<char *>(d_buf), d_bufsize-1,0,(struct sockaddr*)(&toaddr), &addrlen, &d_len, p.d.id);
- d_usec=dt.udiff();
+    // first the two length bytes of the answer ...
+    packet.clear();
+    if(arecvtcp(packet,2, &s)==0) {
+      cerr<<"arecvtcp: timeout"<<endl;
+      return -1;
+    }
+
+    u_int16_t replyLen;
+    memcpy(&replyLen, packet.c_str(), sizeof(replyLen));
+    const int len=ntohs(replyLen);
+
+    // The remote end chooses this length. Refuse anything larger than what
+    // the UDP path allows into d_buf rather than overrunning the buffer in
+    // the memcpy below (assumes d_bufsize is the capacity of d_buf, as the
+    // arecvfrom call above implies).
+    if(len > d_bufsize-1) {
+      cerr<<"arecvtcp: answer of "<<len<<" bytes does not fit in buffer"<<endl;
+      return -1;
+    }
+
+    // ... then the message itself
+    if(arecvtcp(packet, len, &s)==0) {
+      cerr<<"arecvtcp: timeout"<<endl;
+      return -1;
+    }
+
+    memcpy(d_buf, packet.c_str(), len);
+    d_len=len;
+    ret=1;
+  }
+ d_usec=dt.udiff();
return ret;
}
if(p.parse((char *)d_buf, d_len)<0)
throw LWResException("resolver: unable to parse packet of "+itoa(d_len)+" bytes");
d_aabit=p.d.aa;
+ // record truncation in its own flag; assigning it to d_aabit would destroy
+ // the authoritative-answer bit stored on the line above
+ d_tcbit=p.d.tc;
d_rcode=p.d.rcode;
return p.getAnswers();
}
#include "syncres.hh"
#include <fcntl.h>
#include <fstream>
+#include "sstuff.hh"
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
+
+using namespace boost;
+
#include "recursor_cache.hh"
#ifdef __FreeBSD__ // see cvstrac ticket #26
static vector<int> d_udpserversocks;
static vector<int> d_tcpserversocks;
+
struct PacketID
{
- u_int16_t id;
- struct sockaddr_in remote;
+  /* Key under which an MTasker thread waits for an event: either a UDP
+     answer identified by (id, remote), or progress on a TCP socket. */
+
+  // Zero every field operator< reads. Waiters on a TCP socket never assign
+  // id or remote, and comparing indeterminate values is undefined behaviour.
+  PacketID() : id(0), remote(), sock(0), inNeeded(0), outPos(0)
+  {}
+
+  u_int16_t id;               // wait for a specific id/remote pair
+  struct sockaddr_in remote;  // this is the remote
+
+  Socket* sock;   // or wait for an event on a TCP fd
+  int inNeeded;   // if this is set, we'll read until inNeeded bytes are read
+  string inMSG;   // they'll go here
+
+  string outMSG;  // the outgoing message that needs to be sent
+  int outPos;     // how far we are along in the outMSG
+
+  // Strict weak ordering over (id, remote address, remote port, socket
+  // handle); a missing socket counts as handle 0.
+  bool operator<(const PacketID& b) const
+  {
+    int ourSock= sock ? sock->getHandle() : 0;
+    int bSock = b.sock ? b.sock->getHandle() : 0;
+    return
+      tie(id, remote.sin_addr.s_addr, remote.sin_port, ourSock) <
+      tie(b.id, b.remote.sin_addr.s_addr, b.remote.sin_port, bSock);
+  }
};
-bool operator<(const PacketID& a, const PacketID& b)
+static map<int,PacketID> d_tcpclientreadsocks, d_tcpclientwritesocks;
+
+MTasker<PacketID,string>* MT;
+
+/** Hand 'data' to the main loop for transmission over 'sock' and suspend the
+    calling MTasker thread until the matching event arrives.
+    Returns 1 when the event was received, 0 when waitEvent timed out
+    (timeout argument 1 - presumably seconds, confirm against MTasker). */
+int asendtcp(const string& data, Socket* sock)
+{
+  const int fd=sock->getHandle();
+
+  PacketID waitKey;
+  waitKey.sock=sock;
+  waitKey.outMSG=data;
+
+  // registering the key makes the main loop select() this fd for writing
+  d_tcpclientwritesocks[fd]=waitKey;
+
+  string ignored;  // waitEvent needs somewhere to store the event payload
+  if(MT->waitEvent(waitKey,&ignored,1))
+    return 1;
+
+  // timeout: stop watching the socket
+  d_tcpclientwritesocks.erase(fd);
+  return 0;
+}
+
+/** Ask for exactly 'len' bytes from 'sock' and suspend the calling MTasker
+    thread until they have been collected. 'data' is emptied first and is
+    passed to waitEvent as the place to store the event payload.
+    Returns 1 when the event arrived, 0 when waitEvent timed out
+    (timeout argument 1 - presumably seconds, confirm against MTasker). */
+int arecvtcp(string& data, int len, Socket* sock)
{
- if(a.id<b.id)
- return true;
-
- if(a.id==b.id) {
- if(a.remote.sin_addr.s_addr < b.remote.sin_addr.s_addr)
- return true;
- if(a.remote.sin_addr.s_addr == b.remote.sin_addr.s_addr)
- if(a.remote.sin_port < b.remote.sin_port)
- return true;
+ data="";
+ PacketID pident;
+ pident.sock=sock;
+ pident.inNeeded=len;
+
+ // cerr<<"arecvtcp called for "<<len<<" bytes"<<endl;
+ // cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
+ // register the request under the socket's handle so it can be found by fd
+ d_tcpclientreadsocks[sock->getHandle()]=pident;
+
+ if(!MT->waitEvent(pident,&data,1)) { // timeout
+ // nobody is waiting any longer, drop the registration again
+ d_tcpclientreadsocks.erase(sock->getHandle());
+ return 0;
}
- return false;
+ // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
+ return 1;
}
-MTasker<PacketID,string>* MT;
/* these two functions are used by LWRes */
int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id)
if(!quiet) {
L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
- sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, rcode="<<res<<endl;
+ sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
}
sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++;
L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
<<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
- L<<Logger::Error<<"stats: "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
+ L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
}
else if(statsWanted)
L<<Logger::Error<<"stats: no stats yet!"<<endl;
tv.tv_sec=0;
tv.tv_usec=500000;
- fd_set readfds;
+ fd_set readfds, writefds;
FD_ZERO( &readfds );
+ FD_ZERO( &writefds );
FD_SET( d_clientsock, &readfds );
int fdmax=d_clientsock;
FD_SET( *i, &readfds );
fdmax=max(fdmax,*i);
}
+ for(map<int,PacketID>::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) {
+ // cerr<<"Adding TCP socket "<<i->first<<" to read select set"<<endl;
+ FD_SET( i->first, &readfds );
+ fdmax=max(fdmax,i->first);
+ }
+
+ for(map<int,PacketID>::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) {
+ // cerr<<"Adding TCP socket "<<i->first<<" to write select set"<<endl;
+ FD_SET( i->first, &writefds );
+ fdmax=max(fdmax,i->first);
+ }
- int selret = select( fdmax + 1, &readfds, NULL, NULL, &tv );
+ int selret = select( fdmax + 1, &readfds, &writefds, NULL, &tv );
if(selret<=0)
if (selret == -1 && errno!=EINTR)
throw AhuException("Select returned: "+stringerror());
}
else {
if(P.d.qr) {
-
pident.remote=fromaddr;
pident.id=P.d.id;
string packet;
}
}
+  // Service TCP sockets for which a request to read inNeeded bytes is
+  // registered in d_tcpclientreadsocks. Data is gathered in inMSG; once the
+  // request is complete the entry is removed and the message is delivered
+  // with sendEvent.
+  for(map<int,PacketID>::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) {
+    if(!FD_ISSET(i->first, &readfds)) {
+      ++i;
+      continue;
+    }
+
+    // At most 200 bytes are read per pass, so a fixed buffer suffices; this
+    // avoids a variable length array, which is not standard C++.
+    char buffer[200];
+    int ret=read(i->first, buffer, min(i->second.inNeeded,200));
+
+    if(ret > 0) {
+      i->second.inMSG.append(buffer, buffer+ret);
+      i->second.inNeeded-=ret;
+      if(!i->second.inNeeded) {
+        // copy what the waiter needs before the map entry goes away
+        PacketID pid=i->second;
+        string msg=i->second.inMSG;
+
+        d_tcpclientreadsocks.erase((i++));
+        MT->sendEvent(pid, &msg); // XXX DODGY
+      }
+      else {
+        ++i;  // more to come on a later pass
+      }
+    }
+    else if(ret < 0 && (errno==EAGAIN || errno==EINTR)) {
+      ++i;  // transient condition, retry on the next pass
+    }
+    else {
+      // End of file or a hard error: the descriptor would stay readable and
+      // make select() return immediately over and over. Stop watching it;
+      // no event is sent, so the waiter simply runs into its timeout.
+      cerr<<"when reading ret="<<ret<<endl;
+      d_tcpclientreadsocks.erase((i++));
+    }
+  }
+
+  // Service TCP sockets that have an outgoing message registered in
+  // d_tcpclientwritesocks. outPos tracks how much of outMSG has been sent;
+  // when everything is out the entry is removed and the waiter is notified.
+  for(map<int,PacketID>::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) {
+    if(!FD_ISSET(i->first, &writefds)) {
+      ++i;
+      continue;
+    }
+
+    PacketID& pending=i->second;
+    const string::size_type total=pending.outMSG.size();
+
+    // Continue from outPos: after a partial write the remainder has to be
+    // sent, not the beginning of the message a second time.
+    int ret=write(i->first, pending.outMSG.c_str()+pending.outPos, total-pending.outPos);
+
+    if(ret > 0) {
+      pending.outPos+=ret;
+      if(static_cast<string::size_type>(pending.outPos)==total) {
+        PacketID pid=pending;  // copy, the map entry is erased next
+        d_tcpclientwritesocks.erase((i++));
+        MT->sendEvent(pid, 0);
+      }
+      else
+        ++i;  // rest goes out on a later pass
+    }
+    else if(ret < 0 && (errno==EAGAIN || errno==EINTR)) {
+      ++i;  // transient condition, retry on the next pass
+    }
+    else {
+      // Hard error (or nothing written): the descriptor would stay writable
+      // and make select() return immediately over and over. Stop watching
+      // it; no event is sent, so the waiter simply runs into its timeout.
+      cerr<<"ret="<<ret<<" when writing"<<endl;
+      d_tcpclientwritesocks.erase((i++));
+    }
+  }
+
+
for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
if(FD_ISSET(i->fd, &readfds)) {
if(i->state==TCPConnection::BYTE0) {
unsigned int SyncRes::s_queries;
unsigned int SyncRes::s_outgoingtimeouts;
unsigned int SyncRes::s_outqueries;
+unsigned int SyncRes::s_tcpoutqueries;
unsigned int SyncRes::s_throttledqueries;
unsigned int SyncRes::s_nodelegated;
bool SyncRes::s_log;
}
}
-
if(!(res=doResolveAt(nsset,subdomain,qname,qtype,ret,depth, beenthere)))
return 0;
}
L<<endl;
}
-
-
return rnameservers;
}
}
LOG<<prefix<<qname<<": Resolved '"+auth+"' NS "<<*tns<<" to "<<remoteIP<<", asking '"<<qname<<"|"<<qtype.getName()<<"'"<<endl;
+ bool doTCP=false;
+
if(s_throttle.shouldThrottle(d_now, remoteIP+"|"+qname+"|"+qtype.getName())) {
LOG<<prefix<<qname<<": query throttled "<<endl;
s_throttledqueries++;
else {
s_outqueries++;
d_outqueries++;
- int ret=d_lwr.asyncresolve(remoteIP,qname.c_str(),qtype.getCode());
- if(ret != 1) { // <- we go out on the wire!
+ TryTCP:
+ if(doTCP) {
+ s_tcpoutqueries++;
+ d_tcpoutqueries++;
+ }
+
+ int ret=d_lwr.asyncresolve(remoteIP, qname.c_str(), qtype.getCode(), doTCP); // <- we go out on the wire!
+ if(ret != 1) {
if(ret==0) {
LOG<<prefix<<qname<<": timeout resolving"<<endl;
d_timeouts++;
// for ANY answers we *must* have an authoritive answer
else if(i->d_place==DNSResourceRecord::ANSWER && toLower(i->qname)==toLower(qname) &&
(((i->qtype==qtype) || (i->qtype.getCode()>1024 && i->qtype.getCode()-1024==qtype.getCode())) || ( qtype==QType(QType::ANY) &&
-
d_lwr.d_aabit))) {
if(i->qtype.getCode() < 1024) {
LOG<<prefix<<qname<<": answer is in: resolved to '"<< i->content<<"|"<<i->qtype.getName()<<"'"<<endl;
return doResolve(newtarget, qtype, ret,0,beenthere2);
}
if(nsset.empty() && !d_lwr.d_rcode) {
+ if(!negindic && d_lwr.d_tcbit) {
+ if(!doTCP) {
+ doTCP=true;
+ LOG<<prefix<<qname<<": status=noerror, truncated bit set, no negative SOA, retrying via TCP"<<endl;
+ goto TryTCP;
+ }
+ LOG<<prefix<<qname<<": status=noerror, truncated bit set, over TCP?"<<endl;
+ continue;
+ }
LOG<<prefix<<qname<<": status=noerror, other types may exist, but we are done "<<(negindic ? "(have negative SOA)" : "")<<endl;
return 0;
}