};
vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > g_limiters;
+vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, string> > g_poolrules;
struct IDState
{
int order{1};
int weight{1};
StopWatch sw;
+ set<string> pools;
enum class Availability { Up, Down, Auto} availability{Availability::Auto};
bool upStatus{false};
bool isUp() const
std::mutex g_luamutex;
LuaContext g_lua;
-typedef std::function<shared_ptr<DownstreamState>(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
+typedef std::function<shared_ptr<DownstreamState>(const vector<shared_ptr<DownstreamState>>& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
struct ServerPolicy
{
policy_t policy;
} g_policy;
-shared_ptr<DownstreamState> firstAvailable(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> firstAvailable(const vector<shared_ptr<DownstreamState>>& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
- for(auto& d : g_dstates) {
+ for(auto& d : servers) {
if(d->isUp() && d->qps.check())
return d;
}
return g_dstates[counter % g_dstates.size()];
}
-shared_ptr<DownstreamState> leastOutstanding(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> leastOutstanding(const vector<shared_ptr<DownstreamState>>& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
- vector<pair<int, shared_ptr<DownstreamState>>> poss;
+ vector<pair<pair<int,int>, shared_ptr<DownstreamState>>> poss;
- for(auto& d : g_dstates) { // w=1, w=10 -> 1, 11
+ for(auto& d : servers) { // w=1, w=10 -> 1, 11
if(d->isUp()) {
- poss.push_back({d->outstanding.load(), d});
+ poss.push_back({make_pair(d->outstanding.load(), d->order), d});
}
}
if(poss.empty())
return poss.begin()->second;
}
-shared_ptr<DownstreamState> wrandom(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> wrandom(const vector<shared_ptr<DownstreamState>>& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
vector<pair<int, shared_ptr<DownstreamState>>> poss;
int sum=0;
- for(auto& d : g_dstates) { // w=1, w=10 -> 1, 11
+ for(auto& d : servers) { // w=1, w=10 -> 1, 11
if(d->isUp()) {
sum+=d->weight;
poss.push_back({sum, d});
return p->second;
}
-shared_ptr<DownstreamState> roundrobin(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> roundrobin(const vector<shared_ptr<DownstreamState>>& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
vector<shared_ptr<DownstreamState>> poss;
- for(auto& d : g_dstates) {
+ for(auto& d : servers) {
if(d->isUp()) {
poss.push_back(d);
}
return (*res)[(counter++) % res->size()];
}
-
-
static void daemonize(void)
{
if(fork())
}
SuffixMatchNode g_suffixMatchNodeFilter;
-SuffixMatchNode g_abuseSMN;
-NetmaskGroup g_abuseNMG;
-shared_ptr<DownstreamState> g_abuseDSS;
+
ComboAddress g_serverControl{"127.0.0.1:5199"};
+using servers_t =vector<shared_ptr<DownstreamState>>;
+servers_t getDownstreamCandidates(const std::string& pool)
+{
+ if(pool.empty())
+ return g_dstates;
+
+ servers_t ret;
+ for(auto& s : g_dstates)
+ if(s->pools.count(pool))
+ ret.push_back(s);
+
+ return ret;
+}
// listens to incoming queries, sends out to downstream servers, noting the intended return path
void* udpClientThread(ClientState* cs)
}
for(;;) {
try {
- len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
- if(len < (int)sizeof(struct dnsheader))
- continue;
-
- if(!g_ACL.match(remote))
- continue;
-
- if(dh->qr) // don't respond to responses
- continue;
-
-
- DNSName qname(packet, len, 12, false, &qtype);
-
- g_rings.queryRing.push_back(qname);
-
- bool blocked=false;
- for(auto& lim : g_limiters) {
- if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
- if(nmg->match(remote) && !lim.second.check()) {
- blocked=true;
- break;
+ len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
+ if(len < (int)sizeof(struct dnsheader))
+ continue;
+
+ if(!g_ACL.match(remote))
+ continue;
+
+ if(dh->qr) // don't respond to responses
+ continue;
+
+
+ DNSName qname(packet, len, 12, false, &qtype);
+
+ g_rings.queryRing.push_back(qname);
+
+ bool blocked=false;
+ for(auto& lim : g_limiters) {
+ if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
+ if(nmg->match(remote) && !lim.second.check()) {
+ blocked=true;
+ break;
+ }
}
- }
- else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
- if(smn->check(qname) && !lim.second.check()) {
- blocked=true;
- break;
+ else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
+ if(smn->check(qname) && !lim.second.check()) {
+ blocked=true;
+ break;
+ }
}
}
- }
- if(blocked)
- continue;
+ if(blocked)
+ continue;
- if(blockFilter)
- {
- std::lock_guard<std::mutex> lock(g_luamutex);
- if(blockFilter(remote, qname, qtype, dh))
+
+ if(blockFilter) {
+ std::lock_guard<std::mutex> lock(g_luamutex);
+
+ if(blockFilter(remote, qname, qtype, dh))
+ continue;
+ }
+
+ if(g_suffixMatchNodeFilter.check(qname))
continue;
- }
-
- if(g_suffixMatchNodeFilter.check(qname))
- continue;
+
+ if(re && re->match(qname.toString())) {
+ g_regexBlocks++;
+ continue;
+ }
+
+ if(dh->qr) { // something turned it into a response
+ sendto(cs->udpFD, packet, len, 0, (struct sockaddr*)&remote, remote.getSocklen());
+ continue;
+ }
- if(re && re->match(qname.toString())) {
- g_regexBlocks++;
- continue;
- }
-
- if(dh->qr) { // something turned it into a response
- sendto(cs->udpFD, packet, len, 0, (struct sockaddr*)&remote, remote.getSocklen());
- continue;
- }
+ string pool;
+ for(auto& pr : g_poolrules) {
+ if(auto nmg=boost::get<NetmaskGroup>(&pr.first)) {
+ if(nmg->match(remote)) {
+ pool=pr.second;
+ break;
+ }
+ }
+ else if(auto smn=boost::get<SuffixMatchNode>(&pr.first)) {
+ if(smn->check(qname)) {
+ pool=pr.second;
+ break;
+ }
+ }
+ }
+ DownstreamState* ss = 0;
+ {
+ std::lock_guard<std::mutex> lock(g_luamutex);
+ auto candidates=getDownstreamCandidates(pool);
+ ss = g_policy.policy(candidates, remote, qname, qtype, dh).get();
+ }
- DownstreamState* ss = 0;
- if(g_abuseSMN.check(qname) || g_abuseNMG.match(remote)) {
- ss = &*g_abuseDSS;
- }
- else {
- std::lock_guard<std::mutex> lock(g_luamutex);
- ss = g_policy.policy(remote, qname, qtype, dh).get();
if(!ss)
continue;
+
+ ss->queries++;
+
+ unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
+ IDState* ids = &ss->idStates[idOffset];
+
+ if(ids->origFD < 0) // if we are reusing, no change in outstanding
+ ss->outstanding++;
+ else
+ ss->reuseds++;
+
+ ids->origFD = cs->udpFD;
+ ids->age = 0;
+ ids->origID = dh->id;
+ ids->origRemote = remote;
+ ids->sentTime.start();
+ ids->qname = qname;
+ ids->qtype = qtype;
+ dh->id = idOffset;
+
+ len = send(ss->fd, packet, len, 0);
+ if(len < 0)
+ ss->sendErrors++;
+
+ vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss->remote.toStringWithPort());
}
- ss->queries++;
-
- unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
- IDState* ids = &ss->idStates[idOffset];
-
- if(ids->origFD < 0) // if we are reusing, no change in outstanding
- ss->outstanding++;
- else
- ss->reuseds++;
-
- ids->origFD = cs->udpFD;
- ids->age = 0;
- ids->origID = dh->id;
- ids->origRemote = remote;
- ids->sentTime.start();
- ids->qname = qname;
- ids->qtype = qtype;
- dh->id = idOffset;
-
- len = send(ss->fd, packet, len, 0);
- if(len < 0)
- ss->sendErrors++;
-
- vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss->remote.toStringWithPort());
+ catch(std::exception& e){
+ errlog("Got an error: %s", e.what());
}
- catch(...){}
}
return 0;
}
{
{
std::lock_guard<std::mutex> lock(g_luamutex);
- *ds = g_policy.policy(remote, qname, qtype, dh).get();
+ *ds = g_policy.policy(g_dstates, remote, qname, qtype, dh).get();
}
vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
--g_tcpclientthreads.d_queued;
ci=*citmp;
delete citmp;
-
-
+
uint16_t qlen, rlen;
try {
for(;;) {
ret->qps=QPSLimiter(boost::lexical_cast<int>(vars["qps"]),boost::lexical_cast<int>(vars["qps"]));
}
+ if(vars.count("pool")) {
+ ret->pools.insert(vars["pool"]);
+ }
+
if(vars.count("order")) {
ret->order=boost::lexical_cast<int>(vars["order"]);
}
- g_lua.writeFunction("deleteServer",
+ g_lua.writeFunction("rmServer",
[](boost::variant<std::shared_ptr<DownstreamState>, int> var)
{
if(auto* rem = boost::get<shared_ptr<DownstreamState>>(&var))
} );
- g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy) {
- g_policy = policy;
+ g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy) {
+ g_policy=policy;
+ });
+
+ g_lua.writeFunction("setServerPolicyLua", [](string name, policy_t policy) {
+ g_policy=ServerPolicy{name, policy};
});
g_lua.writeFunction("showServerPolicy", []() {
g_lua.registerMember("name", &ServerPolicy::name);
g_lua.registerMember("policy", &ServerPolicy::policy);
-
+ g_lua.writeFunction("newServerPolicy", [](string name, policy_t policy) { return ServerPolicy{name, policy};});
g_lua.writeVariable("firstAvailable", ServerPolicy{"firstAvailable", firstAvailable});
g_lua.writeVariable("roundrobin", ServerPolicy{"roundrobin", roundrobin});
g_lua.writeVariable("wrandom", ServerPolicy{"wrandom", wrandom});
try {
ostringstream ret;
- boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|44t|%6$3d %|53t|%7$2d %|55t|%8$10d %|61t|%9$7d %|76t|%10$5.1f %|84t|%11$5.1f" );
+ boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|44t|%6$3d %|53t|%7$2d %|55t|%8$10d %|61t|%9$7d %|76t|%10$5.1f %|84t|%11$5.1f %12%" );
// 1 2 3 4 5 6 7 8 9 10 11
- ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Ord" % "Wt" % "Queries" % "Drops" % "Drate" % "Lat") << endl;
+ ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Ord" % "Wt" % "Queries" % "Drops" % "Drate" % "Lat" % "Pools") << endl;
uint64_t totQPS{0}, totQueries{0}, totDrops{0};
int counter=0;
else
status = (s->upStatus ? "up" : "down");
+ string pools;
+ for(auto& p : s->pools) {
+ if(!pools.empty())
+ pools+=" ";
+ pools+=p;
+ }
+
ret << (fmt % counter % s->remote.toStringWithPort() %
status %
- s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0)) << endl;
+ s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0) % pools) << endl;
totQPS += s->queryLoad;
totQueries += s->queries.load();
}
ret<< (fmt % "All" % "" % ""
%
- (double)totQPS % "" % "" % "" % totQueries % totDrops % "" % "") << endl;
+ (double)totQPS % "" % "" % "" % totQueries % totDrops % "" % "" % "" ) << endl;
g_outputBuffer=ret.str();
}catch(std::exception& e) { g_outputBuffer=e.what(); throw; }
});
+ g_lua.writeFunction("addPoolRule", [](boost::variant<string,vector<pair<int, string>> > var, string pool) {
+ SuffixMatchNode smn;
+ NetmaskGroup nmg;
+
+ auto add=[&](string src) {
+ try {
+ smn.add(DNSName(src));
+ } catch(...) {
+ nmg.addMask(src);
+ }
+ };
+ if(auto src = boost::get<string>(&var))
+ add(*src);
+ else {
+ for(auto& a : boost::get<vector<pair<int, string>>>(var)) {
+ add(a.second);
+ }
+ }
+ if(nmg.empty())
+ g_poolrules.push_back({smn, pool});
+ else
+ g_poolrules.push_back({nmg, pool});
+
+ });
+
+ g_lua.writeFunction("showPoolRules", []() {
+ boost::format fmt("%-3d %-50s %s\n");
+ g_outputBuffer += (fmt % "#" % "Object" % "Pool").str();
+ int num=0;
+ for(const auto& lim : g_poolrules) {
+ string name;
+ if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
+ name=nmg->toString();
+ }
+ else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
+ name=smn->toString();
+ }
+ g_outputBuffer += (fmt % num % name % lim.second).str();
+ ++num;
+ }
+ });
+
g_lua.writeFunction("addQPSLimit", [](boost::variant<string,vector<pair<int, string>> > var, int lim) {
SuffixMatchNode smn;
g_limiters.push_back({nmg, QPSLimiter(lim, lim)});
});
- g_lua.writeFunction("deleteQPSLimit", [](int i) {
+ g_lua.writeFunction("rmQPSLimit", [](int i) {
g_limiters.erase(g_limiters.begin() + i);
});
g_lua.writeFunction("getServer", [](int i) { return g_dstates[i]; });
- g_lua.registerFunction<string(DownstreamState::*)()>("tostring", [](const DownstreamState& s) { return s.remote.toStringWithPort(); });
g_lua.registerFunction<bool(DownstreamState::*)()>("checkQPS", [](DownstreamState& s) { return s.qps.check(); });
g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
+ g_lua.registerFunction<void(DownstreamState::*)(string)>("addPool", [](DownstreamState& s, string pool) { s.pools.insert(pool);});
+ g_lua.registerFunction<void(DownstreamState::*)(string)>("rmPool", [](DownstreamState& s, string pool) { s.pools.erase(pool);});
- g_lua.registerFunction<int(DownstreamState::*)()>("getOutstanding", [](const DownstreamState& s) { return s.outstanding.load(); });
+ g_lua.registerFunction<void(DownstreamState::*)()>("getOutstanding", [](const DownstreamState& s) { g_outputBuffer=std::to_string(s.outstanding.load()); });
g_lua.registerFunction("isUp", &DownstreamState::isUp);
g_lua.registerFunction("setAuto", &DownstreamState::setAuto);
g_lua.registerMember("upstatus", &DownstreamState::upStatus);
g_lua.registerMember("weight", &DownstreamState::weight);
+ g_lua.registerMember("order", &DownstreamState::order);
g_lua.writeFunction("show", [](const string& arg) {
g_outputBuffer+=arg;
g_lua.writeFunction("newQPSLimiter", [](int rate, int burst) { return QPSLimiter(rate, burst); });
g_lua.registerFunction("check", &QPSLimiter::check);
- g_lua.writeFunction("usleep", [](int usec) { usleep(usec); });
-
- g_lua.writeFunction("abuseShuntSMN", [](const std::string& name) {
- g_abuseSMN.add(DNSName(name));
- });
-
- g_lua.writeFunction("abuseShuntNM", [](const std::string& str) {
- g_abuseNMG.addMask(str);
- });
-
- g_lua.writeFunction("abuseServer", [](shared_ptr<DownstreamState> dss) {
- g_abuseDSS=dss;
- });
g_lua.writeFunction("makeKey", []() {
g_outputBuffer="setKey("+newKey()+")\n";