From 150105a20eaebc8e0041b1a41b81fd90d7dbaba3 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 1 Mar 2018 11:19:29 +0000 Subject: [PATCH] dnsdist: Add an option to use several source ports toward a backend This is very useful if the backend is distributing queries based only on (source IP, source port, destination IP, destination port), which is for example the case of PowerDNS Recursor with several threads, reuseport set and pdns-distribute-queries not set. --- pdns/dnsdist-console.cc | 2 +- pdns/dnsdist-lua.cc | 7 +- pdns/dnsdist.cc | 99 +++++++++++++++------- pdns/dnsdist.hh | 15 ++-- pdns/dnsdistdist/docs/reference/config.rst | 3 +- 5 files changed, 89 insertions(+), 37 deletions(-) diff --git a/pdns/dnsdist-console.cc b/pdns/dnsdist-console.cc index 7d9d0209e..d83b6c1e2 100644 --- a/pdns/dnsdist-console.cc +++ b/pdns/dnsdist-console.cc @@ -349,7 +349,7 @@ const std::vector g_consoleKeywords{ { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" }, { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" }, { "newRuleAction", true, "DNS rule, DNS action [, {uuid=\"UUID\"}]", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" }, - { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" }, + { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" }, { "newServerPolicy", true, "name, function", "create a policy object from a Lua function" }, { "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" }, { "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" }, diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index d8a8c916f..a73b6d1b1 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -117,6 +117,7 @@ void setupLuaConfig(bool client) } ComboAddress sourceAddr; unsigned int sourceItf = 0; + size_t numberOfSockets = 1; std::set cpus; if(auto addressStr = boost::get(&pvars)) { std::shared_ptr ret; @@ -216,6 +217,10 @@ void setupLuaConfig(bool client) } } + if(vars.count("sockets")) { + numberOfSockets=std::stoi(boost::get(vars["sockets"])); + } + std::shared_ptr ret; try { ComboAddress address(boost::get(vars["address"]), 53); @@ -224,7 +229,7 @@ void setupLuaConfig(bool client) errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get(vars["address"])); return ret; } - ret=std::make_shared(address, sourceAddr, sourceItf); + ret=std::make_shared(address, sourceAddr, sourceItf, numberOfSockets); } catch(const PDNSException& e) { g_outputBuffer="Error creating new server: "+string(e.reason); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index e525ba9c0..5061e6336 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -383,6 +383,34 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in return true; } + +static int pickBackendFD(DownstreamState* state) +{ + return state->fds[state->fdOffset++ % state->fds.size()]; +} + +static int selectBackendFD(const std::shared_ptr& state) +{ + if (state->fds.size() == 1) { + return state->fds[0]; + } + + std::set fds; + for (auto fd : state->fds) { + if (fd >= 0) { + fds.insert(fd); + } + } + + int selected = -1; + int res = waitForMultiData(fds, -1, -1, &selected); + if (res != 1) { + throw std::runtime_error("Error selecting a socket for a backend " + state->remote.toStringWithPort() + ": " + strerror(errno)); + } + + return selected; +} + // listens on a dedicated socket, lobs answers from downstream servers to original requestors void* responderThread(std::shared_ptr dss) try { @@ -403,7 +431,8 @@ try { dnsheader* dh = reinterpret_cast(packet); bool outstandingDecreased = false; try { - ssize_t got = recv(dss->fd, packet, sizeof(packet), 0); + int fd = selectBackendFD(dss); + ssize_t got = recv(fd, packet, sizeof(packet), 0); char * response = packet; size_t responseSize = sizeof(packet); @@ -541,30 +570,37 @@ catch(...) void DownstreamState::reconnect() { connected = false; - if (fd != -1) { - /* shutdown() is needed to wake up recv() in the responderThread */ - shutdown(fd, SHUT_RDWR); - close(fd); - fd = -1; - } - if (!IsAnyAddress(remote)) { - fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); - if (!IsAnyAddress(sourceAddr)) { - SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); - SBind(fd, sourceAddr); - } - try { - SConnect(fd, remote); - connected = true; - } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + for (auto& fd : fds) { + if (fd != -1) { + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } + if (!IsAnyAddress(remote)) { + fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); + if (!IsAnyAddress(sourceAddr)) { + SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); + SBind(fd, sourceAddr); + } + try { + SConnect(fd, remote); + connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + } } } } -DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) +DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { + fds.resize(numberOfSockets); + for (auto& fd : fds) { + fd = -1; + } + if (!IsAnyAddress(remote)) { reconnect(); idStates.resize(g_maxOutstanding); @@ -1462,7 +1498,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct dh->id = idOffset; - ssize_t ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len); + int fd = pickBackendFD(ss); + ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len); if(ret < 0) { ss->sendErrors++; @@ -1791,15 +1828,19 @@ void* healthChecksThread() warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); if (newState && !dss->connected) { - try { - SConnect(dss->fd, dss->remote); - dss->connected = true; - dss->tid = thread(responderThread, dss); + for (auto& fd : dss->fds) { + try { + SConnect(fd, dss->remote); + dss->connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); + newState = false; + dss->connected = false; + } } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); - newState = false; - dss->connected = false; + if (dss->connected) { + dss->tid = thread(responderThread, dss); } } diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index cfc92475f..131b280a9 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -523,15 +523,19 @@ extern std::shared_ptr g_tcpclientthreads; struct DownstreamState { - DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf); - DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {} + DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets); + DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {} ~DownstreamState() { - if (fd >= 0) - close(fd); + for (auto& fd : fds) { + if (fd >= 0) { + close(fd); + fd = -1; + } + } } - int fd{-1}; + std::vector fds; std::thread tid; ComboAddress remote; QPSLimiter qps; @@ -551,6 +555,7 @@ struct DownstreamState std::atomic queries{0}; } prev; string name; + size_t fdOffset{0}; double queryLoad{0.0}; double dropRate{0.0}; double latencyUsec{0.0}; diff --git a/pdns/dnsdistdist/docs/reference/config.rst b/pdns/dnsdistdist/docs/reference/config.rst index a89d46634..b136c5cea 100644 --- a/pdns/dnsdistdist/docs/reference/config.rst +++ b/pdns/dnsdistdist/docs/reference/config.rst @@ -268,8 +268,9 @@ Servers -- "address", e.g. "192.0.2.2" -- "interface name", e.g. "eth0" -- "address@interface", e.g. "192.0.2.2@eth0" - addXPF=NUM -- Add the client's IP address and port to the query, along with the original destination address and port, + addXPF=NUM, -- Add the client's IP address and port to the query, along with the original destination address and port, -- using the experimental XPF record from `draft-bellis-dnsop-xpf `_ and the specified option code. Default is disabled (0) + sockets=NUM -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one }) :param str server_string: A simple IP:PORT string. -- 2.40.0