static ArgvMap theArg;
return theArg;
}
-static int d_clientsock;
-static int d_prevclientsock;
-static vector<int> d_udpserversocks;
+
+static vector<int> g_udpserversocks;
typedef vector<int> tcpserversocks_t;
static tcpserversocks_t s_tcpserversocks;
return ret;
}
+int makeClientSocket()
+{
+ int ret=socket(AF_INET, SOCK_DGRAM, 0);
+ if(ret<0)
+ throw AhuException("Making a socket for resolver: "+stringerror());
+
+ static optional<struct sockaddr_in> sin;
+ int queryPort=0;
+ if(!sin) {
+ struct sockaddr_in tmp;
+ sin=tmp;
+ memset((char *)&*sin,0, sizeof(sin));
+ sin->sin_family = AF_INET;
+
+ if(!IpToU32(::arg()["query-local-address"], &sin->sin_addr.s_addr))
+ throw AhuException("Unable to resolve local address '"+ ::arg()["query-local-address"] +"'");
+
+ queryPort=::arg().asNum("query-local-port");
+ }
+
+ int tries=100;
+ while(--tries) {
+ uint16_t port=10000+Utility::random()%50000;
+ if(queryPort) {
+ port=queryPort;
+ tries=1;
+ }
+ sin->sin_port = htons(port);
+
+ if (::bind(ret, (struct sockaddr *)&*sin, sizeof(sin)) >= 0)
+ break;
+ }
+ if(!tries)
+ throw AhuException("Resolver binding to local query client socket: "+stringerror());
+
+ Utility::setNonBlocking(ret);
+ return ret;
+}
+
+// you can ask this class for a UDP socket to send a query from
+// this socket is not yours, don't even think about deleting it
+// but after you call 'returnSocket' on it, don't assume anything anymore
+class UDPClientSocks
+{
+ bool d_passthrough;
+ unsigned int d_numsocks;
+ unsigned int d_maxsocks;
+public:
+ UDPClientSocks() : d_passthrough(false) , d_numsocks(0), d_maxsocks(500)
+ {
+ }
+
+ typedef map<int,int> socks_t;
+ socks_t d_socks;
+
+ void setPassthrough(bool state)
+ {
+ if((d_passthrough=state)) {
+ pair<int, int> sock=make_pair(makeClientSocket(), 1);
+ d_socks.insert(sock);
+ }
+ }
+
+ int getSocket()
+ {
+ if(d_passthrough)
+ return d_socks.begin()->first;
+
+ if(d_numsocks < d_maxsocks) {
+ pair<int, int> sock=make_pair(makeClientSocket(), 1);
+ d_socks.insert(sock);
+ d_numsocks++;
+ return sock.first;
+ }
+ else {
+ socks_t::iterator pos=d_socks.begin();
+ advance(pos, random() % d_socks.size());
+ pos->second++;
+ return pos->first;
+ }
+ }
+
+ // return a socket to the pool, or simply erase it
+ void returnSocket(socks_t::iterator i)
+ {
+ if(d_passthrough) {
+ ++i;
+ return;
+ }
+
+ if(!--i->second) {
+ ::close(i->first);
+ d_socks.erase(i++);
+ --d_numsocks;
+ }
+ else {
+ ++i;
+ }
+ }
+}g_udpclientsocks;
+
/* these two functions are used by LWRes */
// -1 is error, > 1 is success
-int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id)
+int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id, int* fd)
{
- return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
+ *fd=g_udpclientsocks.getSocket();
+ return sendto(*fd, data, len, flags, toaddr, addrlen);
}
// -1 is error, 0 is timeout, 1 is success
-int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id, const string& domain)
+int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id, const string& domain, int fd)
{
static optional<unsigned int> nearMissLimit;
if(!nearMissLimit)
nearMissLimit=::arg().asNum("spoof-nearmiss-max");
PacketID pident;
+ pident.fd=fd;
pident.id=id;
pident.domain=domain;
memcpy(&pident.remote, toaddr, sizeof(pident.remote));
s_rcc.listen(sockname);
}
-// this stuff is a tad complicated. There are two client sockets, the current one and the previous one (prevclientsocket)
-// if this function is called, and more than 5 seconds have passed since the previous call, the previous client socket is closed,
-// and replaced by the current one, which is then reopened.
-void remakeClientSocket()
-{
- static time_t lastChange;
-
- if(d_clientsock>=0 && !::arg()["query-local-port"].empty()) // already have a port, and we are fixed
- return;
-
- if(!lastChange)
- lastChange=time(0)-10;
-
- if(lastChange > time(0) - 5)
- return;
-
- lastChange=time(0);
-
- if(d_prevclientsock >= 0) {
- close(d_prevclientsock);
- }
- d_prevclientsock=d_clientsock;
-
- d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
- if(d_clientsock<0)
- throw AhuException("Making a socket for resolver: "+stringerror());
- setReceiveBuffer(d_clientsock, 200000);
- struct sockaddr_in sin;
- memset((char *)&sin,0, sizeof(sin));
-
- sin.sin_family = AF_INET;
-
- if(!IpToU32(::arg()["query-local-address"], &sin.sin_addr.s_addr))
- throw AhuException("Unable to resolve local address '"+ ::arg()["query-local-address"] +"'");
-
- int tries=10;
- while(--tries) {
- uint16_t port;
- if(::arg()["query-local-port"].empty())
- port=10000+Utility::random()%50000;
- else {
- port=::arg().asNum("query-local-port");
- tries=1;
- }
- sin.sin_port = htons(port);
-
- if (::bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0)
- break;
-
- }
- if(!tries)
- throw AhuException("Resolver binding to local query client socket: "+stringerror());
-
- Utility::setNonBlocking(d_clientsock);
-
- // L<<Logger::Error<<"Sending UDP queries from "<<inet_ntoa(sin.sin_addr)<<":"<< ntohs(sin.sin_port) <<endl;
-}
void makeTCPServerSockets()
{
throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror());
Utility::setNonBlocking(fd);
- d_udpserversocks.push_back(fd);
+ g_udpserversocks.push_back(fd);
L<<Logger::Error<<"Listening for UDP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<::arg().asNum("local-port")<<endl;
}
}
const char* pos=packet+12;
unsigned char labellen;
string ret;
-
+ ret.reserve(len-12);
while((labellen=*pos++)) {
if(pos+labellen > end)
break;
::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
::arg().set("fork", "If set, fork the daemon for possible double performance")="no";
::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
+ ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
::arg().setCmd("help","Provide a helpful message");
L.toConsole(Logger::Warning);
fork();
L<<Logger::Warning<<"This is forked pid "<<getpid()<<endl;
}
- d_clientsock=d_prevclientsock=-1;
- remakeClientSocket();
+
makeControlChannelSocket();
unsigned int maxTCPPerClient=::arg().asNum("max-tcp-per-client");
+ if(::arg().parmIsset("query-local-port") || ::arg().mustDo("single-socket"))
+ g_udpclientsocks.setPassthrough(true);
+
for(;;) {
while(MT->schedule()); // housekeeping, let threads do their thing
if(!((counter++)%500)) {
- remakeClientSocket();
MT->makeThread(houseKeeping,0);
}
if(statsWanted) {
fd_set readfds, writefds;
FD_ZERO( &readfds );
FD_ZERO( &writefds );
- FD_SET( d_clientsock, &readfds );
- if(d_prevclientsock >= 0)
- FD_SET( d_prevclientsock, &readfds );
FD_SET( s_rcc.d_fd, &readfds);
- int fdmax=max(d_clientsock, s_rcc.d_fd);
+ int fdmax=s_rcc.d_fd;
+ for(UDPClientSocks::socks_t::iterator i=g_udpclientsocks.d_socks.begin(); i!=g_udpclientsocks.d_socks.end(); ++i) {
+ FD_SET( i->first, &readfds );
+ fdmax=max(fdmax, i->first);
+ }
if(!g_tcpconnections.empty())
gettimeofday(&now, 0);
}
sweeped.swap(g_tcpconnections);
- for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
+ for(vector<int>::const_iterator i=g_udpserversocks.begin(); i!=g_udpserversocks.end(); ++i) {
FD_SET( *i, &readfds );
fdmax=max(fdmax,*i);
}
command();
}
- for(int port=0; port < 2; ++port) {
- if(port && d_prevclientsock < 0)
- break;
- int sock = port ? d_prevclientsock : d_clientsock;
-
- if(FD_ISSET(sock,&readfds)) { // do we have a UDP question response from a server ("we are the client", hence d_clientsock)
- while((d_len=recvfrom(sock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
- dnsheader dh;
+ for(UDPClientSocks::socks_t::iterator i=g_udpclientsocks.d_socks.begin(); i!=g_udpclientsocks.d_socks.end(); i++ ) {
+ if(FD_ISSET(i->first, &readfds)) { // do we have a UDP question response from a server ("we are the client", hence clientsock)
+ while((d_len=recvfrom(i->first, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
if((size_t) d_len >= sizeof(dnsheader)) {
+ dnsheader dh;
memcpy(&dh, data, sizeof(dh));
- if(dh.qr && dh.qdcount) {
+ if(!dh.qdcount) // UPC, Nominum?
+ continue;
+
+ if(dh.qr) {
pident.remote=fromaddr;
pident.id=dh.id;
+ pident.fd=i->first;
pident.domain=questionExpand(data, d_len);
string packet;
packet.assign(data, d_len);
}
}
}
- else
+ else
L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<endl;
}
else {
L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<": packet too small"<<endl;
}
}
+ g_udpclientsocks.returnSocket(i);
}
}
- for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
+ for(vector<int>::const_iterator i=g_udpserversocks.begin(); i!=g_udpserversocks.end(); ++i) {
if(FD_ISSET(*i,&readfds)) { // do we have a new question on udp?
while((d_len=recvfrom(*i, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
// g_stats.queryrate.pulse(now); // (broken)