]> granicus.if.org Git - pdns/commitdiff
fix up impedence mismatch between Lua and native server selection policies, make...
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 11 Mar 2015 10:03:33 +0000 (11:03 +0100)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 11 Mar 2015 10:03:33 +0000 (11:03 +0100)
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistconf.lua

index 20f510634a0955ce32c48ec5ae033f842ea26202..416c62944925186339858751d9d7b6e853734746 100644 (file)
@@ -98,7 +98,6 @@ vector<std::function<void(void)>> setupLua(bool client)
   g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy)  {
       g_policy.setState(policy);
     });
-
   g_lua.writeFunction("setServerPolicyLua", [](string name, policy_t policy)  {
       g_policy.setState(ServerPolicy{name, policy});
     });
@@ -306,6 +305,10 @@ vector<std::function<void(void)>> setupLua(bool client)
       return ret;
     });
 
+  g_lua.writeFunction("getPoolServers", [](string pool) {
+      return getDownstreamCandidates(g_dstates.getCopy(), pool);
+    });
+
   g_lua.writeFunction("getServer", [](int i) { return g_dstates.getCopy().at(i); });
 
   g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
index 6a1c582c5d967d80e1a404cc2064927bdc685b63..008680fd0c4dc941ef02c8f6ff42bb4212202172 100644 (file)
@@ -41,6 +41,7 @@
       not *that* bad actually, but now that we are thread safe, might want to scale
    lack of help()
    lack of autocomplete
+   TCP is a bit wonky and may pick the wrong downstream
 */
 
 namespace po = boost::program_options;
@@ -161,26 +162,26 @@ LuaContext g_lua;
 
 GlobalStateHolder<ServerPolicy> g_policy;
 
-shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
 {
   for(auto& d : servers) {
-    if(d->isUp() && d->qps.check())
-      return d;
+    if(d.second->isUp() && d.second->qps.check())
+      return d.second;
   }
   static int counter=0;
   ++counter;
   if(servers.empty())
     return shared_ptr<DownstreamState>();
-  return servers[counter % servers.size()];
+  return servers[counter % servers.size()].second;
 }
 
-shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
 {
   vector<pair<pair<int,int>, shared_ptr<DownstreamState>>> poss;
 
   for(auto& d : servers) {      // w=1, w=10 -> 1, 11
-    if(d->isUp()) {
-      poss.push_back({make_pair(d->outstanding.load(), d->order), d});
+    if(d.second->isUp()) {
+      poss.push_back({make_pair(d.second->outstanding.load(), d.second->order), d.second});
     }
   }
   if(poss.empty())
@@ -189,14 +190,14 @@ shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const Com
   return poss.begin()->second;
 }
 
-shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
 {
   vector<pair<int, shared_ptr<DownstreamState>>> poss;
   int sum=0;
   for(auto& d : servers) {      // w=1, w=10 -> 1, 11
-    if(d->isUp()) {
-      sum+=d->weight;
-      poss.push_back({sum, d});
+    if(d.second->isUp()) {
+      sum+=d.second->weight;
+      poss.push_back({sum, d.second});
 
     }
   }
@@ -207,12 +208,12 @@ shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress
   return p->second;
 }
 
-shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
 {
-  servers_t poss;
+  NumberedServerVector poss;
 
   for(auto& d : servers) {
-    if(d->isUp()) {
+    if(d.second->isUp()) {
       poss.push_back(d);
     }
   }
@@ -226,7 +227,7 @@ shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddr
 
   static unsigned int counter;
  
-  return (*res)[(counter++) % res->size()];
+  return (*res)[(counter++) % res->size()].second;
 }
 
 static void daemonize(void)
@@ -252,15 +253,15 @@ GlobalStateHolder<SuffixMatchNode> g_suffixMatchNodeFilter;
 ComboAddress g_serverControl{"127.0.0.1:5199"};
 
 
-servers_t getDownstreamCandidates(const std::string& pool)
+NumberedServerVector getDownstreamCandidates(const servers_t& servers, const std::string& pool)
 {
-  servers_t ret;
-  for(const auto& s : g_dstates.getCopy()) 
+  NumberedServerVector ret;
+  int count=0;
+  for(const auto& s : servers) 
     if((pool.empty() && s->pools.empty()) || s->pools.count(pool))
-      ret.push_back(s);
+      ret.push_back(make_pair(++count, s));
   
   return ret;
-
 }
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path 
@@ -291,7 +292,7 @@ try
   auto localLimiters = g_limiters.getLocal();
   auto localPool = g_poolrules.getLocal();
   auto localMatchNodeFilter = g_suffixMatchNodeFilter.getLocal();
-
+  auto localServers = g_dstates.getLocal();
   struct msghdr msgh;
   struct iovec iov;
   char cbuf[256];
@@ -372,7 +373,7 @@ try
        }
       }
       DownstreamState* ss = 0;
-      auto candidates=getDownstreamCandidates(pool);
+      auto candidates=getDownstreamCandidates(*localServers, pool);
       auto policy=localPolicy->policy;
       {
        std::lock_guard<std::mutex> lock(g_luamutex);
@@ -447,13 +448,11 @@ catch(...)
    Let's start naively.
 */
 
-int getTCPDownstream(DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
-{
-  auto policy=g_policy.getCopy().policy;
-  
+int getTCPDownstream(policy_t policy, string pool, DownstreamState** ds, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{  
   {
     std::lock_guard<std::mutex> lock(g_luamutex);
-    *ds = policy(g_dstates.getCopy(), remote, qname, qtype, dh).get(); // XXX I think this misses pool selection!
+    *ds = policy(getDownstreamCandidates(g_dstates.getCopy(), pool), remote, qname, qtype, dh).get(); 
   }
   
   vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
@@ -546,31 +545,35 @@ void* tcpClientThread(int pipefd)
     delete citmp;
     
     uint16_t qlen, rlen;
+    string pool; // empty for now
     try {
+      auto localPolicy = g_policy.getLocal();
       for(;;) {      
         if(!getMsgLen(ci.fd, &qlen))
           break;
         
-        ds->queries++;
-        ds->outstanding++;
         char query[qlen];
         readn2(ci.fd, query, qlen);
        uint16_t qtype;
        DNSName qname(query, qlen, 12, false, &qtype);
        struct dnsheader* dh =(dnsheader*)query;
        if(dsock == -1) {
-         dsock = getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+         dsock = getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
        }
        else {
          vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
        }
+        ds->queries++;
+        ds->outstanding++;
+
+       if(qtype == QType::AXFR)  // XXX fixme we really need to do better
+         break;
 
-        // FIXME: drop AXFR queries here, they confuse us
       retry:; 
         if(!putMsgLen(dsock, qlen)) {
          vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
           close(dsock);
-          dsock=getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
           goto retry;
         }
       
@@ -579,7 +582,7 @@ void* tcpClientThread(int pipefd)
         if(!getMsgLen(dsock, &rlen)) {
          vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
           close(dsock);
-          dsock=getTCPDownstream(&ds, ci.remote, qname, qtype, dh);
+          dsock=getTCPDownstream(localPolicy->policy, pool, &ds, ci.remote, qname, qtype, dh);
           goto retry;
         }
 
index b9776241cc69d1b43d3f74876136fd8eade044cf..98d449ffae1f51741c8110220155a67d7ff4f424 100644 (file)
@@ -190,8 +190,16 @@ struct DownstreamState
   void setAuto() { availability = Availability::Auto; }
 };
 using servers_t =vector<std::shared_ptr<DownstreamState>>;
-typedef std::function<shared_ptr<DownstreamState>(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
 
+template <class T> using NumberedVector = std::vector<std::pair<unsigned int, T> >;
+
+void* responderThread(std::shared_ptr<DownstreamState> state);
+extern std::mutex g_luamutex;
+extern LuaContext g_lua;
+extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
+
+using NumberedServerVector = NumberedVector<shared_ptr<DownstreamState>>;
+typedef std::function<shared_ptr<DownstreamState>(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
 
 struct ServerPolicy
 {
@@ -199,11 +207,6 @@ struct ServerPolicy
   policy_t policy;
 };
 
-void* responderThread(std::shared_ptr<DownstreamState> state);
-extern std::mutex g_luamutex;
-extern LuaContext g_lua;
-extern std::string g_outputBuffer; // locking for this is ok, as locked by g_luamutex
-
 extern GlobalStateHolder<ServerPolicy> g_policy;
 extern GlobalStateHolder<servers_t> g_dstates;
 extern GlobalStateHolder<vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> >> g_limiters;
@@ -224,8 +227,10 @@ vector<std::function<void(void)>> setupLua(bool client);
 
 namespace po = boost::program_options;
 extern po::variables_map g_vm;
+NumberedServerVector getDownstreamCandidates(const servers_t& servers, const std::string& pool);
+
+std::shared_ptr<DownstreamState> firstAvailable(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
 
-std::shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
-std::shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> leastOutstanding(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> wrandom(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> roundrobin(const NumberedServerVector& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
index 995efdd149b9ece2c4d2f58e069a4a4ed8d64ecb..31a1c043f516c988752c269dda75a689d02df591 100644 (file)
@@ -39,24 +39,28 @@ function blockFilter(remote, qname, qtype, dh)
         return false
 end
 
+blockFilter = nil -- this is how you disable a filter
+
 counter=0
 
--- called to pick a downstream server
+-- called to pick a downstream server, ignores 'up' status
 function luaroundrobin(servers, remote, qname, qtype, dh) 
-        print("Got called: "..#servers)
         counter=counter+1;
         return servers[1+(counter % #servers)]
 end
 
 -- setServerPolicyLua("luaroundrobin", luaroundrobin)
 
-authServer=newServer{address="2001:888:2000:1d::2", order=12}
+newServer{address="2001:888:2000:1d::2", pool="auth"}
+newServer{address="2a01:4f8:110:4389::2", pool="auth"}
 
 function splitSetup(servers, remote, qname, qtype, dh)
         if(dh:getRD() == false)
         then
-               return authServer
+               return firstAvailable.policy(getPoolServers("auth"), remote, qname, qtype, dh)
         else
-               return firstAvailable(servers, remote, qname, qtype, dh)
+               return firstAvailable.policy(servers, remote, qname, qtype, dh)
         end
-end
\ No newline at end of file
+end
+
+-- setServerPolicyLua("splitSetup", splitSetup)
\ No newline at end of file