not *that* bad actually, but now that we are thread safe, might want to scale
lack of help()
lack of autocomplete
+ TCP is a bit wonky and may pick the wrong downstream
*/
namespace po = boost::program_options;
GlobalStateHolder<ServerPolicy> g_policy;
-shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
for(auto& d : servers) {
- if(d->isUp() && d->qps.check())
- return d;
+ if(d.second->isUp() && d.second->qps.check())
+ return d.second;
}
static int counter=0;
++counter;
if(servers.empty())
return shared_ptr<DownstreamState>();
- return servers[counter % servers.size()];
+ return servers[counter % servers.size()].second;
}
-shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
vector<pair<pair<int,int>, shared_ptr<DownstreamState>>> poss;
for(auto& d : servers) { // w=1, w=10 -> 1, 11
- if(d->isUp()) {
- poss.push_back({make_pair(d->outstanding.load(), d->order), d});
+ if(d.second->isUp()) {
+ poss.push_back({make_pair(d.second->outstanding.load(), d.second->order), d.second});
}
}
if(poss.empty())
return poss.begin()->second;
}
-shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
vector<pair<int, shared_ptr<DownstreamState>>> poss;
int sum=0;
for(auto& d : servers) { // w=1, w=10 -> 1, 11
- if(d->isUp()) {
- sum+=d->weight;
- poss.push_back({sum, d});
+ if(d.second->isUp()) {
+ sum+=d.second->weight;
+ poss.push_back({sum, d.second});
}
}
return p->second;
}
-shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
- servers_t poss;
+ NumberedServerVector poss;
for(auto& d : servers) {
- if(d->isUp()) {
+ if(d.second->isUp()) {
poss.push_back(d);
}
}
static unsigned int counter;
- return (*res)[(counter++) % res->size()];
+ return (*res)[(counter++) % res->size()].second;
}
static void daemonize(void)
ComboAddress g_serverControl{"127.0.0.1:5199"};
-servers_t getDownstreamCandidates(const std::string& pool)
+NumberedServerVector getDownstreamCandidates(const servers_t& servers, const std::string& pool)
{
- servers_t ret;
- for(const auto& s : g_dstates.getCopy())
+ NumberedServerVector ret;
+ int count=0;
+ for(const auto& s : servers)
if((pool.empty() && s->pools.empty()) || s->pools.count(pool))
- ret.push_back(s);
+ ret.push_back(make_pair(++count, s));
return ret;
-
}
// listens to incoming queries, sends out to downstream servers, noting the intended return path
auto localLimiters = g_limiters.getLocal();
auto localPool = g_poolrules.getLocal();
auto localMatchNodeFilter = g_suffixMatchNodeFilter.getLocal();
-
+ auto localServers = g_dstates.getLocal();
struct msghdr msgh;
struct iovec iov;
char cbuf[256];
}
}
DownstreamState* ss = 0;
- auto candidates=getDownstreamCandidates(pool);
+ auto candidates=getDownstreamCandidates(*localServers, pool);
auto policy=localPolicy->policy;
{
std::lock_guard<std::mutex> lock(g_luamutex);
Let's start naively.
*/
-int getTCPDownstream(DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
-{
- auto policy=g_policy.getCopy().policy;
-
+int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{
{
std::lock_guard<std::mutex> lock(g_luamutex);
- *ds = policy(g_dstates.getCopy(), remote, qname, qtype, dh).get(); // XXX I think this misses pool selection!
+ *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get();
}
vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
delete citmp;
uint16_t qlen, rlen;
+ string pool; // empty for now
try {
+ auto localPolicy = g_policy.getLocal();
for(;;) {
if(!getMsgLen(ci.fd, &qlen))
break;
- ds->queries++;
- ds->outstanding++;
char query[qlen];
readn2(ci.fd, query, qlen);
uint16_t qtype;
DNSName qname(query, qlen, 12, false, &qtype);
struct dnsheader* dh =(dnsheader*)query;
if(dsock == -1) {
- dsock = getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+ dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
}
else {
vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
}
+ ds->queries++;
+ ds->outstanding++;
+
+ if(qtype == QType::AXFR) // XXX fixme we really need to do better
+ break;
- // FIXME: drop AXFR queries here, they confuse us
retry:;
if(!putMsgLen(dsock, qlen)) {
vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
close(dsock);
- dsock=getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+ dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
goto retry;
}
if(!getMsgLen(dsock, &rlen)) {
vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
close(dsock);
- dsock=getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+ dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
goto retry;
}
void setAuto() { availability = Availability::Auto; }
};
using servers_t =vector<std::shared_ptr<DownstreamState>>;
-typedef std::function<shared_ptr<DownstreamState>(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
+template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
+
+void* responderThread(std::shared_ptr<DownstreamState> state);
+extern std::mutex g_luamutex;
+extern LuaContext g_lua;
+extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
+
+using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
+typedef std::function<shared_ptr<DownstreamState>(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
struct ServerPolicy
{
policy_t policy;
};
-void* responderThread(std::shared_ptr<DownstreamState> state);
-extern std::mutex g_luamutex;
-extern LuaContext g_lua;
-extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
-
extern GlobalStateHolder<ServerPolicy> g_policy;
extern GlobalStateHolder<servers_t> g_dstates;
extern GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> >> g_limiters;
namespace po = boost::program_options;
extern po::variables_map g_vm;
+NumberedServerVector getDownstreamCandidates(const servers_t& servers, const std::string& pool);
+
+std::shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
return false
end
+blockFilter = nil -- this is how you disable a filter
+
counter=0
--- called to pick a downstream server
+-- called to pick a downstream server, ignores 'up' status
function luaroundrobin(servers, remote, qname, qtype, dh)
- print("Got called: "..#servers)
counter=counter+1;
return servers[1+(counter % #servers)]
end
-- setServerPolicyLua("luaroundrobin", luaroundrobin)
-authServer=newServer{address="2001:888:2000:1d::2", order=12}
+newServer{address="2001:888:2000:1d::2", pool="auth"}
+newServer{address="2a01:4f8:110:4389::2", pool="auth"}
function splitSetup(servers, remote, qname, qtype, dh)
if(dh:getRD() == false)
then
- return authServer
+ return firstAvailable.policy(getPoolServers("auth"), remote, qname, qtype, dh)
else
- return firstAvailable(servers, remote, qname, qtype, dh)
+ return firstAvailable.policy(servers, remote, qname, qtype, dh)
end
-end
\ No newline at end of file
+end
+
+-- setServerPolicyLua("splitSetup", splitSetup)
\ No newline at end of file