iputils.cc \
misc.cc misc.hh \
qtype.cc \
+ sholder.hh \
sodcrypto.cc sodcrypto.hh \
sstuff.hh
if(qps) {
ret->qps=QPSLimiter(*qps, *qps);
}
- g_dstates.push_back(ret);
+ g_dstates.modify([ret](servers_t& servers) {
+ servers.push_back(ret);
+ std::stable_sort(servers.begin(), servers.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
+ return a->order < b->order;
+ });
+
+ });
if(g_launchWork) {
g_launchWork->push_back([ret]() {
ret->tid = move(thread(responderThread, ret));
}
- g_dstates.push_back(ret);
- std::stable_sort(g_dstates.begin(), g_dstates.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
+ auto states = g_dstates.getCopy();
+ states->push_back(ret);
+ std::stable_sort(states->begin(), states->end(), [](const decltype(ret)& a, const decltype(ret)& b) {
return a->order < b->order;
});
+ g_dstates.setState(states);
return ret;
} );
g_lua.writeFunction("rmServer",
[](boost::variant<std::shared_ptr<DownstreamState>, int> var)
{
+ auto states = g_dstates.getCopy();
if(auto* rem = boost::get<shared_ptr<DownstreamState>>(&var))
- g_dstates.erase(remove(g_dstates.begin(), g_dstates.end(), *rem), g_dstates.end());
+ states->erase(remove(states->begin(), states->end(), *rem), states->end());
else
- g_dstates.erase(g_dstates.begin() + boost::get<int>(var));
+ states->erase(states->begin() + boost::get<int>(var));
+ g_dstates.setState(states);
} );
g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy) {
- g_policy=policy;
+ g_policy.setState(std::make_shared<ServerPolicy>(policy));
});
g_lua.writeFunction("setServerPolicyLua", [](string name, policy_t policy) {
- g_policy=ServerPolicy{name, policy};
+ g_policy.setState(std::make_shared<ServerPolicy>(ServerPolicy{name, policy}));
});
g_lua.writeFunction("showServerPolicy", []() {
- g_outputBuffer=g_policy.name+"\n";
+ g_outputBuffer=g_policy.getLocal()->name+"\n";
});
return;
try {
ComboAddress loc(addr, 53);
- g_locals.push_back(loc);
+ g_locals.push_back(loc); /// only works pre-startup, so no sync necessary
}
catch(std::exception& e) {
g_outputBuffer="Error: "+string(e.what())+"\n";
g_lua.writeFunction("shutdown", []() { _exit(0);} );
- g_lua.writeFunction("addDomainBlock", [](const std::string& domain) { g_suffixMatchNodeFilter.add(DNSName(domain)); });
+ g_lua.writeFunction("addDomainBlock", [](const std::string& domain) {
+ g_suffixMatchNodeFilter.modify([domain](SuffixMatchNode& smn) {
+ smn.add(DNSName(domain));
+ });
+ });
g_lua.writeFunction("showServers", []() {
try {
ostringstream ret;
uint64_t totQPS{0}, totQueries{0}, totDrops{0};
int counter=0;
- for(auto& s : g_dstates) {
+ auto states = g_dstates.getCopy();
+ for(const auto& s : *states) {
string status;
if(s->availability == DownstreamState::Availability::Up)
status = "UP";
}
}
if(nmg.empty())
- g_poolrules.push_back({smn, pool});
+ g_poolrules.modify([smn, pool](decltype(g_poolrules)::value_type& poolrules) {
+ poolrules.push_back({smn, pool});
+ });
else
- g_poolrules.push_back({nmg, pool});
+ g_poolrules.modify([nmg,pool](decltype(g_poolrules)::value_type& poolrules) {
+ poolrules.push_back({nmg, pool});
+ });
});
boost::format fmt("%-3d %-50s %s\n");
g_outputBuffer += (fmt % "#" % "Object" % "Pool").str();
int num=0;
- for(const auto& lim : g_poolrules) {
+ for(const auto& lim : *g_poolrules.getCopy()) {
string name;
if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
name=nmg->toString();
}
}
if(nmg.empty())
- g_limiters.push_back({smn, QPSLimiter(lim, lim)});
+ g_limiters.modify([smn, lim](decltype(g_limiters)::value_type& limiters) {
+ limiters.push_back({smn, QPSLimiter(lim, lim)});
+ });
else
- g_limiters.push_back({nmg, QPSLimiter(lim, lim)});
+ g_limiters.modify([nmg, lim](decltype(g_limiters)::value_type& limiters) {
+ limiters.push_back({nmg, QPSLimiter(lim, lim)});
+ });
});
g_lua.writeFunction("rmQPSLimit", [](int i) {
- g_limiters.erase(g_limiters.begin() + i);
+ g_limiters.modify([i](decltype(g_limiters)::value_type& limiters) {
+ limiters.erase(limiters.begin() + i);
+ });
});
g_lua.writeFunction("showQPSLimits", []() {
boost::format fmt("%-3d %-50s %7d %8d %8d\n");
g_outputBuffer += (fmt % "#" % "Object" % "Lim" % "Passed" % "Blocked").str();
int num=0;
- for(const auto& lim : g_limiters) {
+ for(const auto& lim : *g_limiters.getCopy()) {
string name;
if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
name=nmg->toString();
g_lua.writeFunction("getServers", []() {
vector<pair<int, std::shared_ptr<DownstreamState> > > ret;
int count=1;
- for(auto& s : g_dstates) {
+ for(const auto& s : *g_dstates.getCopy()) {
ret.push_back(make_pair(count++, s));
}
return ret;
});
- g_lua.writeFunction("getServer", [](int i) { return g_dstates[i]; });
+ g_lua.writeFunction("getServer", [](int i) { return (*g_dstates.getCopy())[i]; });
g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
g_lua.registerFunction<void(DownstreamState::*)(string)>("addPool", [](DownstreamState& s, string pool) { s.pools.insert(pool);});
}
});
+
+ // something needs to be done about this; unlocked access will 'mostly' work
g_lua.writeFunction("getTopQueries", [](unsigned int top, boost::optional<int> labels) {
map<DNSName, int> counts;
unsigned int total=0;
#undef L
/* Known sins:
- We replace g_ACL w/o locking, might crash
- g_policy too probably
No centralized statistics
We neglect to do recvfromto() on 0.0.0.0
Receiver is currently singlethreaded (not that bad actually)
- We can't compile w/o crypto
lack of help()
- we offer now way to log from Lua
+ we offer no way to log from Lua
*/
namespace po = boost::program_options;
If all downstreams are over QPS, we pick the fastest server */
-vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > g_limiters;
-vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, string> > g_poolrules;
+GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > > g_limiters;
+GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, string> > > g_poolrules;
Rings g_rings;
-servers_t g_dstates;
+GlobalStateHolder<servers_t> g_dstates;
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
void* responderThread(std::shared_ptr<DownstreamState> state)
LuaContext g_lua;
-ServerPolicy g_policy;
+GlobalStateHolder<ServerPolicy> g_policy;
shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
}
static int counter=0;
++counter;
- if(g_dstates.empty())
+ if(servers.empty())
return shared_ptr<DownstreamState>();
- return g_dstates[counter % g_dstates.size()];
+ return servers[counter % servers.size()];
}
shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
}
}
- auto *res=&poss;
+ const auto *res=&poss;
if(poss.empty())
- res = &g_dstates;
+ res = &servers;
if(res->empty())
return shared_ptr<DownstreamState>();
}
}
-SuffixMatchNode g_suffixMatchNodeFilter;
+GlobalStateHolder<SuffixMatchNode> g_suffixMatchNodeFilter;
ComboAddress g_serverControl{"127.0.0.1:5199"};
servers_t getDownstreamCandidates(const std::string& pool)
{
servers_t ret;
- for(auto& s : g_dstates)
+ for(const auto& s : *g_dstates.getCopy())
if((pool.empty() && s->pools.empty()) || s->pools.count(pool))
ret.push_back(s);
blockFilter = *candidate;
}
auto acl = g_ACL.getLocal();
+ auto localPolicy = g_policy.getLocal();
+ auto localLimiters = g_limiters.getLocal();
+ auto localPool = g_poolrules.getLocal();
+ auto localMatchNodeFilter = g_suffixMatchNodeFilter.getLocal();
for(;;) {
try {
len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
g_rings.queryRing.push_back(qname);
bool blocked=false;
- for(auto& lim : g_limiters) {
+ for(const auto& lim : *localLimiters) {
if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
if(nmg->match(remote) && !lim.second.check()) {
blocked=true;
continue;
}
- if(g_suffixMatchNodeFilter.check(qname))
+ if(localMatchNodeFilter->check(qname))
continue;
if(re && re->match(qname.toString())) {
}
string pool;
- for(auto& pr : g_poolrules) {
+ for(const auto& pr : *localPool) {
if(auto nmg=boost::get<NetmaskGroup>(&pr.first)) {
if(nmg->match(remote)) {
pool=pr.second;
}
}
DownstreamState* ss = 0;
+ auto candidates=getDownstreamCandidates(pool);
+ auto policy=localPolicy->policy;
{
std::lock_guard<std::mutex> lock(g_luamutex);
- auto candidates=getDownstreamCandidates(pool);
- ss = g_policy.policy(candidates, remote, qname, qtype, dh).get();
+ ss = policy(candidates, remote, qname, qtype, dh).get();
}
if(!ss)
int getTCPDownstream(DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
+ auto policy=g_policy.getCopy()->policy;
+
{
std::lock_guard<std::mutex> lock(g_luamutex);
- *ds = g_policy.policy(g_dstates, remote, qname, qtype, dh).get();
+ *ds = policy(*g_dstates.getCopy(), remote, qname, qtype, dh).get(); // XXX I think this misses pool selection!
}
vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < 10)
g_tcpclientthreads.addTCPClientThread();
- for(auto& dss : g_dstates) {
+ for(auto& dss : *(g_dstates.getCopy())) { // the copied vector still shares the actual DownstreamState objects
if(dss->availability==DownstreamState::Availability::Auto) {
bool newState=upCheck(dss->remote);
if(newState != dss->upStatus) {
ServerPolicy leastOutstandingPol{"leastOutstanding", leastOutstanding};
- g_policy = leastOutstandingPol;
-
-
+ g_policy.setState(std::make_shared<ServerPolicy>(leastOutstandingPol));
if(g_vm.count("client") || g_vm.count("command")) {
setupLua(true);
doClient(g_serverControl);
for(const auto& address : g_vm["remotes"].as<vector<string>>()) {
auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
ret->tid = move(thread(responderThread, ret));
- g_dstates.push_back(ret);
+ g_dstates.modify([ret](servers_t& servers) { servers.push_back(ret); });
}
}
- for(auto& dss : g_dstates) {
+ for(auto& dss : *g_dstates.getCopy()) {
if(dss->availability==DownstreamState::Availability::Auto) {
bool newState=upCheck(dss->remote);
warnlog("Marking downstream %s as '%s'", dss->remote.toStringWithPort(), newState ? "up" : "down");
#include <boost/program_options.hpp>
#include <mutex>
#include <thread>
+#include "sholder.hh"
-template<typename T> class GlobalStateHolder;
-
-template<typename T>
-class LocalStateHolder
-{
-public:
- explicit LocalStateHolder(GlobalStateHolder<T>* source) : d_source(source)
- {}
-
- const T* operator->()
- {
- if(d_source->getGeneration() != d_generation) {
- d_source->getState(&d_state, & d_generation);
- }
-
- return d_state.get();
- }
-
- void reset()
- {
- d_generation=0;
- d_state.reset();
- }
-private:
- std::shared_ptr<T> d_state;
- unsigned int d_generation;
- const GlobalStateHolder<T>* d_source;
-};
-
-template<typename T>
-class GlobalStateHolder
-{
-public:
- GlobalStateHolder(){}
- LocalStateHolder<T> getLocal()
- {
- return LocalStateHolder<T>(this);
- }
- void setState(std::shared_ptr<T> state)
- {
- std::lock_guard<std::mutex> l(d_lock);
- d_state = state;
- d_generation++;
- }
- unsigned int getGeneration() const
- {
- return d_generation;
- }
- void getState(std::shared_ptr<T>* state, unsigned int* generation) const
- {
- std::lock_guard<std::mutex> l(d_lock);
- *state=d_state;
- *generation = d_generation;
- }
- std::shared_ptr<T> getCopy() const
- {
- std::lock_guard<std::mutex> l(d_lock);
- if(!d_state)
- return std::make_shared<T>();
- shared_ptr<T> ret = shared_ptr<T>(new T(*d_state));
- return d_state;
- }
-private:
- mutable std::mutex d_lock;
- std::shared_ptr<T> d_state;
- std::atomic<unsigned int> d_generation{0};
-};
struct StopWatch
{
return d_blocked;
}
- bool check()
+ bool check() const // this is not quite fair
{
if(d_passthrough)
return true;
bool d_passthrough{true};
unsigned int d_rate;
unsigned int d_burst;
- double d_tokens;
- StopWatch d_prev;
- unsigned int d_passed{0};
- unsigned int d_blocked{0};
+ mutable double d_tokens;
+ mutable StopWatch d_prev;
+ mutable unsigned int d_passed{0};
+ mutable unsigned int d_blocked{0};
};
std::mutex respMutex;
};
-extern Rings g_rings;
+extern Rings g_rings; // XXX locking for this is still substandard, queryRing and clientRing need RW lock
struct DownstreamState
{
void* responderThread(std::shared_ptr<DownstreamState> state);
extern std::mutex g_luamutex;
extern LuaContext g_lua;
-extern ServerPolicy g_policy;
-extern servers_t g_dstates;
-extern std::string g_outputBuffer;
-extern std::vector<ComboAddress> g_locals;
-struct dnsheader;
-std::shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-extern vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > g_limiters;
-extern vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, std::string> > g_poolrules;
-extern SuffixMatchNode g_suffixMatchNodeFilter;
+extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
-extern ComboAddress g_serverControl;
-void controlThread(int fd, ComboAddress local);
+extern GlobalStateHolder<ServerPolicy> g_policy;
+extern GlobalStateHolder<servers_t> g_dstates;
+extern GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> >> g_limiters;
+extern GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, std::string> >> g_poolrules;
+extern GlobalStateHolder<SuffixMatchNode> g_suffixMatchNodeFilter;
extern GlobalStateHolder<NetmaskGroup> g_ACL;
+extern ComboAddress g_serverControl; // not changed during runtime
+
+extern std::vector<ComboAddress> g_locals; // not changed at runtime
+extern std::string g_key; // in theory needs locking
+
+struct dnsheader;
+
+void controlThread(int fd, ComboAddress local);
vector<std::function<void(void)>> setupLua(bool client);
-extern std::string g_key;
+
+
namespace po = boost::program_options;
extern po::variables_map g_vm;
+
+std::shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
namespaces.hh \
pdnsexception.hh \
qtype.cc qtype.hh \
+ sholder.hh \
sodcrypto.cc sodcrypto.hh \
sstuff.hh pdns/ext/luawrapper/include/LuaContext.hpp
ln -fs ../base32.hh ../base64.hh ../dnsdist.cc ../dnsdist.hh ../dnsdist-lua.cc ../dns.hh \
../dnslabeltext.{cc,rl} ../dnsname.cc ../dnsname.hh ../dnsparser.hh ../dnswriter.cc ../dnswriter.hh \
../dolog.hh ../iputils.cc ../iputils.hh ../misc.cc ../misc.hh ../namespaces.hh \
-../pdnsexception.hh ../qtype.cc ../qtype.hh ../sodcrypto.cc ../sodcrypto.hh ../sstuff.hh .
+../pdnsexception.hh ../qtype.cc ../qtype.hh ../sholder.hh ../sodcrypto.cc ../sodcrypto.hh ../sstuff.hh .
ln -fs ../dnsdistconf.lua .
mkdir -p pdns/ext/luawrapper/include
--- /dev/null
+#include <memory>
+#include <atomic>
+#include <mutex>
+
+/** This is sort of a light-weight RCU idea.
+ Suitable for when you frequently consult some "readonly" state, which infrequently
+ gets changed. One way of dealing with this is fully locking access to the state, but
+ this is rather wasteful.
+
+ Instead, in the code below, the frequent users of the state get a "readonly" copy of it,
+ which they can consult. On access, we atomically check whether the local copy is still
+ current with respect to the global one. If it isn't, we take the lock and fetch a fresh
+ local copy.
+
+ Meanwhile, to update the global state, methods are offered that do the appropriate locking
+ and bump the 'generation' counter, signalling to the local copies that they need to be
+ refreshed on the next access.
+
+ Two ways to change the global state are available:
+ getCopy(), which delivers a deep copy of the current state, to be followed by setState()
+ modify(), which accepts a (lambda) function that modifies the state in place under the lock
+
+ NOTE: The actual destruction of the 'old' state happens when the last local state
+ relinquishes its access to the state.
+
+ "read-only"
+ Sometimes, a 'state' can contain parts that can safely be modified by multiple users, for
+ example, atomic counters. In such cases, it may be useful to explicitly declare such counters
+ as mutable. */
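+
+/* A minimal usage sketch (illustrative only, not used by this patch; 'MyConfig' and its
+   'timeout' member are made-up names):
+
+     GlobalStateHolder<MyConfig> g_config;              // writers publish through this
+
+     // writer side: either replace the state wholesale...
+     g_config.setState(std::make_shared<MyConfig>(MyConfig{30}));
+     // ...or modify it in place, under the holder's lock:
+     g_config.modify([](MyConfig& c) { c.timeout = 5; });
+
+     // reader side: each thread keeps a LocalStateHolder and only re-fetches the
+     // shared_ptr when the generation counter has changed
+     auto local = g_config.getLocal();
+     if(local->timeout > 5) { ... }
+*/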
+
+template<typename T> class GlobalStateHolder;
+
+template<typename T>
+class LocalStateHolder
+{
+public:
+ explicit LocalStateHolder(GlobalStateHolder<T>* source) : d_source(source)
+ {}
+
+ const T* operator->() // only const access, but see "read-only" above
+ {
+ if(!d_state || d_source->getGeneration() != d_generation) {
+ d_source->getState(&d_state, &d_generation);
+ }
+
+ return d_state.get();
+ }
+ const T& operator*() // only const access, but see "read-only" above
+ {
+ if(!d_state || d_source->getGeneration() != d_generation) {
+ d_source->getState(&d_state, &d_generation);
+ }
+
+ return *d_state;
+ }
+
+ void reset()
+ {
+ d_generation=0;
+ d_state.reset();
+ }
+private:
+ std::shared_ptr<T> d_state;
+ unsigned int d_generation{0};
+ const GlobalStateHolder<T>* d_source;
+};
+
+template<typename T>
+class GlobalStateHolder
+{
+public:
+ GlobalStateHolder() : d_state(std::make_shared<T>()) {}
+ LocalStateHolder<T> getLocal()
+ {
+ return LocalStateHolder<T>(this);
+ }
+ void setState(std::shared_ptr<T> state)
+ {
+ std::lock_guard<std::mutex> l(d_lock);
+ d_state = state;
+ d_generation++;
+ }
+ unsigned int getGeneration() const
+ {
+ return d_generation;
+ }
+ void getState(std::shared_ptr<T>* state, unsigned int* generation) const
+ {
+ std::lock_guard<std::mutex> l(d_lock);
+ *state=d_state;
+ *generation = d_generation;
+ }
+ std::shared_ptr<T> getCopy() const
+ {
+ std::lock_guard<std::mutex> l(d_lock);
+ if(!d_state)
+ return std::make_shared<T>();
+ return std::make_shared<T>(*d_state);
+ }
+
+ template<typename F>
+ void modify(F act) {
+ std::lock_guard<std::mutex> l(d_lock);
+ if(!d_state)
+ d_state=std::make_shared<T>();
+ act(*d_state);
+ ++d_generation;
+ }
+
+
+ typedef T value_type;
+private:
+ mutable std::mutex d_lock;
+ std::shared_ptr<T> d_state;
+ std::atomic<unsigned int> d_generation{0};
+};