close(d_devpollfd);
}
- virtual int run(struct timeval* tv);
+ virtual int run(struct timeval* tv) override;
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
- virtual void removeFD(callbackmap_t& cbmap, int fd);
- string getName()
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+ string getName() const override
{
return "/dev/poll";
}
}
}
+void DevPollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+ struct dvpoll dvp;
+ dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size();
+ dvp.dp_fds = new pollfd[dvp.dp_nfds];
+ dvp.dp_timeout = timeout;
+ int ret=ioctl(d_devpollfd, DP_POLL, &dvp);
+
+ if(ret < 0 && errno!=EINTR) {
+ delete[] dvp.dp_fds;
+ throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
+ }
+
+ for(int n=0; n < ret; ++n) {
+ fds.push_back(dvp.dp_fds[n].fd);
+ }
+
+ delete[] dvp.dp_fds;
+}
+
int DevPollFDMultiplexer::run(struct timeval* now)
{
if(d_inrun) {
dvp.dp_timeout = 500;
int ret=ioctl(d_devpollfd, DP_POLL, &dvp);
gettimeofday(now,0); // MANDATORY!
-
+
if(ret < 0 && errno!=EINTR) {
delete[] dvp.dp_fds;
throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
{ "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" },
{ "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" },
{ "newRuleAction", true, "DNS rule, DNS action", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" },
- { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" },
+ { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" },
{ "newServerPolicy", true, "name, function", "create a policy object from a Lua function" },
{ "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" },
{ "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" },
}
ComboAddress sourceAddr;
unsigned int sourceItf = 0;
+ size_t numberOfSockets = 1;
if(auto addressStr = boost::get<string>(&pvars)) {
std::shared_ptr<DownstreamState> ret;
try {
}
}
+ if (vars.count("sockets")) {
+ numberOfSockets = std::stoul(boost::get<string>(vars["sockets"]));
+ if (numberOfSockets == 0) {
+ warnlog("Dismissing invalid number of sockets '%s', using 1 instead", boost::get<string>(vars["sockets"]));
+ numberOfSockets = 1;
+ }
+ }
+
std::shared_ptr<DownstreamState> ret;
try {
ComboAddress address(boost::get<string>(vars["address"]), 53);
errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get<string>(vars["address"]));
return ret;
}
- ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf);
+ ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf, numberOfSockets);
}
catch(const PDNSException& e) {
g_outputBuffer="Error creating new server: "+string(e.reason);
return true;
}
+
+static int pickBackendSocketForSending(DownstreamState* state)
+{
+ return state->sockets[state->socketsOffset++ % state->sockets.size()];
+}
+
+static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
+{
+ ready.clear();
+
+ if (state->sockets.size() == 1) {
+ ready.push_back(state->sockets[0]);
+ return ;
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(state->socketsLock);
+ state->mplexer->getAvailableFDs(ready, -1);
+ }
+}
+
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
void* responderThread(std::shared_ptr<DownstreamState> state)
try {
vector<uint8_t> rewrittenResponse;
uint16_t queryId = 0;
+ std::vector<int> sockets;
+ sockets.reserve(state->sockets.size());
+
for(;;) {
dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
bool outstandingDecreased = false;
try {
- ssize_t got = recv(state->fd, packet, sizeof(packet), 0);
- char * response = packet;
- size_t responseSize = sizeof(packet);
+ pickBackendSocketsReadyForReceiving(state, sockets);
+ for (const auto& fd : sockets) {
+ ssize_t got = recv(fd, packet, sizeof(packet), 0);
+ char * response = packet;
+ size_t responseSize = sizeof(packet);
- if (got < (ssize_t) sizeof(dnsheader))
- continue;
+ if (got < (ssize_t) sizeof(dnsheader))
+ continue;
- uint16_t responseLen = (uint16_t) got;
- queryId = dh->id;
+ uint16_t responseLen = (uint16_t) got;
+ queryId = dh->id;
- if(queryId >= state->idStates.size())
- continue;
+ if(queryId >= state->idStates.size())
+ continue;
- IDState* ids = &state->idStates[queryId];
- int origFD = ids->origFD;
+ IDState* ids = &state->idStates[queryId];
+ int origFD = ids->origFD;
- if(origFD < 0) // duplicate
- continue;
+ if(origFD < 0) // duplicate
+ continue;
- /* setting age to 0 to prevent the maintainer thread from
- cleaning this IDS while we process the response.
- We have already a copy of the origFD, so it would
- mostly mess up the outstanding counter.
- */
- ids->age = 0;
+ /* setting age to 0 to prevent the maintainer thread from
+ cleaning this IDS while we process the response.
+ We have already a copy of the origFD, so it would
+ mostly mess up the outstanding counter.
+ */
+ ids->age = 0;
- if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
- continue;
- }
+ if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, state->remote)) {
+ continue;
+ }
- --state->outstanding; // you'd think an attacker could game this, but we're using connected socket
- outstandingDecreased = true;
+ --state->outstanding; // you'd think an attacker could game this, but we're using connected socket
+ outstandingDecreased = true;
- if(dh->tc && g_truncateTC) {
- truncateTC(response, &responseLen);
- }
+ if(dh->tc && g_truncateTC) {
+ truncateTC(response, &responseLen);
+ }
- dh->id = ids->origID;
+ dh->id = ids->origID;
- uint16_t addRoom = 0;
- DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
+ uint16_t addRoom = 0;
+ DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
#ifdef HAVE_PROTOBUF
- dr.uniqueId = ids->uniqueId;
+ dr.uniqueId = ids->uniqueId;
#endif
- if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
- continue;
- }
+ if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
+ continue;
+ }
#ifdef HAVE_DNSCRYPT
- if (ids->dnsCryptQuery) {
- addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
- }
+ if (ids->dnsCryptQuery) {
+ addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
+ }
#endif
- if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
- continue;
- }
+ if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
+ continue;
+ }
- if (ids->packetCache && !ids->skipCache) {
- ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
- }
+ if (ids->packetCache && !ids->skipCache) {
+ ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode);
+ }
- if (ids->cs && !ids->cs->muted) {
+ if (ids->cs && !ids->cs->muted) {
#ifdef HAVE_DNSCRYPT
- if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
- continue;
- }
+ if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
+ continue;
+ }
#endif
- ComboAddress empty;
- empty.sin4.sin_family = 0;
- /* if ids->destHarvested is false, origDest holds the listening address.
- We don't want to use that as a source since it could be 0.0.0.0 for example. */
- sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
- }
+ ComboAddress empty;
+ empty.sin4.sin_family = 0;
+ /* if ids->destHarvested is false, origDest holds the listening address.
+ We don't want to use that as a source since it could be 0.0.0.0 for example. */
+ sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
+ }
- g_stats.responses++;
+ g_stats.responses++;
- double udiff = ids->sentTime.udiff();
- vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+ double udiff = ids->sentTime.udiff();
+ vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
- {
- struct timespec ts;
- gettime(&ts);
- std::lock_guard<std::mutex> lock(g_rings.respMutex);
- g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
- }
+ {
+ struct timespec ts;
+ gettime(&ts);
+ std::lock_guard<std::mutex> lock(g_rings.respMutex);
+ g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, state->remote});
+ }
+
+ if(dh->rcode == RCode::ServFail)
+ g_stats.servfailResponses++;
+
+ state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
- if(dh->rcode == RCode::ServFail)
- g_stats.servfailResponses++;
- state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
+ if(udiff < 1000) g_stats.latency0_1++;
+ else if(udiff < 10000) g_stats.latency1_10++;
+ else if(udiff < 50000) g_stats.latency10_50++;
+ else if(udiff < 100000) g_stats.latency50_100++;
+ else if(udiff < 1000000) g_stats.latency100_1000++;
+ else g_stats.latencySlow++;
- if(udiff < 1000) g_stats.latency0_1++;
- else if(udiff < 10000) g_stats.latency1_10++;
- else if(udiff < 50000) g_stats.latency10_50++;
- else if(udiff < 100000) g_stats.latency50_100++;
- else if(udiff < 1000000) g_stats.latency100_1000++;
- else g_stats.latencySlow++;
-
- doLatencyAverages(udiff);
+ doLatencyAverages(udiff);
- if (ids->origFD == origFD) {
+ if (ids->origFD == origFD) {
#ifdef HAVE_DNSCRYPT
- ids->dnsCryptQuery = 0;
+ ids->dnsCryptQuery = nullptr;
#endif
- ids->origFD = -1;
- outstandingDecreased = false;
- }
+ ids->origFD = -1;
+ outstandingDecreased = false;
+ }
- rewrittenResponse.clear();
+ rewrittenResponse.clear();
+ }
}
catch(const std::exception& e){
vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", state->remote.toStringWithPort(), queryId, e.what());
void DownstreamState::reconnect()
{
connected = false;
- if (fd != -1) {
- /* shutdown() is needed to wake up recv() in the responderThread */
- shutdown(fd, SHUT_RDWR);
- close(fd);
- fd = -1;
- }
- if (!IsAnyAddress(remote)) {
- fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
- if (!IsAnyAddress(sourceAddr)) {
- SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
- SBind(fd, sourceAddr);
- }
- try {
- SConnect(fd, remote);
- connected = true;
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->removeReadFD(fd);
+ }
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
+ if (!IsAnyAddress(remote)) {
+ fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+ if (!IsAnyAddress(sourceAddr)) {
+ SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ SBind(fd, sourceAddr);
+ }
+ try {
+ SConnect(fd, remote);
+ {
+ std::lock_guard<std::mutex> lock(socketsLock);
+ mplexer->addReadFD(fd, [](int, boost::any) {});
+ }
+ connected = true;
+ }
+ catch(const std::runtime_error& error) {
+ infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+ connected = false;
+ break;
+ }
}
- catch(const std::runtime_error& error) {
- infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+ }
+
+ /* if at least one (re-)connection failed, close all sockets */
+ if (!connected) {
+ for (auto& fd : sockets) {
+ if (fd != -1) {
+ /* shutdown() is needed to wake up recv() in the responderThread */
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ }
}
}
}
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
{
+ mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+
+ sockets.resize(numberOfSockets);
+ for (auto& fd : sockets) {
+ fd = -1;
+ }
+
if (!IsAnyAddress(remote)) {
reconnect();
idStates.resize(g_maxOutstanding);
dh->id = idOffset;
+ int fd = pickBackendSocketForSending(ss);
+
if (largerQuery.empty()) {
- ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len);
+ ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
}
else {
- ret = udpClientSendRequestToBackend(ss, ss->fd, largerQuery.c_str(), largerQuery.size());
+ ret = udpClientSendRequestToBackend(ss, fd, largerQuery.c_str(), largerQuery.size());
largerQuery.clear();
}
warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
if (newState && !dss->connected) {
- try {
- SConnect(dss->fd, dss->remote);
- dss->connected = true;
- dss->tid = thread(responderThread, dss);
+ for (auto& fd : dss->sockets) {
+ try {
+ SConnect(fd, dss->remote);
+ {
+ std::lock_guard<std::mutex> lock(dss->socketsLock);
+ dss->mplexer->addReadFD(fd, [](int, boost::any) {});
+ }
+ dss->connected = true;
+ }
+ catch(const std::runtime_error& error) {
+ infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
+ newState = false;
+ dss->connected = false;
+ }
}
- catch(const std::runtime_error& error) {
- infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
- newState = false;
- dss->connected = false;
+ if (dss->connected) {
+ dss->tid = thread(responderThread, dss);
}
}
{
/* stdin, stdout, stderr */
size_t requiredFDsCount = 3;
- size_t backendsCount = g_dstates.getCopy().size();
+ const auto backends = g_dstates.getCopy();
+ /* UDP sockets to backends */
+ size_t backendUDPSocketsCount = 0;
+ for (const auto& backend : backends) {
+ backendUDPSocketsCount += backend->sockets.size();
+ }
+ requiredFDsCount += backendUDPSocketsCount;
+ /* TCP sockets to backends */
+ requiredFDsCount += (backends.size() * g_maxTCPClientThreads);
/* listening sockets */
requiredFDsCount += udpBindsCount;
requiredFDsCount += tcpBindsCount;
requiredFDsCount += g_maxTCPClientThreads;
/* max pipes for communicating between TCP acceptors and client threads */
requiredFDsCount += (g_maxTCPClientThreads * 2);
- /* UDP sockets to backends */
- requiredFDsCount += backendsCount;
- /* TCP sockets to backends */
- requiredFDsCount += (backendsCount * g_maxTCPClientThreads);
/* max TCP queued connections */
requiredFDsCount += g_maxTCPQueuedConnections;
/* DelayPipe pipe */
#include "ext/luawrapper/include/LuaContext.hpp"
#include <time.h>
#include "misc.hh"
+#include "mplexer.hh"
#include "iputils.hh"
#include "dnsname.hh"
#include <atomic>
struct DownstreamState
{
- DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf);
- DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {}
+ DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets);
+ DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {}
~DownstreamState()
{
- if (fd >= 0)
- close(fd);
+ for (auto& fd : sockets) {
+ if (fd >= 0) {
+ close(fd);
+ fd = -1;
+ }
+ }
}
- int fd{-1};
+ std::vector<int> sockets;
+ std::mutex socketsLock;
+ std::unique_ptr<FDMultiplexer> mplexer{nullptr};
std::thread tid;
ComboAddress remote;
QPSLimiter qps;
std::atomic<uint64_t> queries{0};
} prev;
string name;
+ size_t socketsOffset{0};
double queryLoad{0.0};
double dropRate{0.0};
double latencyUsec{0.0};
bpf-filter.main.ebpf \
bpf-filter.qname.ebpf \
bpf-filter.ebpf.src \
- DNSDIST-MIB.txt
+ DNSDIST-MIB.txt \
+ devpollmplexer.cc \
+ epollmplexer.cc \
+ kqueuemplexer.cc \
+ portsmplexer.cc
bin_PROGRAMS = dnsdist
iputils.cc iputils.hh \
lock.hh \
misc.cc misc.hh \
+ mplexer.hh \
htmlfiles.h \
namespaces.hh \
pdnsexception.hh \
+ pollmplexer.cc \
protobuf.cc protobuf.hh \
qtype.cc qtype.hh \
remote_logger.cc remote_logger.hh \
endif
endif
+if HAVE_FREEBSD
+dnsdist_SOURCES += kqueuemplexer.cc
+endif
+
+if HAVE_LINUX
+dnsdist_SOURCES += epollmplexer.cc
+endif
+
+if HAVE_SOLARIS
+dnsdist_SOURCES += \
+ devpollmplexer.cc \
+ portsmplexer.cc
+endif
+
testrunner_SOURCES = \
base64.hh \
dns.hh \
--- /dev/null
+../devpollmplexer.cc
\ No newline at end of file
-- "address", e.g. "192.0.2.2"
-- "interface name", e.g. "eth0"
-- "address@interface", e.g. "192.0.2.2@eth0"
+ sockets=NUM -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one
})
:param str server_string: A simple IP:PORT string.
--- /dev/null
+../epollmplexer.cc
\ No newline at end of file
--- /dev/null
+../kqueuemplexer.cc
\ No newline at end of file
--- /dev/null
+../mplexer.hh
\ No newline at end of file
--- /dev/null
+../pollmplexer.cc
\ No newline at end of file
--- /dev/null
+../portsmplexer.cc
\ No newline at end of file
#include <sys/epoll.h>
#endif
-#include "namespaces.hh"
#include "namespaces.hh"
class EpollFDMultiplexer : public FDMultiplexer
close(d_epollfd);
}
- virtual int run(struct timeval* tv);
+ virtual int run(struct timeval* tv) override;
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
- virtual void removeFD(callbackmap_t& cbmap, int fd);
- string getName()
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+ string getName() const override
{
return "epoll";
}
}
} doItEpoll;
-
int EpollFDMultiplexer::s_maxevents=1024;
+
EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents])
{
d_epollfd=epoll_create(s_maxevents); // not hard max
throw FDMultiplexerException("Removing fd from epoll set: "+stringerror());
}
+void EpollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+ int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout);
+
+ if(ret < 0 && errno!=EINTR)
+ throw FDMultiplexerException("epoll returned error: "+stringerror());
+
+ for(int n=0; n < ret; ++n) {
+ fds.push_back(d_eevents[n].data.fd);
+ }
+}
+
int EpollFDMultiplexer::run(struct timeval* now)
{
if(d_inrun) {
int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, 500);
gettimeofday(now,0); // MANDATORY
-
+
if(ret < 0 && errno!=EINTR)
throw FDMultiplexerException("epoll returned error: "+stringerror());
#include <iostream>
#include <unistd.h>
#include "misc.hh"
-#include "syncres.hh"
#include <sys/types.h>
#if defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
#include <sys/event.h>
#endif
#include <sys/time.h>
-#include "namespaces.hh"
#include "namespaces.hh"
class KqueueFDMultiplexer : public FDMultiplexer
close(d_kqueuefd);
}
- virtual int run(struct timeval* tv);
+ virtual int run(struct timeval* tv) override;
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);
- virtual void removeFD(callbackmap_t& cbmap, int fd);
- string getName()
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override;
+ virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+ string getName() const override
{
return "kqueue";
}
struct kevent kqevent;
EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0);
-
+
if(kevent(d_kqueuefd, &kqevent, 1, 0, 0, 0) < 0) {
cbmap.erase(fd);
throw FDMultiplexerException("Adding fd to kqueue set: "+stringerror());
throw FDMultiplexerException("Removing fd from kqueue set: "+stringerror());
}
+void KqueueFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+ struct timespec ts;
+ ts.tv_sec=timeout/1000;
+ ts.tv_nsec=(timeout % 1000) * 1000000;
+
+ int ret = kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
+
+ if(ret < 0 && errno != EINTR)
+ throw FDMultiplexerException("kqueue returned error: "+stringerror());
+
+ for(int n=0; n < ret; ++n) {
+ fds.push_back(d_kevents[n].ident);
+ }
+}
+
int KqueueFDMultiplexer::run(struct timeval* now)
{
if(d_inrun) {
int ret=kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
gettimeofday(now,0); // MANDATORY!
-
+
if(ret < 0 && errno!=EINTR)
throw FDMultiplexerException("kqueue returned error: "+stringerror());
#include <map>
#include <stdexcept>
#include <string>
-#include "utility.hh"
#include <sys/time.h>
class FDMultiplexerException : public std::runtime_error
virtual ~FDMultiplexer()
{}
+ static FDMultiplexer* getMultiplexerSilent();
+
virtual int run(struct timeval* tv) = 0;
+ /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
+
//! Add an fd to the read watch list - currently an fd can only be on one list at a time!
virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
{
return theMap;
}
- virtual std::string getName() = 0;
+ virtual std::string getName() const = 0;
protected:
}
};
-class SelectFDMultiplexer : public FDMultiplexer
-{
-public:
- SelectFDMultiplexer()
- {}
- virtual ~SelectFDMultiplexer()
- {}
-
- virtual int run(struct timeval* tv);
-
- virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
- virtual void removeFD(callbackmap_t& cbmap, int fd);
- std::string getName()
- {
- return "select";
- }
-};
-
#endif
#include <iostream>
#include <poll.h>
#include "misc.hh"
-#include "syncres.hh"
-#include "utility.hh"
-#include "namespaces.hh"
#include "namespaces.hh"
+FDMultiplexer* FDMultiplexer::getMultiplexerSilent()
+{
+ FDMultiplexer* ret = nullptr;
+ for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
+ try {
+ ret = i.second();
+ return ret;
+ }
+ catch(const FDMultiplexerException& fe) {
+ }
+ catch(...) {
+ }
+ }
+ return ret;
+}
+
+
+class PollFDMultiplexer : public FDMultiplexer
+{
+public:
+ PollFDMultiplexer()
+ {}
+ virtual ~PollFDMultiplexer()
+ {
+ }
+
+ virtual int run(struct timeval* tv) override;
+ virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
+
+ virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+ virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+
+ string getName() const override
+ {
+ return "poll";
+ }
+private:
+ vector<struct pollfd> preparePollFD() const;
+};
static FDMultiplexer* make()
{
throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
}
-bool pollfdcomp(const struct pollfd& a, const struct pollfd& b)
+vector<struct pollfd> PollFDMultiplexer::preparePollFD() const
{
- return a.fd < b.fd;
-}
-
-int PollFDMultiplexer::run(struct timeval* now)
-{
- if(d_inrun) {
- throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
- }
-
vector<struct pollfd> pollfds;
-
+ pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size());
+
struct pollfd pollfd;
- for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
- pollfd.fd = i->first;
+ for(const auto& cb : d_readCallbacks) {
+ pollfd.fd = cb.first;
pollfd.events = POLLIN;
pollfds.push_back(pollfd);
}
- for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
- pollfd.fd = i->first;
+ for(const auto& cb : d_writeCallbacks) {
+ pollfd.fd = cb.first;
pollfd.events = POLLOUT;
pollfds.push_back(pollfd);
}
+ return pollfds;
+}
+
+void PollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+ auto pollfds = preparePollFD();
+ int ret = poll(&pollfds[0], pollfds.size(), timeout);
+
+ if (ret < 0 && errno != EINTR)
+ throw FDMultiplexerException("poll returned error: " + stringerror());
+
+ for(const auto& pollfd : pollfds) {
+ if (pollfd.revents == POLLIN || pollfd.revents == POLLOUT) {
+ fds.push_back(pollfd.fd);
+ }
+ }
+}
+
+int PollFDMultiplexer::run(struct timeval* now)
+{
+ if(d_inrun) {
+ throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+ }
+
+ auto pollfds = preparePollFD();
+
int ret=poll(&pollfds[0], pollfds.size(), 500);
- Utility::gettimeofday(now, 0); // MANDATORY!
+ gettimeofday(now, 0); // MANDATORY!
if(ret < 0 && errno!=EINTR)
throw FDMultiplexerException("poll returned error: "+stringerror());
d_iter=d_readCallbacks.end();
d_inrun=true;
-
- for(unsigned int n = 0; n < pollfds.size(); ++n) {
- if(pollfds[n].revents == POLLIN) {
- d_iter=d_readCallbacks.find(pollfds[n].fd);
+
+ for(const auto& pollfd : pollfds) {
+ if(pollfd.revents == POLLIN) {
+ d_iter=d_readCallbacks.find(pollfd.fd);
if(d_iter != d_readCallbacks.end()) {
d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
continue; // so we don't refind ourselves as writable!
}
}
- else if(pollfds[n].revents == POLLOUT) {
- d_iter=d_writeCallbacks.find(pollfds[n].fd);
+ else if(pollfd.revents == POLLOUT) {
+ d_iter=d_writeCallbacks.find(pollfd.fd);
if(d_iter != d_writeCallbacks.end()) {
d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
#include <iostream>
#include "misc.hh"
-#include "syncres.hh"
-#include "namespaces.hh"
#include "namespaces.hh"
class PortsFDMultiplexer : public FDMultiplexer
+++ /dev/null
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-#include "mplexer.hh"
-#include "sstuff.hh"
-#include <iostream>
-#include "misc.hh"
-#include "utility.hh"
-
-
-#include "namespaces.hh"
-#include "namespaces.hh"
-
-static FDMultiplexer* make()
-{
- return new SelectFDMultiplexer();
-}
-
-static struct RegisterOurselves
-{
- RegisterOurselves() {
- FDMultiplexer::getMultiplexerMap().insert(make_pair(1, &make));
- }
-} doIt;
-
-void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
-{
- Callback cb;
- cb.d_callback=toDo;
- cb.d_parameter=parameter;
- memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
- if(cbmap.count(fd))
- throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
- cbmap[fd]=cb;
-}
-
-void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
-{
- if(d_inrun && d_iter->first==fd) // trying to remove us!
- d_iter++;
-
- if(!cbmap.erase(fd))
- throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
-}
-
-int SelectFDMultiplexer::run(struct timeval* now)
-{
- if(d_inrun) {
- throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
- }
- fd_set readfds, writefds;
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
-
- int fdmax=0;
-
- for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
- FD_SET(i->first, &readfds);
- fdmax=max(i->first, fdmax);
- }
-
- for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
- FD_SET(i->first, &writefds);
- fdmax=max(i->first, fdmax);
- }
-
- struct timeval tv={0,500000};
- int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv);
- Utility::gettimeofday(now, 0); // MANDATORY!
-
- if(ret < 0 && errno!=EINTR)
- throw FDMultiplexerException("select returned error: "+stringerror());
-
- if(ret < 1) // nothing - thanks AB
- return 0;
-
- d_iter=d_readCallbacks.end();
- d_inrun=true;
-
- for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end() && i->first <= fdmax; ) {
- d_iter=i++;
-
- if(FD_ISSET(d_iter->first, &readfds)) {
- d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
- continue; // so we don't refind ourselves as writable
- }
- }
-
- for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end() && i->first <= fdmax; ) {
- d_iter=i++;
- if(FD_ISSET(d_iter->first, &writefds)) {
- d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
- }
- }
-
- d_inrun=false;
- return 0;
-}
-
-#if 0
-
-void acceptData(int fd, boost::any& parameter)
-{
- cout<<"Have data on fd "<<fd<<endl;
- Socket* sock=boost::any_cast<Socket*>(parameter);
- string packet;
- IPEndpoint rem;
- sock->recvFrom(packet, rem);
- cout<<"Received "<<packet.size()<<" bytes!\n";
-}
-
-
-int main()
-{
- Socket s(AF_INET, SOCK_DGRAM);
-
- IPEndpoint loc("0.0.0.0", 2000);
- s.bind(loc);
-
- SelectFDMultiplexer sfm;
-
- sfm.addReadFD(s.getHandle(), &acceptData, &s);
-
- for(int n=0; n < 100 ; ++n) {
- sfm.run();
- }
- sfm.removeReadFD(s.getHandle());
- sfm.removeReadFD(s.getHandle());
-}
-#endif
-