From: Otto Moerbeek Date: Wed, 9 Oct 2019 08:35:00 +0000 (+0200) Subject: - Fix multiplexer accounting in the write error case X-Git-Tag: dnsdist-1.4.0-rc4~8^2~5 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b5b94bebe20ed9ff0fd5fa06bb937b1eaf6c6112;p=pdns - Fix multiplexer accounting in the write error case - Use proper type for in-flight accounting --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index e5a781134..99cc7273a 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -746,7 +746,7 @@ static void writePid(void) } } -TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_requestsInFlight(0), d_fd(fd) +TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd) { ++s_currentConnections; (*t_tcpClientCounts)[d_remote]++; @@ -767,7 +767,7 @@ TCPConnection::~TCPConnection() --s_currentConnections; } -int TCPConnection::s_maxInFlight = 10; +uint16_t TCPConnection::s_maxInFlight = 10; AtomicCounter TCPConnection::s_currentConnections; @@ -1750,27 +1750,41 @@ static void startDoResolve(void *p) else hadError=false; - // update tcp connection status, either by closing or moving to 'BYTE0' + // update tcp connection status, closing if needed and doing the fd multiplexer accounting + dc->d_tcpConnection->d_requestsInFlight--; + + // In the code below, we try to remove the fd from the set, but + // we don't know if another mthread already did the remove, so we can get a + // "Tried to remove unlisted fd" exception. Note that an inflight < limit test + // will not work since we do not know if the other mthread got an error or not. 
if(hadError) { - // no need to remove us from FDM, we weren't there + try { + t_fdm->removeReadFD(dc->d_socket); + } + catch (FDMultiplexerException &) { + } dc->d_socket = -1; } else { dc->d_tcpConnection->queriesCount++; if (g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) { + try { + t_fdm->removeReadFD(dc->d_socket); + } + catch (FDMultiplexerException &) { + } dc->d_socket = -1; } else { Utility::gettimeofday(&g_now, 0); // needs to be updated struct timeval ttd = g_now; - dc->d_tcpConnection->d_requestsInFlight--; if (dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) { - //cerr << "Reenabling... " << dc->d_tcpConnection->d_requestsInFlight << ' ' << dc->d_socket << endl; + //cerr << "Reenabling " << dc->d_socket << ' ' << dc->d_tcpConnection->d_requestsInFlight << endl; ttd.tv_sec += g_tcpTimeout; t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); } else { - t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout); + t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout); } } } @@ -2010,8 +2024,12 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) if(conn->bytesread==conn->qlen) { conn->d_requestsInFlight++; if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) { - //cerr << "Disabling... " << conn->d_requestsInFlight << ' ' << fd << endl; + //cerr << "Disabling " << fd << ' ' << conn->d_requestsInFlight << endl; t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read + } else { + Utility::gettimeofday(&g_now, 0); // needed? 
+ struct timeval ttd = g_now; + t_fdm->setReadTTD(fd, ttd, g_tcpTimeout); } std::unique_ptr dc; diff --git a/pdns/syncres.hh b/pdns/syncres.hh index c4e0a605d..67689a058 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -1012,10 +1012,10 @@ public: enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state{BYTE0}; uint16_t qlen{0}; uint16_t bytesread{0}; - std::atomic d_requestsInFlight; + uint16_t d_requestsInFlight{0}; // number of mthreads spawned for this connection static unsigned int getCurrentConnections() { return s_currentConnections; } // The max number of concurent TCP queries we're willing to process - static int s_maxInFlight; + static uint16_t s_maxInFlight; private: const int d_fd; static AtomicCounter s_currentConnections; //!< total number of current TCP connections