From 09e6702a5f9317b2bd72c742f21e1687ab9018c9 Mon Sep 17 00:00:00 2001 From: Bert Hubert Date: Sun, 16 Apr 2006 19:14:33 +0000 Subject: [PATCH] move to mplexer git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@710 d19b8d6e-7fed-0310-83ef-9ca221ded41b --- pdns/Makefile.am | 3 +- pdns/pdns_recursor.cc | 662 ++++++++++++++++++++---------------------- 2 files changed, 321 insertions(+), 344 deletions(-) diff --git a/pdns/Makefile.am b/pdns/Makefile.am index edbfbf283..15e1424f2 100644 --- a/pdns/Makefile.am +++ b/pdns/Makefile.am @@ -77,7 +77,6 @@ dnsscan_SOURCES=dnsscan.cc misc.cc qtype.cc anadns.hh \ rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh dnswriter.cc dnswriter.hh \ unix_utility.cc utility.hh - dnsreplay_mindex_SOURCES=dnsreplay-mindex.cc misc.cc qtype.cc anadns.hh \ logger.cc statbag.cc dnspcap.cc dnspcap.hh dnsparser.cc dnsrecords.cc dnsparser.hh \ rcpgenerator.cc rcpgenerator.hh base64.cc base64.hh dnswriter.cc dnswriter.hh \ @@ -102,7 +101,7 @@ logger.cc statbag.cc arguments.cc lwres.cc pdns_recursor.cc lwres.hh \ mtasker.hh syncres.hh recursor_cache.cc recursor_cache.hh dnsparser.cc \ dnswriter.cc dnswriter.hh dnsrecords.cc dnsrecords.hh rcpgenerator.cc rcpgenerator.hh \ base64.cc base64.hh zoneparser-tng.cc zoneparser-tng.hh rec_channel.cc rec_channel.hh \ -rec_channel_rec.cc +rec_channel_rec.cc selectmplexer.cc if NEDMALLOC pdns_recursor_SOURCES += ext/nedmalloc/malloc.c diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 10b78da75..73d483dba 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -49,12 +49,16 @@ #include "rec_channel.hh" #include "logger.hh" #include "iputils.hh" +#include "mplexer.hh" #ifndef RECURSOR #include "statbag.hh" StatBag S; #endif +FDMultiplexer* g_fdm; +unsigned int g_maxTCPPerClient; +bool g_logCommonErrors; using namespace boost; #ifdef __FreeBSD__ // see cvstrac ticket #26 @@ -119,16 +123,15 @@ ArgvMap &arg() return theArg; } -static vector g_udpserversocks; - +struct timeval g_now; typedef vector tcpserversocks_t; -static tcpserversocks_t s_tcpserversocks; - -static map d_tcpclientreadsocks, d_tcpclientwritesocks; typedef MTasker MT_t; MT_t* MT; + +void handleTCPClientWritable(int fd, boost::any& var); + // -1 is error, 0 is timeout, 1 is success int asendtcp(const string& data, Socket* sock) { @@ -136,21 +139,22 @@ int asendtcp(const string& data, Socket* sock) pident.sock=sock; pident.outMSG=data; - d_tcpclientwritesocks[sock->getHandle()]=pident; + g_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident); string packet; int ret=MT->waitEvent(pident,&packet,1); if(!ret || ret==-1) { // timeout - d_tcpclientwritesocks.erase(sock->getHandle()); + g_fdm->removeWriteFD(sock->getHandle()); } else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error return -1; } - return ret; } +void handleTCPClientReadable(int fd, boost::any& var); + // -1 is error, 0 is timeout, 1 is success int arecvtcp(string& data, int len, Socket* sock) { @@ -159,13 +163,13 @@ int arecvtcp(string& data, int len, Socket* sock) PacketID pident; pident.sock=sock; pident.inNeeded=len; - // cerr<<"Adding fd to clientreadsocks: "<getHandle()<getHandle()]=pident; + // cerr<<"Adding fd to clientreadsocks: "<getHandle()<<", needed: "<addReadFD(sock->getHandle(), handleTCPClientReadable, pident); int ret=MT->waitEvent(pident,&data,1); // cerr<<"ret in arecvtcp: "<getHandle()); + g_fdm->removeReadFD(sock->getHandle()); } else if(data.empty()) {// error, EOF or other return -1; @@ -214,6 +218,8 @@ int makeClientSocket() return ret; } +void handleUDPServerResponse(int fd, boost::any&); + // you can ask this class for a UDP socket to send a query from // this socket is not yours, don't even think about deleting it // but after you call 'returnSocket' on it, don't assume anything anymore @@ -235,6 +241,7 @@ public: if((d_passthrough=state)) { pair sock=make_pair(makeClientSocket(), 1); d_socks.insert(sock); + g_fdm->addReadFD(sock.first, handleUDPServerResponse); d_numsocks=1; } } @@ -248,6 +255,7 @@ public: pair sock=make_pair(makeClientSocket(), 1); d_socks.insert(sock); d_numsocks++; + g_fdm->addReadFD(sock.first, handleUDPServerResponse); return sock.first; } else { @@ -273,7 +281,9 @@ public: } if(!--i->second) { + g_fdm->removeReadFD(i->first); ::close(i->first); + d_socks.erase(i++); --d_numsocks; } @@ -316,8 +326,9 @@ int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility:: return -1; } } - else + else { g_udpclientsocks.returnSocket(fd); + } return ret; } @@ -424,8 +435,6 @@ struct TCPConnection } }; -vector g_tcpconnections; // all *running* TCP/IP questions (from clients) - void startDoResolve(void *p) { DNSComboWriter* dc=(DNSComboWriter *)p; @@ -435,7 +444,7 @@ void startDoResolve(void *p) if(dc->d_mdp.getEDNSOpts(&edo)) { maxudpsize=edo.d_packetsize; } - + vector ret; vector packet; @@ -520,7 +529,18 @@ void startDoResolve(void *p) L<getRemote()<<" for "<d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<removeReadFD(dc->d_socket); + close(dc->d_socket); + // i->closeAndCleanup(); // XXX we don't remove ourselves from the list anymore + } + + // XXX FIXME, need to restore resetting connection to BYTE0 in case of noerror! +#if 0 for(vector::iterator i=g_tcpconnections.begin();i!=g_tcpconnections.end();++i) { if(i->fd == dc->d_socket) { if(hadError) { @@ -534,8 +554,10 @@ void startDoResolve(void *p) break; } } +#endif } + if(!g_quiet) { L<getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<d_mdp.d_qname<<"|"<d_mdp.d_qtype); L<<"': "<ancount)<<" answers, "<arcount)<<" additional, took "<(var); + + if(conn.state==TCPConnection::BYTE0) { + int bytes=read(conn.fd,conn.data,2); + if(bytes==1) + conn.state=TCPConnection::BYTE1; + if(bytes==2) { + conn.qlen=(conn.data[0]<<8)+conn.data[1]; + conn.bytesread=0; + conn.state=TCPConnection::GETQUESTION; + } + if(!bytes || bytes < 0) { + g_fdm->removeReadFD(fd); + conn.closeAndCleanup(); + return; + } + } + else if(conn.state==TCPConnection::BYTE1) { + int bytes=read(conn.fd,conn.data+1,1); + if(bytes==1) { + conn.state=TCPConnection::GETQUESTION; + conn.qlen=(conn.data[0]<<8)+conn.data[1]; + conn.bytesread=0; + } + if(!bytes || bytes < 0) { + if(g_logCommonErrors) + L<removeReadFD(fd); + conn.closeAndCleanup(); + return; + } + } + else if(conn.state==TCPConnection::GETQUESTION) { + int bytes=read(conn.fd,conn.data + conn.bytesread,conn.qlen - conn.bytesread); + if(!bytes || bytes < 0) { + L<removeReadFD(fd); + conn.closeAndCleanup(); + + return; + } + conn.bytesread+=bytes; + if(conn.bytesread==conn.qlen) { + // conn.state=TCPConnection::DONE; // this makes us immune from timeouts, from now on *we* are responsible + conn.state=TCPConnection::BYTE0; // *wrong* - yes, we want to listen for a new question already, but we shouldn't timeout etc + DNSComboWriter* dc=0; + try { + dc=new DNSComboWriter(conn.data, conn.qlen, g_now); + } + catch(MOADNSException &mde) { + g_stats.clientParseError++; + L<removeReadFD(fd); + + conn.closeAndCleanup(); + return; + } + + dc->setSocket(conn.fd); + dc->d_tcp=true; + dc->setRemote((struct sockaddr *)&conn.remote,sizeof(conn.remote)); + if(dc->d_mdp.d_header.qr) + L<makeThread(startDoResolve, dc); + return; + } + } + } +} + +void handleNewTCPQuestion(int fd, boost::any& ) +{ + struct sockaddr_in addr; + socklen_t addrlen=sizeof(addr); + int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen); + if(newsock>0) { + if(g_allowFrom && !g_allowFrom->match(&addr)) { + g_stats.unauthorizedTCP++; + close(newsock); + return; + } + + if(g_maxTCPPerClient && g_tcpClientCounts.count(addr.sin_addr.s_addr) && g_tcpClientCounts[addr.sin_addr.s_addr] >= g_maxTCPPerClient) { + g_stats.tcpClientOverflow++; + close(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet! + return; + } + g_tcpClientCounts[addr.sin_addr.s_addr]++; + Utility::setNonBlocking(newsock); + TCPConnection tc; + tc.fd=newsock; + tc.state=TCPConnection::BYTE0; + tc.remote=addr; + tc.startTime=g_now.tv_sec; + g_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc); + } +} + void makeTCPServerSockets() { @@ -627,11 +752,48 @@ void makeTCPServerSockets() Utility::setNonBlocking(fd); setSendBuffer(fd, 65000); listen(fd, 128); - s_tcpserversocks.push_back(fd); + g_fdm->addReadFD(fd, handleNewTCPQuestion); L<= 0) { + if(g_allowFrom && !g_allowFrom->match(&fromaddr)) { + g_stats.unauthorizedUDP++; + continue; + } + + try { + DNSComboWriter* dc = new DNSComboWriter(data, d_len, g_now); + + dc->setRemote((struct sockaddr *)&fromaddr, addrlen); + + if(dc->d_mdp.d_header.qr) { + if(g_logCommonErrors) + L<getRemote()<<" on server socket!"<setSocket(fd); + dc->d_tcp=false; + MT->makeThread(startDoResolve, (void*) dc); + } + } + catch(MOADNSException& mde) { + g_stats.clientParseError++; + L<locals; @@ -662,7 +824,7 @@ void makeUDPServerSockets() throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror()); Utility::setNonBlocking(fd); - g_udpserversocks.push_back(fd); + g_fdm->addReadFD(fd, handleNewUDPQuestion); L<(var); + // cerr<<"handleTCPClientReadable called for fd "<getHandle()< buffer(new char[pident.inNeeded]); + + int ret=read(fd, buffer.get(), pident.inNeeded); + if(ret > 0) { + pident.inMSG.append(&buffer[0], &buffer[ret]); + pident.inNeeded-=ret; + if(!pident.inNeeded) { + // cerr<<"Got entire load of "<removeReadFD(fd); + MT->sendEvent(pid, &msg); + } + else { + // cerr<<"Still have "<removeReadFD(fd); // pident might now be invalid (it isn't, but still) + string empty; + MT->sendEvent(tmp, &empty); // this conveys error status + } +} + +void handleTCPClientWritable(int fd, boost::any& var) +{ + PacketID& pid=any_cast(var); + + int ret=write(fd, pid.outMSG.c_str(), pid.outMSG.size() - pid.outPos); + if(ret > 0) { + pid.outPos+=ret; + if(pid.outPos==pid.outMSG.size()) { + PacketID tmp=pid; + g_fdm->removeWriteFD(fd); + MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok + } + } + else { // error or EOF + g_fdm->removeWriteFD(fd); + string sent; + MT->sendEvent(pid, &sent); // we convey error status by sending empty string + } +} + +void handleUDPServerResponse(int fd, boost::any&) +{ + int d_len; + char data[1500]; + struct sockaddr_in fromaddr; + socklen_t addrlen=sizeof(fromaddr); + + while((d_len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) { + if((size_t) d_len >= sizeof(dnsheader)) { + dnsheader dh; + memcpy(&dh, data, sizeof(dh)); + + if(!dh.qdcount) // UPC, Nominum? + continue; + + if(dh.qr) { + PacketID pident; + pident.remote=fromaddr; + pident.id=dh.id; + pident.fd=fd; + pident.domain=questionExpand(data, d_len); + string packet; + packet.assign(data, d_len); + if(!MT->sendEvent(pident, &packet)) { + // if(g_logCommonErrors) + // L<d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) { + if(pident.fd==mthread->key.fd && !memcmp(&mthread->key.remote.sin_addr, &pident.remote.sin_addr, sizeof(pident.remote.sin_addr)) && + !strcasecmp(pident.domain.c_str(), mthread->key.domain.c_str())) { + mthread->key.nearMisses++; + } + } + } + } + else + L< sweeped; + + for(vector::iterator i=g_tcpconnections.begin();i!=g_tcpconnections.end();++i) { + if(i->state==TCPConnection::DONE || g_now.tv_sec < i->startTime + tcpTimeout) { // don't timeout when we are working on the question! + sweeped.push_back(*i); + if(i->state!=TCPConnection::DONE) { // we don't listen for data when we are processing the question + FD_SET(i->fd, &readfds); + fdmax=max(fdmax,i->fd); + } + } + else { + if(g_logCommonErrors) + L<remote.sin_addr)<closeAndCleanup(); + } + } + sweeped.swap(g_tcpconnections); +#endif + int main(int argc, char **argv) { reportBasicTypes(); @@ -894,6 +1189,7 @@ int main(int argc, char **argv) } L.setName("pdns_recursor"); + g_fdm=getMultiplexer(); L<(100000); - char data[1500]; - struct sockaddr_in fromaddr; PacketID pident; primeHints(); @@ -982,17 +1276,19 @@ int main(int argc, char **argv) counter=0; - struct timeval now; unsigned int maxTcpClients=::arg().asNum("max-tcp-clients"); int tcpTimeout=::arg().asNum("client-tcp-timeout"); - unsigned int maxTCPPerClient=::arg().asNum("max-tcp-per-client"); + g_maxTCPPerClient=::arg().asNum("max-tcp-per-client"); if(!::arg()["query-local-port"].empty() || ::arg().mustDo("single-socket")) { L<addReadFD(s_rcc.d_fd, handleRCC); // control channel + for(;;) { while(MT->schedule()); // housekeeping, let threads do their thing @@ -1003,326 +1299,8 @@ int main(int argc, char **argv) doStats(); } - Utility::socklen_t addrlen=sizeof(fromaddr); - int d_len; - - struct timeval tv; - tv.tv_sec=0; - tv.tv_usec=500000; - - fd_set readfds, writefds; - FD_ZERO( &readfds ); - FD_ZERO( &writefds ); - - FD_SET( s_rcc.d_fd, &readfds); - int fdmax=s_rcc.d_fd; - for(UDPClientSocks::socks_t::iterator i=g_udpclientsocks.d_socks.begin(); i!=g_udpclientsocks.d_socks.end(); ++i) { - FD_SET( i->first, &readfds ); - fdmax=max(fdmax, i->first); - } - - if(!g_tcpconnections.empty()) - gettimeofday(&now, 0); - - vector sweeped; - - for(vector::iterator i=g_tcpconnections.begin();i!=g_tcpconnections.end();++i) { - if(i->state==TCPConnection::DONE || now.tv_sec < i->startTime + tcpTimeout) { // don't timeout when we are working on the question! - sweeped.push_back(*i); - if(i->state!=TCPConnection::DONE) { // we don't listen for data when we are processing the question - FD_SET(i->fd, &readfds); - fdmax=max(fdmax,i->fd); - } - } - else { - if(logCommonErrors) - L<remote.sin_addr)<closeAndCleanup(); - } - } - sweeped.swap(g_tcpconnections); - - for(vector::const_iterator i=g_udpserversocks.begin(); i!=g_udpserversocks.end(); ++i) { - FD_SET( *i, &readfds ); - fdmax=max(fdmax,*i); - } - if(g_tcpconnections.size() < maxTcpClients) - for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) { - FD_SET(*i, &readfds ); - fdmax=max(fdmax,*i); - } - - for(map::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) { - // cerr<<"Adding TCP socket "<first<<" to read select set"<first, &readfds ); - fdmax=max(fdmax,i->first); - } - - for(map::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) { - // cerr<<"Adding TCP socket "<first<<" to write select set"<first, &writefds ); - fdmax=max(fdmax,i->first); - } - - int selret = select( fdmax + 1, &readfds, &writefds, NULL, &tv ); - gettimeofday(&now, 0); - if(selret<=0) - if (selret == -1 && errno!=EINTR) - throw AhuException("Select returned: "+stringerror()); - else - continue; - - if(FD_ISSET(s_rcc.d_fd, &readfds)) { - string remote; - string msg=s_rcc.recv(&remote); - RecursorControlParser rcp; - RecursorControlParser::func_t* command; - string answer=rcp.getAnswer(msg, &command); - s_rcc.send(answer, &remote); - command(); - } - - // do we have a UDP question response from a server ("we are the client", hence clientsock) - for(UDPClientSocks::socks_t::iterator i=g_udpclientsocks.d_socks.begin(); i!=g_udpclientsocks.d_socks.end(); ) { - if(i->first <= fdmax && FD_ISSET(i->first, &readfds)) { - while((d_len=recvfrom(i->first, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) { - if((size_t) d_len >= sizeof(dnsheader)) { - dnsheader dh; - memcpy(&dh, data, sizeof(dh)); - - if(!dh.qdcount) // UPC, Nominum? - continue; - - if(dh.qr) { - pident.remote=fromaddr; - pident.id=dh.id; - pident.fd=i->first; - pident.domain=questionExpand(data, d_len); - string packet; - packet.assign(data, d_len); - if(!MT->sendEvent(pident, &packet)) { -// if(logCommonErrors) -// L<d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) { - if(pident.fd==mthread->key.fd && !memcmp(&mthread->key.remote.sin_addr, &pident.remote.sin_addr, sizeof(pident.remote.sin_addr)) && - !strcasecmp(pident.domain.c_str(), mthread->key.domain.c_str())) { - mthread->key.nearMisses++; - } - } - } - } - else - L<::const_iterator i=g_udpserversocks.begin(); i!=g_udpserversocks.end(); ++i) { - if(FD_ISSET(*i,&readfds)) { // do we have a new question on udp? - while((d_len=recvfrom(*i, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) { - // g_stats.queryrate.pulse(now); // (broken) - if(g_allowFrom && !g_allowFrom->match(&fromaddr)) { - g_stats.unauthorizedUDP++; - continue; - } - - try { - DNSComboWriter* dc = new DNSComboWriter(data, d_len, now); - - dc->setRemote((struct sockaddr *)&fromaddr, addrlen); - - if(dc->d_mdp.d_header.qr) { - if(logCommonErrors) - L<getRemote()<<" on server socket!"<setSocket(*i); - dc->d_tcp=false; - MT->makeThread(startDoResolve, (void*) dc); - } - } - catch(MOADNSException& mde) { - g_stats.clientParseError++; - L<0) { - if(g_allowFrom && !g_allowFrom->match(&addr)) { - g_stats.unauthorizedTCP++; - close(newsock); - continue; - } - - if(maxTCPPerClient && g_tcpClientCounts.count(addr.sin_addr.s_addr) && g_tcpClientCounts[addr.sin_addr.s_addr] >= maxTCPPerClient) { - g_stats.tcpClientOverflow++; - close(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet! - continue; - } - g_tcpClientCounts[addr.sin_addr.s_addr]++; - Utility::setNonBlocking(newsock); - TCPConnection tc; - tc.fd=newsock; - tc.state=TCPConnection::BYTE0; - tc.remote=addr; - tc.startTime=now.tv_sec; - g_tcpconnections.push_back(tc); - } - } - } - - // have any question answers come in over TCP? - for(map::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) { - bool haveErased=false; - if(FD_ISSET(i->first, &readfds)) { // can we receive - shared_array buffer(new char[i->second.inNeeded]); - - int ret=read(i->first, buffer.get(), i->second.inNeeded); - if(ret > 0) { - i->second.inMSG.append(&buffer[0], &buffer[ret]); - i->second.inNeeded-=ret; - if(!i->second.inNeeded) { - // cerr<<"Got entire load of "<second.inMSG.size()<<" bytes"<second; - string msg=i->second.inMSG; - - d_tcpclientreadsocks.erase((i++)); - haveErased=true; - MT->sendEvent(pid, &msg); // XXX DODGY (why? msg is copied nicely by sendEvent) - } - else { - // cerr<<"Still have "<second.inNeeded<<" left to go"<second; - d_tcpclientreadsocks.erase((i++)); - haveErased=true; - string empty; - MT->sendEvent(pid, &empty); // this conveys error status - } - } - if(!haveErased) - ++i; - } - - // is there data we can send to remote nameservers over TCP? - for(map::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) { - bool haveErased=false; - if(FD_ISSET(i->first, &writefds)) { // can we send over TCP - int ret=write(i->first, i->second.outMSG.c_str(), i->second.outMSG.size() - i->second.outPos); - if(ret > 0) { - i->second.outPos+=ret; - if(i->second.outPos==i->second.outMSG.size()) { - PacketID pid=i->second; - d_tcpclientwritesocks.erase(i++); // erase! - haveErased=true; - MT->sendEvent(pid, &pid.outMSG); // send back what we sent to convey everything is ok - } - } - else { // error or EOF - PacketID pid=i->second; - d_tcpclientwritesocks.erase(i++); - haveErased=true; - string sent; - MT->sendEvent(pid, &sent); // we convey error status by sending empty string - } - } - if(!haveErased) - ++i; - } - - // very braindead TCP incoming question parser - for(vector::iterator i=g_tcpconnections.begin();i!=g_tcpconnections.end();++i) { - if(FD_ISSET(i->fd, &readfds)) { - if(i->state==TCPConnection::BYTE0) { - int bytes=read(i->fd,i->data,2); - if(bytes==1) - i->state=TCPConnection::BYTE1; - if(bytes==2) { - i->qlen=(i->data[0]<<8)+i->data[1]; - i->bytesread=0; - i->state=TCPConnection::GETQUESTION; - } - if(!bytes || bytes < 0) { - i->closeAndCleanup(); - g_tcpconnections.erase(i); - break; - } - } - else if(i->state==TCPConnection::BYTE1) { - int bytes=read(i->fd,i->data+1,1); - if(bytes==1) { - i->state=TCPConnection::GETQUESTION; - i->qlen=(i->data[0]<<8)+i->data[1]; - i->bytesread=0; - } - if(!bytes || bytes < 0) { - if(logCommonErrors) - L<remote,sizeof(i->remote))<<" disconnected after first byte"<closeAndCleanup(); - g_tcpconnections.erase(i); - break; - } - } - else if(i->state==TCPConnection::GETQUESTION) { - int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread); - if(!bytes || bytes < 0) { - L<remote,sizeof(i->remote))<<" disconnected while reading question body"<closeAndCleanup(); - g_tcpconnections.erase(i); - break; - } - i->bytesread+=bytes; - if(i->bytesread==i->qlen) { - i->state=TCPConnection::DONE; // this makes us immune from timeouts, from now on *we* are responsible - DNSComboWriter* dc=0; - try { - dc=new DNSComboWriter(i->data, i->qlen, now); - } - catch(MOADNSException &mde) { - g_stats.clientParseError++; - L<remote,sizeof(i->remote))<closeAndCleanup(); - g_tcpconnections.erase(i); - break; - } - - dc->setSocket(i->fd); - dc->d_tcp=true; - dc->setRemote((struct sockaddr *)&i->remote,sizeof(i->remote)); - if(dc->d_mdp.d_header.qr) - L<makeThread(startDoResolve, dc); - break; - } - } - } - } - } + gettimeofday(&g_now, 0); // make this only happen if there are tcp clients XXX FIXME + g_fdm->run(&g_now); } } catch(AhuException &ae) { -- 2.49.0