From cff9aa03e45caa3d6aecaa8377daf09c5b12b1c0 Mon Sep 17 00:00:00 2001
From: Remi Gacogne
Date: Fri, 5 Apr 2019 12:51:45 +0200
Subject: [PATCH] dnsdist: Add more TCP metrics

Keep track of, for each frontend and backend:
- the number of concurrent TCP connections
- the average number of queries per connection
- the average duration of a connection
---
 pdns/dnsdist-carbon.cc         |   6 +
 pdns/dnsdist-lua-inspection.cc |  12 +-
 pdns/dnsdist-tcp.cc            | 237 ++++++++++++++++++++++++---------
 pdns/dnsdist-web.cc            |  39 ++++--
 pdns/dnsdist.hh                |  22 ++-
 5 files changed, 232 insertions(+), 84 deletions(-)
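Note (commentary before the first diff, ignored when the patch is applied): the
averages added here use simple exponential smoothing, where each connection
that closes contributes 1/100th of its final value to the running average, so
the average roughly tracks the last hundred connections. Below is a minimal,
self-contained sketch of the update rule implemented by the two
updateTCPMetrics() helpers added to dnsdist.hh; TCPAverages is an invented
name used for illustration only:

  #include <cstddef>
  #include <cstdint>
  #include <cstdio>

  // Sketch of the smoothing used by updateTCPMetrics(): every sample
  // (queries handled and duration in ms of one closed connection)
  // replaces 1% of the current average.
  struct TCPAverages
  {
    double avgQueriesPerConnection{0.0};
    double avgConnectionDurationMs{0.0};

    void update(size_t queries, uint64_t durationMs)
    {
      avgQueriesPerConnection = (99.0 * avgQueriesPerConnection / 100.0) + (queries / 100.0);
      avgConnectionDurationMs = (99.0 * avgConnectionDurationMs / 100.0) + (durationMs / 100.0);
    }
  };

  int main()
  {
    TCPAverages averages;
    /* a steady workload of 5 queries / 120 ms per connection: after 1000
       connections the averages have converged to within 0.01% of 5 and 120,
       since each step closes 1% of the remaining gap (1 - 0.99^1000). */
    for (size_t idx = 0; idx < 1000; idx++) {
      averages.update(5, 120);
    }
    printf("%.3f %.3f\n", averages.avgQueriesPerConnection, averages.avgConnectionDurationMs);
    return 0;
  }

A consequence of this weighting is that a single pathological connection can
move either average by at most 1% of the difference between its value and the
current average, so the values exported via carbon, the API and Prometheus
move smoothly instead of jumping on every accept().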
diff --git a/pdns/dnsdist-carbon.cc b/pdns/dnsdist-carbon.cc
index 9ad220f40..1fa49f584 100644
--- a/pdns/dnsdist-carbon.cc
+++ b/pdns/dnsdist-carbon.cc
@@ -100,6 +100,9 @@ try
       str<<base<<"tcpgaveup" << " " << state->tcpGaveUp.load() << " " << now << "\r\n";
       str<<base<<"tcpreadtimeouts" << " " << state->tcpReadTimeouts.load() << " " << now << "\r\n";
       str<<base<<"tcpwritetimeouts" << " " << state->tcpWriteTimeouts.load() << " " << now << "\r\n";
+      str<<base<<"tcpcurrentconnections" << " " << state->tcpCurrentConnections.load() << " " << now << "\r\n";
+      str<<base<<"tcpavgqueriesperconnection" << " " << state->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+      str<<base<<"tcpavgconnectionduration" << " " << state->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
     }
     for(const auto& front : g_frontends) {
       if (front->udpFD == -1 && front->tcpFD == -1)
@@ -114,6 +117,9 @@ try
       str<<base<<"tcpgaveup" << " " << front->tcpGaveUp.load() << " " << now << "\r\n";
       str<<base<<"tcpclienttimeouts" << " " << front->tcpClientTimeouts.load() << " " << now << "\r\n";
       str<<base<<"tcpdownstreamtimeouts" << " " << front->tcpDownstreamTimeouts.load() << " " << now << "\r\n";
+      str<<base<<"tcpcurrentconnections" << " " << front->tcpCurrentConnections.load() << " " << now << "\r\n";
+      str<<base<<"tcpavgqueriesperconnection" << " " << front->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+      str<<base<<"tcpavgconnectionduration" << " " << front->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
     }
     auto localPools = g_pools.getLocal();
     for (const auto& entry : *localPools) {
diff --git a/pdns/dnsdist-lua-inspection.cc b/pdns/dnsdist-lua-inspection.cc
index 92fb41304..25d9a187d 100644
--- a/pdns/dnsdist-lua-inspection.cc
+++ b/pdns/dnsdist-lua-inspection.cc
@@ -562,24 +562,24 @@ void setupLuaInspection()
       ret << endl;

       ret << "Frontends:" << endl;
-      fmt = boost::format("%-3d %-20.20s %-25d %-25d %-25d %-25d %-25d");
-      ret << (fmt % "#" % "Address" % "Died reading query" % "Died sending response" % "Gave up" % "Client timeouts" % "Downstream timeouts" ) << endl;
+      fmt = boost::format("%-3d %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20f %-20f");
+      ret << (fmt % "#" % "Address" % "Connections" % "Died reading query" % "Died sending response" % "Gave up" % "Client timeouts" % "Downstream timeouts" % "Avg queries/conn" % "Avg duration") << endl;
       size_t counter = 0;
       for(const auto& f : g_frontends) {
-        ret << (fmt % counter % f->local.toStringWithPort() % f->tcpDiedReadingQuery % f->tcpDiedSendingResponse % f->tcpGaveUp % f->tcpClientTimeouts % f->tcpDownstreamTimeouts) << endl;
+        ret << (fmt % counter % f->local.toStringWithPort() % f->tcpCurrentConnections % f->tcpDiedReadingQuery % f->tcpDiedSendingResponse % f->tcpGaveUp % f->tcpClientTimeouts % f->tcpDownstreamTimeouts % f->tcpAvgQueriesPerConnection % f->tcpAvgConnectionDuration) << endl;
         ++counter;
       }
       ret << endl;

       ret << "Backends:" << endl;
-      fmt = boost::format("%-3d %-20.20s %-20.20s %-25d %-25d %-25d %-25d %-25d");
-      ret << (fmt % "#" % "Name" % "Address" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" ) << endl;
+      fmt = boost::format("%-3d %-20.20s %-20.20s %-20d %-20d %-25d %-20d %-20d %-20d %-20f %-20f");
+      ret << (fmt % "#" % "Name" % "Address" % "Connections" % "Died sending query" % "Died reading response" % "Gave up" % "Read timeouts" % "Write timeouts" % "Avg queries/conn" % "Avg duration") << endl;
       auto states = g_dstates.getLocal();
       counter = 0;
       for(const auto& s : *states) {
-        ret << (fmt % counter % s->name % s->remote.toStringWithPort() % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts) << endl;
+        ret << (fmt % counter % s->name % s->remote.toStringWithPort() % s->tcpCurrentConnections % s->tcpDiedSendingQuery % s->tcpDiedReadingResponse % s->tcpGaveUp % s->tcpReadTimeouts % s->tcpWriteTimeouts % s->tcpAvgQueriesPerConnection % s->tcpAvgConnectionDuration) << endl;
         ++counter;
       }
diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc
index e9ee3e6e7..878f056a0 100644
--- a/pdns/dnsdist-tcp.cc
+++ b/pdns/dnsdist-tcp.cc
@@ -55,7 +55,6 @@ using std::atomic;
    Let's start naively.
 */

-static thread_local map<ComboAddress, std::deque<std::unique_ptr<Socket>>> t_downstreamSockets;
 static std::mutex tcpClientsCountMutex;
 static std::map<ComboAddress, size_t, ComboAddress::addressOnlyLessThan> tcpClientsCount;
 static const size_t g_maxCachedConnectionsPerDownstream = 20;
@@ -63,10 +62,10 @@ uint64_t g_maxTCPQueuedConnections{1000};
 size_t g_maxTCPQueriesPerConn{0};
 size_t g_maxTCPConnectionDuration{0};
 size_t g_maxTCPConnectionsPerClient{0};
+uint16_t g_downstreamTCPCleanupInterval{60};
 bool g_useTCPSinglePipe{false};
-std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};

-static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, int timeout)
+static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures)
 {
   std::unique_ptr<Socket> result;
@@ -86,10 +85,10 @@ static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& d
     result->setNonBlocking();
 #ifdef MSG_FASTOPEN
     if (!ds->tcpFastOpen) {
-      SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+      SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
     }
 #else
-    SConnectWithTimeout(result->getHandle(), ds->remote, timeout);
+    SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
 #endif /* MSG_FASTOPEN */
     return result;
   }
@@ -105,49 +104,109 @@ static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& d
   return nullptr;
 }

-static std::unique_ptr<Socket> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, bool& isFresh)
+class TCPConnectionToBackend
 {
-  std::unique_ptr<Socket> result;
+public:
+  TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now): d_ds(ds), d_connectionStartTime(now)
+  {
+    d_socket = setupTCPDownstream(d_ds, downstreamFailures);
+    ++d_ds->tcpCurrentConnections;
+  }

-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  ~TCPConnectionToBackend()
+  {
+    if (d_ds && d_socket) {
+      --d_ds->tcpCurrentConnections;
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+
+      auto diff = now - d_connectionStartTime;
+      d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
+    }
+  }
+
+  int getHandle() const
+  {
+    if (!d_socket) {
+      throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
+    }
+
+    return d_socket->getHandle();
+  }
+
+  const ComboAddress& getRemote() const
+  {
+    return d_ds->remote;
+  }
+
+  bool isFresh() const
+  {
+    return d_fresh;
+  }
+
+  void incQueries()
+  {
+    ++d_queries;
+  }
+
+  void setReused()
+  {
+    d_fresh = false;
+  }
+
+private:
+  std::unique_ptr<Socket> d_socket{nullptr};
+  std::shared_ptr<DownstreamState> d_ds{nullptr};
+  struct timeval d_connectionStartTime;
+  uint64_t d_queries{0};
+  bool d_fresh{true};
+};
+
+static thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> t_downstreamConnections;
+
+static std::unique_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now)
+{
+  std::unique_ptr<TCPConnectionToBackend> result;
+
+  const auto& it = t_downstreamConnections.find(ds->remote);
+  if (it != t_downstreamConnections.end()) {
     auto& list = it->second;
     if (!list.empty()) {
       result = std::move(list.front());
       list.pop_front();
-      isFresh = false;
+      result->setReused();
       return result;
     }
   }

-  isFresh = true;
-  return setupTCPDownstream(ds, downstreamFailures, 0);
+  return std::unique_ptr<TCPConnectionToBackend>(new TCPConnectionToBackend(ds, downstreamFailures, now));
 }

-static void releaseDownstreamConnection(std::shared_ptr<DownstreamState>& ds, std::unique_ptr<Socket>&& socket)
+static void releaseDownstreamConnection(std::unique_ptr<TCPConnectionToBackend>&& conn)
 {
-  if (socket == nullptr) {
+  if (conn == nullptr) {
     return;
   }

-  const auto& it = t_downstreamSockets.find(ds->remote);
-  if (it != t_downstreamSockets.end()) {
+  const auto& remote = conn->getRemote();
+  const auto& it = t_downstreamConnections.find(remote);
+  if (it != t_downstreamConnections.end()) {
     auto& list = it->second;
     if (list.size() >= g_maxCachedConnectionsPerDownstream) {
       /* too many connections queued already */
-      socket.reset();
+      conn.reset();
       return;
     }
-    list.push_back(std::move(socket));
+    list.push_back(std::move(conn));
   }
   else {
-    t_downstreamSockets[ds->remote].push_back(std::move(socket));
+    t_downstreamConnections[remote].push_back(std::move(conn));
   }
 }

 struct ConnectionInfo
 {
-  ConnectionInfo(): cs(nullptr), fd(-1)
+  ConnectionInfo(ClientState* cs_): cs(cs_), fd(-1)
   {
   }

   ConnectionInfo(ConnectionInfo&& rhs)
@@ -178,6 +237,9 @@ struct ConnectionInfo
       close(fd);
       fd = -1;
     }
+    if (cs) {
+      --cs->tcpCurrentConnections;
+    }
   }

   ComboAddress remote;
@@ -263,13 +325,13 @@ void TCPClientCollection::addTCPClientThread()

 static void cleanupClosedTCPConnections()
 {
-  for(auto dsIt = t_downstreamSockets.begin(); dsIt != t_downstreamSockets.end(); ) {
-    for (auto socketIt = dsIt->second.begin(); socketIt != dsIt->second.end(); ) {
-      if (*socketIt && isTCPSocketUsable((*socketIt)->getHandle())) {
-        ++socketIt;
+  for(auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
+    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
+      if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
+        ++connIt;
       }
       else {
-        socketIt = dsIt->second.erase(socketIt);
+        connIt = dsIt->second.erase(connIt);
       }
     }
@@ -277,7 +339,7 @@ static void cleanupClosedTCPConnections()
       ++dsIt;
     }
     else {
-      dsIt = t_downstreamSockets.erase(dsIt);
+      dsIt = t_downstreamConnections.erase(dsIt);
     }
   }
 }
@@ -353,6 +415,13 @@ public:
   ~IncomingTCPConnectionState()
   {
     decrementTCPClientCount(d_ci.remote);
+    if (d_ci.cs != nullptr) {
+      struct timeval now;
+      gettimeofday(&now, nullptr);
+
+      auto diff = now - d_connectionStartTime;
+      d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
+    }

     if (d_ds != nullptr) {
       if (d_outstanding) {
@@ -360,15 +429,15 @@ public:
         d_outstanding = false;
       }

-      if (d_downstreamSocket) {
+      if (d_downstreamConnection) {
         try {
           if (d_lastIOState == IOState::NeedRead) {
-            cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamSocket->getHandle()<<endl;
-            t_fdm->removeReadFD(d_downstreamSocket->getHandle());
+            cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamConnection->getHandle()<<endl;
+            t_fdm->removeReadFD(d_downstreamConnection->getHandle());
           }
           else if (d_lastIOState == IOState::NeedWrite) {
"<getHandle()<removeWriteFD(d_downstreamConnection->getHandle()); } } catch(const FDMultiplexerException& e) { @@ -425,7 +494,7 @@ public: return now; } - boost::optional getBackendReadTTD() const + boost::optional getBackendReadTTD(const struct timeval& now) const { if (d_ds == nullptr) { throw std::runtime_error("getBackendReadTTD() without any backend selected"); @@ -434,27 +503,19 @@ public: return boost::none; } - struct timeval res; - gettimeofday(&res, 0); - + struct timeval res = now; res.tv_sec += d_ds->tcpRecvTimeout; return res; } - boost::optional getClientWriteTTD(boost::optional now=boost::none) const + boost::optional getClientWriteTTD(const struct timeval& now) const { if (g_maxTCPConnectionDuration == 0 && g_tcpSendTimeout == 0) { return boost::none; } - struct timeval res; - if (now) { - res = *now; - } - else { - gettimeofday(&res, 0); - } + struct timeval res = now; if (g_maxTCPConnectionDuration > 0) { auto elapsed = res.tv_sec - d_connectionStartTime.tv_sec; @@ -472,7 +533,7 @@ public: return res; } - boost::optional getBackendWriteTTD() const + boost::optional getBackendWriteTTD(const struct timeval& now) const { if (d_ds == nullptr) { throw std::runtime_error("getBackendReadTTD() called without any backend selected"); @@ -481,9 +542,7 @@ public: return boost::none; } - struct timeval res; - gettimeofday(&res, 0); - + struct timeval res = now; res.tv_sec += d_ds->tcpSendTimeout; return res; @@ -506,6 +565,40 @@ public: return false; } + void dump() const + { + static std::mutex s_mutex; + + struct timeval now; + gettimeofday(&now, 0); + + { + std::lock_guard lock(s_mutex); + fprintf(stderr, "State is %p\n", this); + cerr << "Current state is " << static_cast(d_state) << ", got "< State::doingHandshake) { + cerr << "Handshake done at " << d_handshakeDoneTime.tv_sec << " - " << d_handshakeDoneTime.tv_usec << endl; + } + if (d_state > State::readingQuerySize) { + cerr << "Got first query size at " << d_firstQuerySizeReadTime.tv_sec << " - " << d_firstQuerySizeReadTime.tv_usec << endl; + } + if (d_state > State::readingQuerySize) { + cerr << "Got query size at " << d_querySizeReadTime.tv_sec << " - " << d_querySizeReadTime.tv_usec << endl; + } + if (d_state > State::readingQuery) { + cerr << "Got query at " << d_queryReadTime.tv_sec << " - " << d_queryReadTime.tv_usec << endl; + } + if (d_state > State::sendingQueryToBackend) { + cerr << "Sent query at " << d_querySentTime.tv_sec << " - " << d_querySentTime.tv_usec << endl; + } + if (d_state > State::readingResponseFromBackend) { + cerr << "Got response at " << d_responseReadTime.tv_sec << " - " << d_responseReadTime.tv_usec << endl; + } + } + } + enum class State { doingHandshake, readingQuerySize, readingQuery, sendingQueryToBackend, readingResponseSizeFromBackend, readingResponseFromBackend, sendingResponse }; std::vector d_buffer; @@ -514,9 +607,15 @@ public: IDState d_ids; ConnectionInfo d_ci; TCPIOHandler d_handler; - std::unique_ptr d_downstreamSocket{nullptr}; + std::unique_ptr d_downstreamConnection{nullptr}; std::shared_ptr d_ds{nullptr}; struct timeval d_connectionStartTime; + struct timeval d_handshakeDoneTime; + struct timeval d_firstQuerySizeReadTime; + struct timeval d_querySizeReadTime; + struct timeval d_queryReadTime; + struct timeval d_querySentTime; + struct timeval d_responseReadTime; size_t d_currentPos{0}; size_t d_queriesCount{0}; unsigned int d_remainingTime{0}; @@ -525,7 +624,6 @@ public: uint16_t d_downstreamFailures{0}; State d_state{State::doingHandshake}; IOState 
   IOState d_lastIOState{IOState::Done};
-  bool d_freshDownstreamConnection{false};
   bool d_readingFirstQuery{true};
   bool d_outstanding{false};
   bool d_firstResponsePacket{true};
@@ -542,7 +640,7 @@ static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& stat
 {
   handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);

-  if (state->d_isXFR && state->d_downstreamSocket) {
+  if (state->d_isXFR && state->d_downstreamConnection) {
     /* we need to resume reading from the backend! */
     state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
     state->d_currentPos = 0;
@@ -643,7 +741,7 @@ static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& stat
   state->d_state = IncomingTCPConnectionState::State::sendingQueryToBackend;
   state->d_currentPos = 0;
   state->d_firstResponsePacket = true;
-  state->d_downstreamSocket.reset();
+  state->d_downstreamConnection.reset();

   if (state->d_xfrStarted) {
     /* sorry, but we are not going to resume a XFR if we have already sent some packets
@@ -653,9 +751,9 @@ static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& stat
   }

   while (state->d_downstreamFailures < state->d_ds->retries) {
-    state->d_downstreamSocket = getConnectionToDownstream(ds, state->d_downstreamFailures, state->d_freshDownstreamConnection);
+    state->d_downstreamConnection = getConnectionToDownstream(ds, state->d_downstreamFailures, now);

-    if (!state->d_downstreamSocket) {
+    if (!state->d_downstreamConnection) {
       ++ds->tcpGaveUp;
       ++state->d_ci.cs->tcpGaveUp;
       vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
@@ -789,11 +887,11 @@ static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state,

 static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
 {
-  if (state->d_downstreamSocket == nullptr) {
+  if (state->d_downstreamConnection == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
   }

-  int fd = state->d_downstreamSocket->getHandle();
+  int fd = state->d_downstreamConnection->getHandle();

   IOState iostate = IOState::Done;
   bool connectionDied = false;
@@ -801,7 +899,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
     if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
       int socketFlags = 0;
 #ifdef MSG_FASTOPEN
-      if (state->d_ds->tcpFastOpen && state->d_freshDownstreamConnection) {
+      if (state->d_ds->tcpFastOpen && state->d_downstreamConnection->isFresh()) {
         socketFlags |= MSG_FASTOPEN;
       }
 #endif /* MSG_FASTOPEN */
@@ -809,8 +907,10 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
       size_t sent = sendMsgWithTimeout(fd, reinterpret_cast<const char *>(&state->d_buffer.at(state->d_currentPos)), state->d_buffer.size() - state->d_currentPos, 0, &state->d_ds->remote, &state->d_ds->sourceAddr, state->d_ds->sourceItf, 0, socketFlags);
       if (sent == state->d_buffer.size()) {
         /* request sent ! */
+        state->d_downstreamConnection->incQueries();
         state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
         state->d_currentPos = 0;
+        state->d_querySentTime = now;
         iostate = IOState::NeedRead;
         if (!state->d_isXFR) {
           /* don't bother with the outstanding count for XFR queries */
@@ -822,7 +922,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
         state->d_currentPos += sent;
         iostate = IOState::NeedWrite;
         /* disable fast open on partial write */
-        state->d_freshDownstreamConnection = false;
+        state->d_downstreamConnection->setReused();
       }
     }
@@ -850,10 +950,11 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
           /* but don't reset it either, we will need to read more messages */
         }
         else {
-          releaseDownstreamConnection(state->d_ds, std::move(state->d_downstreamSocket));
+          releaseDownstreamConnection(std::move(state->d_downstreamConnection));
         }
         fd = -1;

+        state->d_responseReadTime = now;
         handleResponse(state, now);
         return;
       }
@@ -879,7 +980,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
     }

     /* don't increase this counter when reusing connections */
-    if (state->d_freshDownstreamConnection) {
+    if (state->d_downstreamConnection->isFresh()) {
       ++state->d_downstreamFailures;
     }
     if (state->d_outstanding && state->d_ds != nullptr) {
@@ -895,7 +996,7 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
       handleNewIOState(state, iostate, fd, handleDownstreamIOCallback);
     }
     else {
-      handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD() : state->getBackendWriteTTD());
+      handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD(now) : state->getBackendWriteTTD(now));
     }

     if (connectionDied) {
@@ -906,11 +1007,11 @@ static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& stat
 static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
 {
   auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
-  if (state->d_downstreamSocket == nullptr) {
+  if (state->d_downstreamConnection == nullptr) {
     throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
   }
-  if (fd != state->d_downstreamSocket->getHandle()) {
-    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamSocket->getHandle()));
+  if (fd != state->d_downstreamConnection->getHandle()) {
+    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamConnection->getHandle()));
   }

   struct timeval now;
@@ -933,6 +1034,7 @@ static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct
   if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
     iostate = state->d_handler.tryHandshake();
     if (iostate == IOState::Done) {
+      state->d_handshakeDoneTime = now;
       state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
     }
   }
@@ -941,6 +1043,10 @@ static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct
     iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t) - state->d_currentPos);
     if (iostate == IOState::Done) {
       state->d_state = IncomingTCPConnectionState::State::readingQuery;
+      state->d_querySizeReadTime = now;
+      if (state->d_queriesCount == 0) {
+        state->d_firstQuerySizeReadTime = now;
+      }
       state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
       if (state->d_querySize < sizeof(dnsheader)) {
         /* go away */
@@ -1144,13 +1250,14 @@ void tcpAcceptorThread(void* p)
     tcpClientCountIncremented = false;
     try {
       socklen_t remlen = remote.getSocklen();
-      ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo);
-      ci->cs = cs;
+      ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo(cs));
 #ifdef HAVE_ACCEPT4
       ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
 #else
       ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
 #endif
+      ++cs->tcpCurrentConnections;
+
       if(ci->fd < 0) {
         throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno)).str());
       }
diff --git a/pdns/dnsdist-web.cc b/pdns/dnsdist-web.cc
index 745a798f0..291e16661 100644
--- a/pdns/dnsdist-web.cc
+++ b/pdns/dnsdist-web.cc
@@ -470,6 +470,12 @@ static void connectionThread(int sock, ComboAddress remote)
     output << "# TYPE " << statesbase << "tcpreadtimeouts " << "counter" << "\n";
     output << "# HELP " << statesbase << "tcpwritetimeouts " << "The number of TCP write timeouts" << "\n";
     output << "# TYPE " << statesbase << "tcpwritetimeouts " << "counter" << "\n";
+    output << "# HELP " << statesbase << "tcpcurrentconnections " << "The number of current TCP connections" << "\n";
+    output << "# TYPE " << statesbase << "tcpcurrentconnections " << "gauge" << "\n";
+    output << "# HELP " << statesbase << "tcpavgqueriesperconn " << "The average number of queries per TCP connection" << "\n";
+    output << "# TYPE " << statesbase << "tcpavgqueriesperconn " << "gauge" << "\n";
+    output << "# HELP " << statesbase << "tcpavgconnduration " << "The average duration of a TCP connection (ms)" << "\n";
+    output << "# TYPE " << statesbase << "tcpavgconnduration " << "gauge" << "\n";

     for (const auto& state : *states) {
       string serverName;
@@ -484,18 +490,21 @@ static void connectionThread(int sock, ComboAddress remote)

       const std::string label = boost::str(boost::format("{server=\"%1%\",address=\"%2%\"}")
                                            % serverName % state->remote.toStringWithPort());

-      output << statesbase << "queries" << label << " " << state->queries.load() << "\n";
-      output << statesbase << "drops" << label << " " << state->reuseds.load() << "\n";
-      output << statesbase << "latency" << label << " " << state->latencyUsec/1000.0 << "\n";
-      output << statesbase << "senderrors" << label << " " << state->sendErrors.load() << "\n";
-      output << statesbase << "outstanding" << label << " " << state->outstanding.load() << "\n";
-      output << statesbase << "order" << label << " " << state->order << "\n";
-      output << statesbase << "weight" << label << " " << state->weight << "\n";
-      output << statesbase << "tcpdiedsendingquery" << label << " " << state->tcpDiedSendingQuery << "\n";
-      output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n";
-      output << statesbase << "tcpgaveup" << label << " " << state->tcpGaveUp << "\n";
-      output << statesbase << "tcpreadtimeouts" << label << " " << state->tcpReadTimeouts << "\n";
-      output << statesbase << "tcpwritetimeouts" << label << " " << state->tcpWriteTimeouts << "\n";
+      output << statesbase << "queries"                 << label << " " << state->queries.load() << "\n";
+      output << statesbase << "drops"                   << label << " " << state->reuseds.load() << "\n";
+      output << statesbase << "latency"                 << label << " " << state->latencyUsec/1000.0 << "\n";
+      output << statesbase << "senderrors"              << label << " " << state->sendErrors.load() << "\n";
+      output << statesbase << "outstanding"             << label << " " << state->outstanding.load() << "\n";
+      output << statesbase << "order"                   << label << " " << state->order << "\n";
"\n"; + output << statesbase << "weight" << label << " " << state->weight << "\n"; + output << statesbase << "tcpdiedsendingquery" << label << " " << state->tcpDiedSendingQuery << "\n"; + output << statesbase << "tcpdiedreadingresponse" << label << " " << state->tcpDiedReadingResponse << "\n"; + output << statesbase << "tcpgaveup" << label << " " << state->tcpGaveUp << "\n"; + output << statesbase << "tcpreadtimeouts" << label << " " << state->tcpReadTimeouts << "\n"; + output << statesbase << "tcpwritetimeouts" << label << " " << state->tcpWriteTimeouts << "\n"; + output << statesbase << "tcpcurrentconnections" << label << " " << state->tcpCurrentConnections << "\n"; + output << statesbase << "tcpavgqueriesperconn" << label << " " << state->tcpAvgQueriesPerConnection << "\n"; + output << statesbase << "tcpavgconnduration" << label << " " << state->tcpAvgConnectionDuration << "\n"; } for (const auto& front : g_frontends) { @@ -582,6 +591,9 @@ static void connectionThread(int sock, ComboAddress remote) {"tcpGaveUp", (double)a->tcpGaveUp}, {"tcpReadTimeouts", (double)a->tcpReadTimeouts}, {"tcpWriteTimeouts", (double)a->tcpWriteTimeouts}, + {"tcpCurrentConnections", (double)a->tcpCurrentConnections}, + {"tcpAvgQueriesPerConnection", (double)a->tcpAvgQueriesPerConnection}, + {"tcpAvgConnectionDuration", (double)a->tcpAvgConnectionDuration}, {"dropRate", (double)a->dropRate} }; @@ -610,6 +622,9 @@ static void connectionThread(int sock, ComboAddress remote) { "tcpGaveUp", (double) front->tcpGaveUp.load() }, { "tcpClientTimeouts", (double) front->tcpClientTimeouts }, { "tcpDownstreamTimeouts", (double) front->tcpDownstreamTimeouts }, + { "tcpCurrentConnections", (double) front->tcpCurrentConnections }, + { "tcpAvgQueriesPerConnection", (double) front->tcpAvgQueriesPerConnection }, + { "tcpAvgConnectionDuration", (double) front->tcpAvgConnectionDuration }, }; frontends.push_back(frontend); } diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 0a3eb01f0..663e18061 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -591,6 +591,10 @@ struct ClientState std::atomic tcpGaveUp{0}; std::atomic tcpClientTimeouts{0}; std::atomic tcpDownstreamTimeouts{0}; + std::atomic tcpCurrentConnections{0}; + std::atomic tcpAvgQueriesPerConnection{0.0}; + /* in ms */ + std::atomic tcpAvgConnectionDuration{0.0}; int udpFD{-1}; int tcpFD{-1}; bool muted{false}; @@ -633,6 +637,12 @@ struct ClientState d_filter = bpf; } #endif /* HAVE_EBPF */ + + void updateTCPMetrics(size_t queries, uint64_t durationMs) + { + tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (queries / 100.0); + tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0); + } }; class TCPClientCollection { @@ -744,6 +754,10 @@ struct DownstreamState std::atomic tcpGaveUp{0}; std::atomic tcpReadTimeouts{0}; std::atomic tcpWriteTimeouts{0}; + std::atomic tcpCurrentConnections{0}; + std::atomic tcpAvgQueriesPerConnection{0.0}; + /* in ms */ + std::atomic tcpAvgConnectionDuration{0.0}; string name; size_t socketsOffset{0}; double queryLoad{0.0}; @@ -815,6 +829,12 @@ struct DownstreamState void hash(); void setId(const boost::uuids::uuid& newId); void setWeight(int newWeight); + + void updateTCPMetrics(size_t queries, uint64_t durationMs) + { + tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (queries / 100.0); + tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0); + } }; using servers_t =vector>; @@ -1010,7 +1030,7 @@ extern 
@@ -1010,7 +1030,7 @@ extern std::string g_apiConfigDirectory;
 extern bool g_servFailOnNoPolicy;
 extern uint32_t g_hashperturb;
 extern bool g_useTCPSinglePipe;
-extern std::atomic<uint16_t> g_downstreamTCPCleanupInterval;
+extern uint16_t g_downstreamTCPCleanupInterval;
 extern size_t g_udpVectorSize;
 extern bool g_preserveTrailingData;
 extern bool g_allowEmptyResponse;
-- 
2.49.0
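
Postscript, for illustration only (none of the following is part of the
change): the lifetime accounting above, reduced to a standalone sketch.
Counters and TCPMetricsGuard are invented names; in the patch the same roles
are played by the ClientState/DownstreamState counters, ConnectionInfo and
TCPConnectionToBackend. The gauge moves in the constructor and destructor, so
every exit path, including exceptions, leaves it balanced, and the averages
are only folded in once the connection is really over:

  #include <atomic>
  #include <cstddef>
  #include <cstdint>
  #include <sys/time.h>

  // Stand-in for the per-frontend / per-backend counters in dnsdist.hh.
  struct Counters
  {
    std::atomic<uint64_t> tcpCurrentConnections{0};
    std::atomic<double> tcpAvgQueriesPerConnection{0.0};
    std::atomic<double> tcpAvgConnectionDuration{0.0}; /* in ms */

    void updateTCPMetrics(size_t queries, uint64_t durationMs)
    {
      tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (queries / 100.0);
      tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
    }
  };

  // RAII guard: increments the connection gauge on creation, decrements it
  // and folds the per-connection numbers into the averages on destruction.
  class TCPMetricsGuard
  {
  public:
    TCPMetricsGuard(Counters& counters): d_counters(counters)
    {
      gettimeofday(&d_start, nullptr);
      ++d_counters.tcpCurrentConnections;
    }

    ~TCPMetricsGuard()
    {
      --d_counters.tcpCurrentConnections;
      struct timeval now;
      gettimeofday(&now, nullptr);
      uint64_t durationMs = (now.tv_sec - d_start.tv_sec) * 1000 + (now.tv_usec - d_start.tv_usec) / 1000;
      d_counters.updateTCPMetrics(d_queries, durationMs);
    }

    void incQueries() { ++d_queries; }

  private:
    Counters& d_counters;
    struct timeval d_start;
    uint64_t d_queries{0};
  };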