}
}
-TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_requestsInFlight(0), d_fd(fd)
+TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
{
++s_currentConnections;
(*t_tcpClientCounts)[d_remote]++;
--s_currentConnections;
}
-int TCPConnection::s_maxInFlight = 10;
+uint16_t TCPConnection::s_maxInFlight = 10;
AtomicCounter TCPConnection::s_currentConnections;
else
hadError=false;
- // update tcp connection status, either by closing or moving to 'BYTE0'
+ // update tcp connection status, closing if needed and doing the fd multiplexer accounting
+ dc->d_tcpConnection->d_requestsInFlight--;
+
+ // In the code below, we try to remove the fd from the set, but
+ // we don't know if another mthread already did the remove, so we can get a
+ // "Tried to remove unlisted fd" exception. Not that an inflight < limit test
+ // will not work since we do not know if the other mthread got an error or not.
if(hadError) {
- // no need to remove us from FDM, we weren't there
+ try {
+ t_fdm->removeReadFD(dc->d_socket);
+ }
+ catch (FDMultiplexerException &) {
+ }
dc->d_socket = -1;
}
else {
dc->d_tcpConnection->queriesCount++;
if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) {
+ try {
+ t_fdm->removeReadFD(dc->d_socket);
+ }
+ catch (FDMultiplexerException &) {
+ }
dc->d_socket = -1;
}
else {
Utility::gettimeofday(&g_now, 0); // needs to be updated
struct timeval ttd = g_now;
- dc->d_tcpConnection->d_requestsInFlight--;
if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
- //cerr << "Reenabling... " << dc->d_tcpConnection->d_requestsInFlight << ' ' << dc->d_socket << endl;
+ //cerr << "Reenabling " << dc->d_socket << ' ' << dc->d_tcpConnection->d_requestsInFlight << endl;
ttd.tv_sec += g_tcpTimeout;
t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
} else {
- t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
+ t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
}
}
}
if(conn->bytesread==conn->qlen) {
conn->d_requestsInFlight++;
if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
- //cerr << "Disabling... " << conn->d_requestsInFlight << ' ' << fd << endl;
+ //cerr << "Disabling " << fd << ' ' << conn->d_requestsInFlight << endl;
t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
+ } else {
+ Utility::gettimeofday(&g_now, 0); // needed?
+ struct timeval ttd = g_now;
+ t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
}
std::unique_ptr<DNSComboWriter> dc;
enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state{BYTE0};
uint16_t qlen{0};
uint16_t bytesread{0};
- std::atomic<int> d_requestsInFlight;
+ uint16_t d_requestsInFlight{0}; // number of mthreads spawned for this connection
static unsigned int getCurrentConnections() { return s_currentConnections; }
// The max number of concurent TCP queries we're willing to process
- static int s_maxInFlight;
+ static uint16_t s_maxInFlight;
private:
const int d_fd;
static AtomicCounter s_currentConnections; //!< total number of current TCP connections