static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state, IOState iostate, const int fd, FDMultiplexer::callbackfunc_t callback, boost::optional<struct timeval> ttd=boost::none);
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now);
+static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now);
-static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);
/* we need to resume reading from the backend! */
state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
state->d_currentPos = 0;
- //cerr<<__func__<<": add read client FD "<<state->d_ci.fd<<endl;
- // XXX: if we ever do TLS toward the backend, we need to try to read right away
- // because the TLS layer might have more bits already waiting for us
- handleNewIOState(state, IOState::NeedRead, state->d_downstreamSocket->getHandle(), handleDownstreamIOCallback, state->getBackendReadTTD());
+ handleDownstreamIO(state, now);
return;
}
return;
}
- struct timeval now;
- gettimeofday(&now, 0);
if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
return;
handleIO(state, now);
}
-static void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
state->d_state = IncomingTCPConnectionState::State::sendingResponse;
const uint8_t sizeBytes[] = { static_cast<uint8_t>(state->d_responseSize / 256), static_cast<uint8_t>(state->d_responseSize % 256) };
state->d_currentPos = 0;
- try {
- auto iostate = state->d_handler.tryWrite(state->d_responseBuffer, state->d_currentPos, state->d_responseBuffer.size());
- if (iostate == IOState::Done) {
-
- handleResponseSent(state);
- return;
- }
- else {
- //cerr<<__func__<<": adding client write FD "<<state->d_ci.fd<<endl;
- handleNewIOState(state, IOState::NeedWrite, state->d_ci.fd, handleIOCallback, state->getClientWriteTTD());
- }
- }
- catch (const std::exception& e) {
- vinfolog("Got an exception while writing TCP response to %s: %s", state->d_ci.remote.toStringWithPort(), e.what());
- ++state->d_ci.cs->tcpDiedSendingResponse;
- handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);
- }
+ handleIO(state, now);
}
-static void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
if (state->d_responseSize < sizeof(dnsheader)) {
return;
state->d_xfrStarted = true;
}
- sendResponse(state);
+ sendResponse(state, now);
++g_stats.responses;
struct timespec answertime;
g_rings.insertResponse(answertime, state->d_ci.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(state->d_responseBuffer.size()), cleartextDH, state->d_ds->remote);
}
-static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
auto ds = state->d_ds;
state->d_state = IncomingTCPConnectionState::State::sendingQueryToBackend;
return;
}
- //cerr<<__func__<<": add write backend FD "<<state->d_downstreamSocket->getHandle()<<endl;
- handleNewIOState(state, IOState::NeedWrite, state->d_downstreamSocket->getHandle(), handleDownstreamIOCallback, state->getBackendWriteTTD());
+ handleDownstreamIO(state, now);
return;
}
vinfolog("Downstream connection to %s failed %u times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
}
-static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state)
+static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
if (state->d_querySize < sizeof(dnsheader)) {
++g_stats.nonCompliantQueries;
/* we need an accurate ("real") value for the response and
to store into the IDS, but not for insertion into the
rings for example */
- struct timespec now;
struct timespec queryRealTime;
- gettime(&now);
gettime(&queryRealTime, true);
auto query = reinterpret_cast<char*>(&state->d_buffer.at(0));
if (dnsCryptResponse) {
state->d_responseBuffer = std::move(*dnsCryptResponse);
state->d_responseSize = state->d_responseBuffer.size();
- sendResponse(state);
+ sendResponse(state, now);
return;
}
state->d_buffer.resize(dq.len);
state->d_responseBuffer = std::move(state->d_buffer);
state->d_responseSize = state->d_responseBuffer.size();
- sendResponse(state);
+ sendResponse(state, now);
return;
}
that could occur if we had to deal with the size during the processing,
especially alignment issues */
state->d_buffer.insert(state->d_buffer.begin(), sizeBytes, sizeBytes + 2);
- sendQueryToBackend(state);
+ sendQueryToBackend(state, now);
}
static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state, IOState iostate, const int fd, FDMultiplexer::callbackfunc_t callback, boost::optional<struct timeval> ttd)
}
}
-static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
+static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
- auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
if (state->d_downstreamSocket == nullptr) {
throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
}
- if (fd != state->d_downstreamSocket->getHandle()) {
- throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamSocket->getHandle()));
- }
+ int fd = state->d_downstreamSocket->getHandle();
IOState iostate = IOState::Done;
bool connectionDied = false;
- struct timeval now;
- gettimeofday(&now, 0);
try {
if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
}
fd = -1;
- handleResponse(state);
+ handleResponse(state, now);
return;
}
}
}
if (connectionDied) {
- sendQueryToBackend(state);
+ sendQueryToBackend(state, now);
+ }
+}
+
+static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
+{
+ auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
+ if (state->d_downstreamSocket == nullptr) {
+ throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
+ }
+ if (fd != state->d_downstreamSocket->getHandle()) {
+ throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamSocket->getHandle()));
}
+
+ struct timeval now;
+ gettimeofday(&now, 0);
+ handleDownstreamIO(state, now);
}
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
if (iostate == IOState::Done) {
handleNewIOState(state, IOState::Done, fd, handleIOCallback);
- handleQuery(state);
+ handleQuery(state, now);
return;
}
}
if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
iostate = state->d_handler.tryWrite(state->d_responseBuffer, state->d_currentPos, state->d_responseBuffer.size());
if (iostate == IOState::Done) {
- handleResponseSent(state);
+ handleResponseSent(state, now);
return;
}
}