It only supports one source address/interface per downstream server.
The more I tried to support more than one, the more I realized I was
in fact having grouping several DS into one, without the benefits
of separate stats and status checking. In particular, having several
sources adresses mean that we would get "random" failures if some
addresses are allowed on the backend and some others are not.
Simply adding the same backend several times with different source
addresses means that only the ones with faulty addresses will be
disabled.
Closes #3138.
newServer({address="192.0.2.1", tcpRecvTimeout=10, tcpSendTimeout=10})
```
+Source address
+--------------
+
+In multi-homed setups, it can be useful to be able to select the source address or the outgoing
+interface used by `dnsdist` to contact a downstream server.
+This can be done by using the `source` parameter:
+```
+newServer({address="192.0.2.1", source="192.0.2.127"})
+newServer({address="192.0.2.1", source="eth1"})
+newServer({address="192.0.2.1", source="192.0.2.127@eth1"})
+```
+
+The supported values for `source` are:
+ * an IPv4 or IPv6 address, which must exist on the system
+ * an interface name
+ * an IPv4 or IPv6 address followed by '@' then an interface name
+
+Specifying the interface name is only supported on system having IP_PKTINFO.
+
+
Configuration management
------------------------
At startup, configuration is read from the command line and the
* `errlog(string)`: log at level error
* Server related:
* `newServer("ip:port")`: instantiate a new downstream server with default settings
- * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName="a.root-servers.net.", checkType="A", mustResolve=false, useClientSubnet=true})`:
+ * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName="a.root-servers.net.", checkType="A", mustResolve=false, useClientSubnet=true, source="address|interface name|address@interface"})`:
instantiate a server with additional parameters
* `showServers()`: output all servers
* `getServer(n)`: returns server with index n
#include <fstream>
#include "dnswriter.hh"
#include "lock.hh"
+#include <net/if.h>
using std::thread;
if(client) {
return std::make_shared<DownstreamState>(ComboAddress());
}
+ ComboAddress sourceAddr;
+ unsigned int sourceItf = 0;
if(auto address = boost::get<string>(&pvars)) {
std::shared_ptr<DownstreamState> ret;
try {
return ret;
}
auto vars=boost::get<newserver_t>(pvars);
+
+ if(vars.count("source")) {
+ /* handle source in the following forms:
+ - v4 address ("192.0.2.1")
+ - v6 address ("2001:DB8::1")
+ - interface name ("eth0")
+ - v4 address and interface name ("192.0.2.1@eth0")
+ - v6 address and interface name ("2001:DB8::1@eth0")
+ */
+ const string source = boost::get<string>(vars["source"]);
+ bool parsed = false;
+ std::string::size_type pos = source.find("@");
+ if (pos == std::string::npos) {
+ /* no '@', try to parse that as a valid v4/v6 address */
+ try {
+ sourceAddr = ComboAddress(source);
+ parsed = true;
+ }
+ catch(...)
+ {
+ }
+ }
+
+ if (parsed == false)
+ {
+ /* try to parse as interface name, or v4/v6@itf */
+ string itfName = source.substr(pos == std::string::npos ? 0 : pos + 1);
+ unsigned int itfIdx = if_nametoindex(itfName.c_str());
+
+ if (itfIdx != 0) {
+ if (pos == 0 || pos == std::string::npos) {
+ /* "eth0" or "@eth0" */
+ sourceItf = itfIdx;
+ }
+ else {
+ /* "192.0.2.1@eth0" */
+ sourceAddr = ComboAddress(source.substr(0, pos));
+ sourceItf = itfIdx;
+ }
+ }
+ else
+ {
+ warnlog("Dismissing source %s because '%s' is not a valid interface name", source, itfName);
+ }
+ }
+ }
+
std::shared_ptr<DownstreamState> ret;
try {
- ret=std::make_shared<DownstreamState>(ComboAddress(boost::get<string>(vars["address"]), 53));
+ ret=std::make_shared<DownstreamState>(ComboAddress(boost::get<string>(vars["address"]), 53), sourceAddr, sourceItf);
}
catch(std::exception& e) {
g_outputBuffer="Error creating new server: "+string(e.what());
errlog("Error creating new server with address %s: %s", boost::get<string>(vars["address"]), e.what());
return ret;
}
-
+
if(vars.count("qps")) {
int qps=std::stoi(boost::get<string>(vars["qps"]));
ret->qps=QPSLimiter(qps, qps);
Let's start naively.
*/
-static int setupTCPDownstream(const ComboAddress& remote)
+static int setupTCPDownstream(shared_ptr<DownstreamState> ds)
{
- vinfolog("TCP connecting to downstream %s", remote.toStringWithPort());
- int sock = SSocket(remote.sin4.sin_family, SOCK_STREAM, 0);
- SConnect(sock, remote);
+ vinfolog("TCP connecting to downstream %s", ds->remote.toStringWithPort());
+ int sock = SSocket(ds->remote.sin4.sin_family, SOCK_STREAM, 0);
+ if (!IsAnyAddress(ds->sourceAddr)) {
+ SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
+ SBind(sock, ds->sourceAddr);
+ }
+ SConnect(sock, ds->remote);
setNonBlocking(sock);
return sock;
}
return false;
}
+static bool sendNonBlockingMsgLen(int fd, uint16_t len, int timeout, ComboAddress& dest, ComboAddress& local, unsigned int localItf)
+try
+{
+ if (localItf == 0)
+ return putNonBlockingMsgLen(fd, len, timeout);
+
+ uint16_t raw = htons(len);
+ ssize_t ret = sendMsgWithTimeout(fd, (char*) &raw, sizeof raw, timeout, dest, local, localItf);
+ return ret == sizeof raw;
+}
+catch(...) {
+ return false;
+}
+
TCPClientCollection g_tcpclientthreads;
void* tcpClientThread(int pipefd)
}
if(sockets.count(ds->remote) == 0) {
- dsock=sockets[ds->remote]=setupTCPDownstream(ds->remote);
+ dsock=sockets[ds->remote]=setupTCPDownstream(ds);
}
else
dsock=sockets[ds->remote];
break;
}
- if(!putNonBlockingMsgLen(dsock, queryLen, ds->tcpSendTimeout)) {
+ if(!sendNonBlockingMsgLen(dsock, queryLen, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf)) {
vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
close(dsock);
- sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote);
+ sockets[ds->remote]=dsock=setupTCPDownstream(ds);
downstream_failures++;
goto retry;
}
try {
- writen2WithTimeout(dsock, query, queryLen, ds->tcpSendTimeout);
+ if (ds->sourceItf == 0) {
+ writen2WithTimeout(dsock, query, queryLen, ds->tcpSendTimeout);
+ }
+ else {
+ sendMsgWithTimeout(dsock, query, queryLen, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf);
+ }
}
catch(const runtime_error& e) {
vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
close(dsock);
- sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote);
+ sockets[ds->remote]=dsock=setupTCPDownstream(ds);
downstream_failures++;
goto retry;
}
if(!getNonBlockingMsgLen(dsock, &rlen, ds->tcpRecvTimeout)) {
vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->getName());
close(dsock);
- sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote);
+ sockets[ds->remote]=dsock=setupTCPDownstream(ds);
downstream_failures++;
goto retry;
}
return 0;
}
-DownstreamState::DownstreamState(const ComboAddress& remote_): checkName("a.root-servers.net."), checkType(QType::A), mustResolve(false)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
{
- remote = remote_;
-
fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+ if (!IsAnyAddress(sourceAddr)) {
+ SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ SBind(fd, sourceAddr);
+ }
SConnect(fd, remote);
idStates.resize(g_maxOutstanding);
sw.start();
return 0x100 * (*z) + *(z+1);
}
+static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen)
+{
+ if (ss->sourceItf == 0) {
+ return send(sd, request, requestLen, 0);
+ }
+
+ struct msghdr msgh;
+ struct iovec iov;
+ char cbuf[256];
+ fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(request), requestLen, &ss->remote);
+ addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf);
+ return sendmsg(sd, &msgh, 0);
+}
+
// listens to incoming queries, sends out to downstream servers, noting the intended return path
static void* udpClientThread(ClientState* cs)
try
}
if (largerQuery.empty()) {
- ret = send(ss->fd, query, len, 0);
+ ret = udpClientSendRequestToBackend(ss, ss->fd, query, len);
}
else {
- ret = send(ss->fd, largerQuery.c_str(), largerQuery.size(), 0);
+ ret = udpClientSendRequestToBackend(ss, ss->fd, largerQuery.c_str(), largerQuery.size());
largerQuery.clear();
}
}
-bool upCheck(const ComboAddress& remote, const DNSName& checkName, const QType& checkType, bool mustResolve)
+static bool upCheck(DownstreamState& ds)
try
{
vector<uint8_t> packet;
- DNSPacketWriter dpw(packet, checkName, checkType.getCode());
+ DNSPacketWriter dpw(packet, ds.checkName, ds.checkType.getCode());
dnsheader * requestHeader = dpw.getHeader();
requestHeader->rd=true;
- Socket sock(remote.sin4.sin_family, SOCK_DGRAM);
+ Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM);
sock.setNonBlocking();
- sock.connect(remote);
- sock.write((char*)&packet[0], packet.size());
+ if (!IsAnyAddress(ds.sourceAddr)) {
+ sock.setReuseAddr();
+ sock.bind(ds.sourceAddr);
+ }
+ sock.connect(ds.remote);
+ ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size());
+ if (sent < 0)
+ return false;
+
int ret=waitForRWData(sock.getHandle(), true, 1, 0);
if(ret < 0 || !ret) // error, timeout, both are down!
return false;
string reply;
- ComboAddress dest=remote;
- sock.recvFrom(reply, dest);
+ sock.recvFrom(reply, ds.remote);
const dnsheader * responseHeader = (const dnsheader *) reply.c_str();
return false;
if (responseHeader->rcode == RCode::ServFail)
return false;
- if (mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused))
+ if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused))
return false;
// XXX fixme do bunch of checking here etc
for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs!
if(dss->availability==DownstreamState::Availability::Auto) {
- bool newState=upCheck(dss->remote, dss->checkName, dss->checkType, dss->mustResolve);
+ bool newState=upCheck(*dss);
if(newState != dss->upStatus) {
warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
}
for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
if(dss->availability==DownstreamState::Availability::Auto) {
- bool newState=upCheck(dss->remote, dss->checkName, dss->checkType, dss->mustResolve);
+ bool newState=upCheck(*dss);
warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
dss->upStatus = newState;
}
struct DownstreamState
{
- DownstreamState(const ComboAddress& remote_);
+ DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf);
+ DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {}
int fd;
std::thread tid;
ComboAddress remote;
QPSLimiter qps;
vector<IDState> idStates;
- DNSName checkName;
- QType checkType;
+ ComboAddress sourceAddr;
+ DNSName checkName{"a.root-servers.net."};
+ QType checkType{QType::A};
std::atomic<uint64_t> idOffset{0};
std::atomic<uint64_t> sendErrors{0};
std::atomic<uint64_t> outstanding{0};
int weight{1};
int tcpRecvTimeout{30};
int tcpSendTimeout{30};
+ unsigned int sourceItf{0};
uint16_t retries{5};
StopWatch sw;
set<string> pools;
enum class Availability { Up, Down, Auto} availability{Availability::Auto};
- bool mustResolve;
+ bool mustResolve{false};
bool upStatus{false};
bool useECS{false};
bool isUp() const
msgh.msg_namelen = i->second.remote.getSocklen();
if(i->second.anyLocal) {
- addCMsgSrcAddr(&msgh, cbuf, i->second.anyLocal.get_ptr());
+ addCMsgSrcAddr(&msgh, cbuf, i->second.anyLocal.get_ptr(), 0);
}
if(sendmsg(i->second.outsock, &msgh, 0) < 0)
L<<Logger::Warning<<"dnsproxy.cc: Error sending reply with sendmsg (socket="<<i->second.outsock<<"): "<<strerror(errno)<<endl;
msgh.msg_namelen = to.getSocklen();
if(from.sin4.sin_family) {
- addCMsgSrcAddr(&msgh, cbuf, &from);
+ addCMsgSrcAddr(&msgh, cbuf, &from, 0);
}
else {
msgh.msg_control=NULL;
*place &= (~((1<<bitsleft)-1));
}
+ssize_t sendMsgWithTimeout(int fd, const char* buffer, size_t len, int timeout, ComboAddress& dest, const ComboAddress& local, unsigned int localItf)
+{
+ struct msghdr msgh;
+ struct iovec iov;
+ char cbuf[256];
+ bool firstTry = true;
+ fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast<char*>(buffer), len, &dest);
+ addCMsgSrcAddr(&msgh, cbuf, &local, localItf);
+
+ do {
+ ssize_t written = sendmsg(fd, &msgh, 0);
+
+ if (written > 0)
+ return written;
+
+ if (errno == EAGAIN) {
+ if (firstTry) {
+ int res = waitForRWData(fd, false, timeout, 0);
+ if (res > 0) {
+ /* there is room available */
+ firstTry = false;
+ }
+ else if (res == 0) {
+ throw runtime_error("Timeout while waiting to write data");
+ } else {
+ throw runtime_error("Error while waiting for room to write data");
+ }
+ }
+ else {
+ throw runtime_error("Timeout while waiting to write data");
+ }
+ }
+ else {
+ unixDie("failed in write2WithTimeout");
+ }
+ }
+ while (firstTry);
+
+ return 0;
+}
+
template class NetmaskTree<bool>;
+
bool HarvestTimestamp(struct msghdr* msgh, struct timeval* tv);
void fillMSGHdr(struct msghdr* msgh, struct iovec* iov, char* cbuf, size_t cbufsize, char* data, size_t datalen, ComboAddress* addr);
int sendfromto(int sock, const char* data, int len, int flags, const ComboAddress& from, const ComboAddress& to);
+ssize_t sendMsgWithTimeout(int fd, const char* buffer, size_t len, int timeout, ComboAddress& dest, const ComboAddress& local, unsigned int localItf);
+
#endif
extern template class NetmaskTree<bool>;
throw PDNSException("Regular expression did not compile");
}
-void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source)
+void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source, int itfIndex)
{
struct cmsghdr *cmsg = NULL;
pkt = (struct in6_pktinfo *) CMSG_DATA(cmsg);
memset(pkt, 0, sizeof(*pkt));
pkt->ipi6_addr = source->sin6.sin6_addr;
+ pkt->ipi6_ifindex = itfIndex;
msgh->msg_controllen = cmsg->cmsg_len; // makes valgrind happy and is slightly better style
}
else {
pkt = (struct in_pktinfo *) CMSG_DATA(cmsg);
memset(pkt, 0, sizeof(*pkt));
pkt->ipi_spec_dst = source->sin4.sin_addr;
+ pkt->ipi_ifindex = itfIndex;
msgh->msg_controllen = cmsg->cmsg_len;
#endif
#ifdef IP_SENDSRCADDR
};
union ComboAddress;
-void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source);
+/* itfIndex is an interface index, as returned by if_nametoindex(). 0 means default. */
+void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source, int itfIndex);
unsigned int getFilenumLimit(bool hardOrSoft=0);
void setFilenumLimit(unsigned int lim);
fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)buffer.c_str(), buffer.length(), &p->d_remote);
if(p->d_anyLocal) {
- addCMsgSrcAddr(&msgh, cbuf, p->d_anyLocal.get_ptr());
+ addCMsgSrcAddr(&msgh, cbuf, p->d_anyLocal.get_ptr(), 0);
}
else {
msgh.msg_control=NULL;
char cbuf[256];
fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote);
if(dc->d_local.sin4.sin_family)
- addCMsgSrcAddr(&msgh, cbuf, &dc->d_local);
+ addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 0);
else
msgh.msg_control=NULL;
sendmsg(dc->d_socket, &msgh, 0);
char cbuf[256];
fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast<ComboAddress*>(&fromaddr));
if(destaddr.sin4.sin_family) {
- addCMsgSrcAddr(&msgh, cbuf, &destaddr);
+ addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0);
}
else {
msgh.msg_control=NULL;