]> granicus.if.org Git - pdns/commitdiff
dnsdist now does things & has documentation
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 25 Feb 2015 17:08:39 +0000 (18:08 +0100)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 25 Feb 2015 17:08:39 +0000 (18:08 +0100)
pdns/README-dnsdist.md [new file with mode: 0644]
pdns/dnsdist.cc
pdns/dnsdistconf.lua

diff --git a/pdns/README-dnsdist.md b/pdns/README-dnsdist.md
new file mode 100644 (file)
index 0000000..f52f639
--- /dev/null
@@ -0,0 +1,151 @@
+dnsdist
+-------
+
+WARNING: `dnsdist` is still under HEAVY development, but we are giving it some
+publicity in hopes of getting constructive feedback that will help us guide
+our feature set.
+
+Do NOT take this into production, but please DO let us know your thoughts!
+
+`dnsdist` is a highly DNS-, DoS- and abuse-aware loadbalancer. Its goal in
+life is to route traffic to the best server, delivering top performance
+to legitimate users while shunting or blocking abusive traffic.
+
+`dnsdist` is dynamic, in the sense that its configuration can be changed at
+runtime, and that its statistics can be queried from a console-like
+interface.
+
+Here is a minimal configuration:
+
+```
+$ cat dnsdistconf.lua
+newServer2 {address="2001:4860:4860::8888", qps=1}
+newServer2 {address="2001:4860:4860::8844", qps=1} 
+newServer2 {address="2620:0:ccc::2", qps=10}
+newServer2 {address="2620:0:ccd::2", qps=10}
+newServer("192.168.1.2")
+
+$ dnsdist --local=0.0.0.0:5200 
+Marking downstream [2001:4860:4860::8888]:53 as 'up'
+Marking downstream [2001:4860:4860::8844]:53 as 'up'
+Marking downstream [2620:0:ccc::2]:53 as 'up'
+Marking downstream [2620:0:ccd::2]:53 as 'up'
+Marking downstream 192.168.1.2:53 as 'up'
+Listening on 0.0.0.0:5200
+>
+```
+
+We can now send queries to port 5200, and get answers:
+
+```
+$ dig -t aaaa powerdns.com @127.0.0.1 -p 5200 +short
+2001:888:2000:1d::2
+```
+
+Note that dnsdist offered us a prompt above, and on it we can get some
+statistics:
+
+```
+> listServers()
+#   Address                   State     Qps    Qlim    Queries   Drops Drate    Lat
+0   [2001:4860:4860::8888]:53    up     0.0       1          1       0 0.0      0.1
+1   [2001:4860:4860::8844]:53    up     0.0       1          0       0 0.0      0.0
+2   [2620:0:ccc::2]:53           up     0.0      10          0       0 0.0      0.0
+3   [2620:0:ccd::2]:53           up     0.0      10          0       0 0.0      0.0
+4   192.168.1.2:53               up     0.0       0          0       0 0.0      0.0
+All                                     0.0                  1       0             
+```
+
+Here we also see our configuration. 5 downstreamservers have been configured, of
+which the first 4 have a QPS limit (of 1, 1, 10 and 10 queries per second,
+respectively). The final serverhas no limit, which we can easily test:
+
+```
+$ for a in {0..1000}; do dig powerdns.com @127.0.0.1 -p 5200 +noall > /dev/null; done
+> listServers()
+#   Address                   State     Qps    Qlim    Queries   Drops Drate    Lat
+0   [2001:4860:4860::8888]:53    up     1.0       1          7       0 0.0      1.6
+1   [2001:4860:4860::8844]:53    up     1.0       1          6       0 0.0      0.6
+2   [2620:0:ccc::2]:53           up    10.3      10         64       0 0.0      2.4
+3   [2620:0:ccd::2]:53           up    10.3      10         63       0 0.0      2.4
+4   192.168.1.2:53               up   125.8       0        671       0 0.0      0.4
+All                                   145.0                811       0             
+```
+
+Note that the first 4 servers were all limited to near their configured QPS,
+and that our final server was taking up most of the traffic. No queries were
+dropped, and all servers remain up.
+
+To force a server down, try:
+
+```
+> getServer(0):setDown()
+> listServers()
+#   Address                   State     Qps    Qlim    Queries   Drops Drate    Lat
+0   [2001:4860:4860::8888]:53  DOWN     0.0       1          8       0 0.0      1.7
+...
+```
+
+The 'DOWN' in all caps means it was forced down. A lower case 'down'
+would've meant that dnsdist itself had concluded the server was down.
+Similarly, setUp() forces a server to be up, and setAuto() returns it to the
+default availability-probing.
+
+To change the QPS for a server:
+```
+> getServer(0):setQPS(1000)
+```
+
+Now for some cool stuff. Let's say we know we're getting a whole bunch of
+traffic for a domain used in DoS attacks, for example 'sh43354.cn'. We can
+do two things with this kind of traffic. Either we block it outright, like
+this:
+
+```
+> addDomainBlock("sh43354.cn.")
+```
+
+WARNING: This is not actually implemented yet, but `dnsdistconf.lua` shows
+the powerful but more typing way of achieving addDomainBlock!
+
+Or we configure a server dedicated to receiving the nasty stuff:
+
+```
+> abuseServer(newServer("192.168.1.3"))
+> abuseSMN("sh43353.cn.")
+```
+
+The wonderful thing about this last solution is that it can also be used for
+things where a domain might possibly be legit, but it is still causing load
+on the system and slowing down the internet for everyone. With such an abuse
+server, 'bad traffic' still gets a chance of an answer, but without
+impacting the rest of the world (too much).
+
+We can similarly add clients to the abuse server:
+
+```
+> abuseNM("192.168.12.0/24")
+> abuseNM("192.168.13.14")
+```
+
+More power
+----------
+More powerful things can be achieved by defining a function called
+`blockFilter()` in the configuration file, which can decide to drop traffic
+on any reason it wants.
+
+The default load balancing policy is called 'first', which means the first
+server that has not exceeded its QPS limit gets the traffic. If you don't
+like this default policy, you can create your own, like this for example:
+
+```
+counter=0
+servers=getServers()
+function roundrobin(remote, qname, qtype) 
+        counter=counter+1;
+        return servers[1+(counter % #servers)]
+end
+
+setServerPolicy(roundrobin)
+```
+
index 99426da2f9878a919df1b99b06694a333afc3693..db30d53ddf4fb225b49c8173d070ec764e0011d0 100644 (file)
@@ -22,6 +22,7 @@
 #include "ext/luawrapper/include/LuaContext.hpp"
 #include "sstuff.hh"
 #include "misc.hh"
+#include <mutex>
 #include "statbag.hh"
 #include <netinet/tcp.h>
 #include <boost/program_options.hpp>
@@ -34,6 +35,7 @@
 #include <readline/readline.h>
 #include <readline/history.h>
 #include "dnsname.hh"
+#include "dnswriter.hh"
 #include <fstream>
 
 #undef L
@@ -87,6 +89,87 @@ bool g_console;
    Send it to the first server that is not overloaded
 */
 
+/* Idea:
+   Multiple server groups, by default we load balance to the group with no name.
+   Each instance is either 'up', 'down' or 'auto', where 'auto' means that dnsdist 
+   determines if the instance is up or not. Auto should be the default and very very good.
+
+   In addition, to each instance you can attach a QPS object with rate & burst, which will optionally
+   limit the amount of queries we send there.
+
+   If all downstreams are over QPS, we pick the fastest server */
+
+struct StopWatch
+{
+  struct timespec d_start{0,0};
+  void start() {  
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &d_start) < 0)
+      unixDie("Getting timestamp");
+    
+  }
+  
+  double udiff() const {
+    struct timespec now;
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
+      unixDie("Getting timestamp");
+    
+    return 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
+  }
+
+  double udiffAndSet() {
+    struct timespec now;
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
+      unixDie("Getting timestamp");
+    
+    auto ret= 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
+    d_start = now;
+    return ret;
+  }
+
+};
+
+class QPSLimiter
+{
+public:
+  QPSLimiter()
+  {
+  }
+
+  QPSLimiter(unsigned int rate, unsigned int burst) : d_rate(rate), d_burst(burst), d_tokens(burst)
+  {
+    d_passthrough=false;
+    d_prev.start();
+  }
+
+  unsigned int getRate() const
+  {
+    return d_passthrough? 0 : d_rate;
+  }
+
+  bool check()
+  {
+    if(d_passthrough)
+      return true;
+    d_tokens += 1.0*d_rate * (d_prev.udiffAndSet()/1000000.0);
+
+    if(d_tokens > d_burst)
+      d_tokens = d_burst;
+
+    bool ret=false;
+    if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
+      ret=true;
+      --d_tokens;
+    }
+    return ret; 
+  }
+private:
+  bool d_passthrough{true};
+  unsigned int d_rate;
+  unsigned int d_burst;
+  double d_tokens;
+  StopWatch d_prev;
+};
+
 struct IDState
 {
   IDState() : origFD(-1) {}
@@ -101,6 +184,7 @@ struct IDState
   int origFD;  // set to <0 to indicate this state is empty
   uint16_t origID;
   ComboAddress origRemote;
+  StopWatch sentTime;
   atomic<uint64_t> age;
 };
 
@@ -112,6 +196,7 @@ struct DownstreamState
   int fd;            
   thread tid;
   ComboAddress remote;
+  QPSLimiter qps;
   vector<IDState> idStates;
   atomic<uint64_t> idOffset{0};
   atomic<uint64_t> sendErrors{0};
@@ -119,12 +204,28 @@ struct DownstreamState
   atomic<uint64_t> reuseds{0};
   atomic<uint64_t> queries{0};
   struct {
-    atomic<uint64_t> idOffset{0};
     atomic<uint64_t> sendErrors{0};
-    atomic<uint64_t> outstanding{0};
     atomic<uint64_t> reuseds{0};
     atomic<uint64_t> queries{0};
   } prev;
+  double queryLoad{0.0};
+  double dropRate{0.0};
+  double latencyUsec{0.0};
+  int order{1};
+  StopWatch sw;
+  enum class Availability { Up, Down, Auto} availability{Availability::Auto};
+  bool upStatus{false};
+  bool isUp() const
+  {
+    if(availability == Availability::Down)
+      return false;
+    if(availability == Availability::Up)
+      return true;
+    return upStatus;
+  }
+  void setUp() { availability = Availability::Up; }
+  void setDown() { availability = Availability::Down; }
+  void setAuto() { availability = Availability::Auto; }
 };
 
 vector<std::shared_ptr<DownstreamState> > g_dstates;
@@ -138,14 +239,14 @@ void* responderThread(std::shared_ptr<DownstreamState> state)
   int len;
   for(;;) {
     len = recv(state->fd, packet, sizeof(packet), 0);
-    if(len < 0)
+    if(len < (signed)sizeof(dnsheader))
       continue;
 
-    if(dh->id >= g_maxOutstanding)
+    if(dh->id >= state->idStates.size())
       continue;
 
     IDState* ids = &state->idStates[dh->id];
-    if(ids->origFD < 0)
+    if(ids->origFD < 0) // duplicate
       continue;
     else
       --state->outstanding;  // you'd think an attacker could game this, but we're using connected socket
@@ -153,7 +254,10 @@ void* responderThread(std::shared_ptr<DownstreamState> state)
     dh->id = ids->origID;
     sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
 
-    vinfolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort());
+    double udiff = ids->sentTime.udiff();
+    vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+    
+    state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
 
     ids->origFD = -1;
   }
@@ -168,8 +272,8 @@ DownstreamState::DownstreamState(const ComboAddress& remote_)
   SConnect(fd, remote);
   
   idStates.resize(g_maxOutstanding);
-  
-  warnlog("Added downstream server %s", remote.toStringWithPort());
+  sw.start();
+  infolog("Added downstream server %s", remote.toStringWithPort());
 }
 
 
@@ -180,15 +284,34 @@ struct ClientState
   int tcpFD;
 };
 
+
+std::mutex g_luamutex;
 LuaContext g_lua;
 
-DownstreamState& getBestDownstream(const ComboAddress& remote, const DNSName& qname, uint16_t qtype)
+std::function<shared_ptr<DownstreamState>(const ComboAddress& remote, const DNSName& qname, uint16_t qtype)> g_policy;
+
+shared_ptr<DownstreamState> getLuaDownstream(const ComboAddress& remote, const DNSName& qname, uint16_t qtype)
 {
   //auto pickServer=g_lua.readVariable<LuaContext::LuaFunctionCaller<std::shared_ptr<DownstreamState> (void)> >("pickServer");
+
+  std::lock_guard<std::mutex> lock(g_luamutex);
+
   auto pickServer=g_lua.readVariable<std::function<std::shared_ptr<DownstreamState>(ComboAddress, DNSName, uint16_t)> >("pickServer");
-  return *pickServer(remote, DNSName(qname), qtype);
+  return pickServer(remote, DNSName(qname), qtype);
+}
+
+shared_ptr<DownstreamState> firstAvailable(const ComboAddress& remote, const DNSName& qname, uint16_t qtype)
+{
+  for(auto& d : g_dstates) {
+    if(d->isUp() && d->qps.check())
+      return d;
+  }
+  static int counter=0;
+  ++counter;
+  return g_dstates[counter % g_dstates.size()];
 }
 
+#if 0
 static void daemonize(void)
 {
   if(fork())
@@ -206,7 +329,11 @@ static void daemonize(void)
     close(i);
   }
 }
+#endif
 
+SuffixMatchNode g_abuseSMN;
+NetmaskGroup g_abuseNMG;
+shared_ptr<DownstreamState> g_abuseDSS;
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path 
 void* udpClientThread(ClientState* cs)
@@ -227,46 +354,66 @@ try
   if(g_vm.count("regex-drop"))
     re=new Regex(g_vm["regex-drop"].as<string>());
 
-  auto blockFilter=g_lua.readVariable<std::function<bool(ComboAddress, DNSName, uint16_t)> >("blockFilter");
+  typedef std::function<bool(ComboAddress, DNSName, uint16_t)> blockfilter_t;
+  blockfilter_t blockFilter = 0;
 
+  
+  {
+    std::lock_guard<std::mutex> lock(g_luamutex);
+    auto candidate = g_lua.readVariable<boost::optional<blockfilter_t> >("blockFilter");
+    if(candidate)
+      blockFilter = *candidate;
+  }
   for(;;) {
+    try {
     len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
     if(len < (int)sizeof(struct dnsheader)) 
       continue;
 
-
     DNSName qname(packet, len, 12, false, &qtype);
-    if(blockFilter(remote, qname, qtype))
-      continue;
+    if(blockFilter)
+    {
+      std::lock_guard<std::mutex> lock(g_luamutex);
+      if(blockFilter(remote, qname, qtype))
+       continue;
+    }
+
     if(re && re->match(qname.toString())) {
       g_regexBlocks++;
       continue;
     }
    
+    DownstreamState* ss = 0;
+    if(g_abuseSMN.check(qname) || g_abuseNMG.match(remote)) {
+      ss = &*g_abuseDSS;
+    }
+    else
+      ss = g_policy(remote, qname, qtype).get();
+    ss->queries++;
 
-    DownstreamState& ss = getBestDownstream(remote, qname, qtype);
-    ss.queries++;
-
-    unsigned int idOffset = (ss.idOffset++) % g_maxOutstanding;
-    IDState* ids = &ss.idStates[idOffset];
+    unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
+    IDState* ids = &ss->idStates[idOffset];
 
     if(ids->origFD < 0) // if we are reusing, no change in outstanding
-      ss.outstanding++;
+      ss->outstanding++;
     else
-      ss.reuseds++;
+      ss->reuseds++;
 
     ids->origFD = cs->udpFD;
     ids->age = 0;
     ids->origID = dh->id;
     ids->origRemote = remote;
+    ids->sentTime.start();
 
     dh->id = idOffset;
     
-    len = send(ss.fd, packet, len, 0);
+    len = send(ss->fd, packet, len, 0);
     if(len < 0) 
-      ss.sendErrors++;
+      ss->sendErrors++;
 
-    vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss.remote.toStringWithPort());
+    vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss->remote.toStringWithPort());
+    }
+    catch(...){}
   }
   return 0;
 }
@@ -303,7 +450,7 @@ catch(...)
 
 int getTCPDownstream(DownstreamState** ds, const ComboAddress& remote, const std::string& qname, uint16_t qtype)
 {
-  *ds = &getBestDownstream(remote, qname, qtype);
+  *ds = g_policy(remote, qname, qtype).get();
   
   vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
   int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
@@ -477,30 +624,61 @@ void* tcpAcceptorThread(void* p)
   return 0;
 }
 
+bool upCheck(const ComboAddress& remote)
+try
+{
+  vector<uint8_t> packet;
+  DNSPacketWriter dpw(packet, "a.root-servers.net.", QType::A);
+  dpw.getHeader()->rd=true;
+
+  Socket sock(remote.sin4.sin_family, SOCK_DGRAM);
+  sock.setNonBlocking();
+  sock.connect(remote);
+  sock.write((char*)&packet[0], packet.size());  
+  int ret=waitForRWData(sock.getHandle(), true, 1, 0);
+  if(ret < 0 || !ret) // error, timeout, both are down!
+    return false;
+  string reply;
+  ComboAddress dest=remote;
+  sock.recvFrom(reply, dest);
+
+  // XXX fixme do bunch of checking here etc 
+  return true;
+}
+catch(...)
+{
+  return false;
+}
 
 void* maintThread()
 {
-  int interval = 1;
-  if(!interval)
-    return 0;
-  uint32_t lastQueries=0;
+  int interval = 2;
 
   for(;;) {
     sleep(interval);
-    
+
     if(g_tcpclientthreads.d_queued > 1 && g_tcpclientthreads.d_numthreads < 10)
       g_tcpclientthreads.addTCPClientThread();
 
-    unsigned int outstanding=0;
-    uint64_t numQueries=0;
     for(auto& dss : g_dstates) {
-      vinfolog(" %s: %d outstanding, %f qps", dss->remote.toStringWithPort(), dss->outstanding.load(), ((dss->queries.load() - dss->prev.queries.load())/interval));
+      if(dss->availability==DownstreamState::Availability::Auto) {
+       bool newState=upCheck(dss->remote);
+       if(newState != dss->upStatus) {
+         cout<<endl;
+         warnlog("Marking downstream %s as '%s'", dss->remote.toStringWithPort(), newState ? "up" : "down");
+         cout<<"> ";
+         cout.flush();
+       }
+       dss->upStatus = newState;
+      }
 
-      outstanding += dss->outstanding;
+      auto delta = dss->sw.udiffAndSet()/1000000.0;
+      dss->queryLoad = 1.0*(dss->queries.load() - dss->prev.queries.load())/delta;
+      dss->dropRate = 1.0*(dss->reuseds.load() - dss->prev.reuseds.load())/delta;
       dss->prev.queries.store(dss->queries.load());
-      numQueries += dss->queries;
+      dss->prev.reuseds.store(dss->reuseds.load());
       
-      for(IDState& ids  : dss->idStates) {
+      for(IDState& ids  : dss->idStates) { // timeouts
         if(ids.origFD >=0 && ids.age++ > 2) {
           ids.age = 0;
           ids.origFD = -1;
@@ -509,9 +687,6 @@ void* maintThread()
         }          
       }
     }
-
-    vinfolog("%d outstanding queries, %d qps", outstanding, ((numQueries - lastQueries)/interval));
-    lastQueries=numQueries;
   }
   return 0;
 }
@@ -521,14 +696,42 @@ void* maintThread()
 void setupLua()
 {
   g_lua.writeFunction("newServer", 
-                     [](const std::string& address)
+                     [](const std::string& address, boost::optional<int> qps)
                      { 
                        auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(address, 53)));
                        ret->tid = move(thread(responderThread, ret));
+                       if(qps) {
+                         ret->qps=QPSLimiter(*qps, *qps);
+                       }
                        g_dstates.push_back(ret);
                        return ret;
                      } );
 
+  g_lua.writeFunction("newServer2", 
+                     [](std::unordered_map<std::string, std::string> vars)
+                     { 
+                       auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(vars["address"], 53)));
+
+                       ret->tid = move(thread(responderThread, ret));
+
+                       if(vars.count("qps")) {
+                         ret->qps=QPSLimiter(boost::lexical_cast<int>(vars["qps"]),boost::lexical_cast<int>(vars["qps"]));
+                       }
+
+                       if(vars.count("order")) {
+                         ret->order=boost::lexical_cast<int>(vars["order"]);
+                       }
+
+                       g_dstates.push_back(ret);
+                       std::stable_sort(g_dstates.begin(), g_dstates.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
+                           return a->order < b->order;
+                         });
+                       return ret;
+                      
+
+                     } );
+
+
   g_lua.writeFunction("deleteServer", 
                      [](std::shared_ptr<DownstreamState> rem)
                      { 
@@ -536,13 +739,51 @@ void setupLua()
                      } );
 
 
+  g_lua.writeFunction("setServerPolicy", [](std::function<shared_ptr<DownstreamState>(const ComboAddress&, const DNSName&, uint16_t)> func) {
+      g_policy = func;
+    });
+
+  g_lua.writeFunction("unsetServerPolicy", []() {
+      g_policy = firstAvailable;
+    });
+
   g_lua.writeFunction("listServers", []() {  
+      try {
       string ret;
+      
+      boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|48t|%6$10d %|59t|%7$7d %|69t|%8$2.1f %|78t|%9$5.1f" );
+
+      cout << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Queries" % "Drops" % "Drate" % "Lat") << endl;
+
+      uint64_t totQPS{0}, totQueries{0}, totDrops{0};
+      int counter=0;
       for(auto& s : g_dstates) {
        if(!ret.empty()) ret+="\n";
        ret+=s->remote.toStringWithPort() + " " + std::to_string(s->queries.load()) + " " + std::to_string(s->outstanding.load());
+
+       string status;
+       if(s->availability == DownstreamState::Availability::Up) 
+         status = "UP";
+       else if(s->availability == DownstreamState::Availability::Down) 
+         status = "DOWN";
+       else 
+         status = (s->upStatus ? "up" : "down");
+
+       cout<< (fmt % counter % s->remote.toStringWithPort() % 
+               status % 
+               s->queryLoad % s->qps.getRate() % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0)) << endl;
+
+       totQPS += s->queryLoad;
+       totQueries += s->queries.load();
+       totDrops += s->reuseds.load();
+       ++counter;
       }
+      cout<< (fmt % "All" % "" % "" 
+               % 
+             (double)totQPS % "" % totQueries % totDrops % "" % "") << endl;
+
       return ret;
+      }catch(std::exception& e) { cerr<<e.what()<<endl; throw; }
     });
 
 
@@ -555,9 +796,19 @@ void setupLua()
       return ret;
     });
 
+  g_lua.writeFunction("getServer", [](int i) { return g_dstates[i]; });
+
   g_lua.registerFunction<string(DownstreamState::*)()>("tostring", [](const DownstreamState& s) { return s.remote.toStringWithPort(); });
-  
-  std::ifstream ifs("dnsdistconf.lua");
+  g_lua.registerFunction<bool(DownstreamState::*)()>("checkQPS", [](DownstreamState& s) { return s.qps.check(); });
+  g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
+  g_lua.registerFunction("isUp", &DownstreamState::isUp);
+  g_lua.registerFunction("setDown", &DownstreamState::setDown);
+  g_lua.registerFunction("setUp", &DownstreamState::setUp);
+  g_lua.registerFunction("setAuto", &DownstreamState::setAuto);
+  g_lua.registerMember("upstatus", &DownstreamState::upStatus);
+
+  std::ifstream ifs(g_vm["config"].as<string>());
+
   g_lua.registerFunction("tostring", &ComboAddress::toString);
 
   g_lua.registerFunction("isPartOf", &DNSName::isPartOf);
@@ -565,9 +816,30 @@ void setupLua()
   g_lua.writeFunction("newDNSName", [](const std::string& name) { return DNSName(name); });
   g_lua.writeFunction("newSuffixNode", []() { return SuffixMatchNode(); });
 
+
+
   g_lua.registerFunction("add",(void (SuffixMatchNode::*)(const DNSName&)) &SuffixMatchNode::add);
   g_lua.registerFunction("check",(bool (SuffixMatchNode::*)(const DNSName&) const) &SuffixMatchNode::check);
 
+
+
+  g_lua.writeFunction("newQPSLimiter", [](int rate, int burst) { return QPSLimiter(rate, burst); });
+  g_lua.registerFunction("check", &QPSLimiter::check);
+
+  g_lua.writeFunction("usleep", [](int usec) { usleep(usec); });
+
+  g_lua.writeFunction("abuseShuntSMN", [](const std::string& name) {
+      g_abuseSMN.add(DNSName(name));
+    });
+
+  g_lua.writeFunction("abuseShuntNM", [](const std::string& str) {
+      g_abuseNMG.addMask(str);
+    });
+
+  g_lua.writeFunction("abuseServer", [](shared_ptr<DownstreamState> dss) {
+      g_abuseDSS=dss;
+    });
+
   g_lua.executeCode(ifs);
 }
 
@@ -581,7 +853,8 @@ try
   po::options_description desc("Allowed options"), hidden, alloptions;
   desc.add_options()
     ("help,h", "produce help message")
-    ("daemon", po::value<bool>()->default_value(true), "run in background")
+    ("config", po::value<string>()->default_value("dnsdistconf.lua"), "Filename with our configuration")
+    //    ("daemon", po::value<bool>()->default_value(true), "run in background")
     ("local", po::value<vector<string> >(), "Listen on which addresses")
     ("max-outstanding", po::value<uint16_t>()->default_value(65535), "maximum outstanding queries per downstream")
     ("regex-drop", po::value<string>(), "If set, block queries matching this regex. Mind trailing dot!")
@@ -606,6 +879,7 @@ try
   g_verbose=g_vm.count("verbose");
   g_maxOutstanding = g_vm["max-outstanding"].as<uint16_t>();
 
+  g_policy = firstAvailable;
   setupLua();
 
   if(g_vm.count("remotes")) {
@@ -616,6 +890,7 @@ try
     }
   }
 
+  /*
   if(g_vm["daemon"].as<bool>())  {
     g_console=false;
     daemonize();
@@ -623,6 +898,15 @@ try
   else {
     vinfolog("Running in the foreground");
   }
+  */
+
+  for(auto& dss : g_dstates) {
+    if(dss->availability==DownstreamState::Availability::Auto) {
+      bool newState=upCheck(dss->remote);
+      warnlog("Marking downstream %s as '%s'", dss->remote.toStringWithPort(), newState ? "up" : "down");
+      dss->upStatus = newState;
+    }
+  }
 
   vector<string> locals;
   if(g_vm.count("local"))
@@ -668,15 +952,27 @@ try
   thread stattid(maintThread);
   stattid.detach();
 
+  set<string> dupper;
+  {
+    ifstream history(".history");
+    string line;
+    while(getline(history, line))
+      add_history(line.c_str());
+  }
+  ofstream history(".history");
+  string lastline;
   for(;;) {
     char* sline = readline("> ");
     if(!sline)
       break;
 
     string line(sline);
-    if(!line.empty())
+    if(!line.empty() && line != lastline) {
       add_history(sline);
-
+      history << sline <<endl;
+      history.flush();
+    }
+    lastline=line;
     free(sline);
 
     
@@ -684,6 +980,7 @@ try
       break;
 
     try {
+      std::lock_guard<std::mutex> lock(g_luamutex);
       g_lua.executeCode(line);
     }
     catch(std::exception& e) {
index 37dc8255ab8d90c44cee632bdf12662f3063c67b..421e849753f9cb93b62b591e82db6a8e62818229 100644 (file)
@@ -1,44 +1,45 @@
+-- define the good servers
+newServer("8.8.8.8", 2)  -- 2 qps
+newServer("8.8.4.4", 2) 
+newServer("208.67.222.222", 1)
+newServer("208.67.220.220", 1) 
+newServer("2001:4860:4860::8888", 1)
+newServer("2001:4860:4860::8844",1) 
+newServer("2620:0:ccc::2", 10) 
+newServer("2620:0:ccd::2", 10) 
+newServer2{address="192.168.1.2", qps=1000, order=2}
 
+newServer2{address="127.0.0.1:5300", order=3}
+abuse=newServer2{address="192.168.1.30:5300", order=4}
+
+abuseServer(abuse)
+abuseShuntSMN("ezdns.it.")
+abuseShuntSMN("xxx.")
+abuseShuntNM("192.168.1.0/24")
 
--- define the good servers
-good={newServer("8.8.8.8"), newServer("8.8.4.4"), newServer("208.67.222.222"), newServer("208.67.220.220")}
 
--- this is where we send bad traffic
-abuse={newServer("127.0.0.1:5300")}
 
 
+
+block=newDNSName("powerdns.org.")
 -- called before we distribute a question
 function blockFilter(remote, qname, qtype)
---      print("Called about ",remote:tostring(), qname, qtype)
-
-        if(qname == "powerdns.org.")
+        if(qname:isPartOf(block))
         then
-               print("Blocking powerdns.org")
+               print("Blocking *.powerdns.org")
                return true
         end
         return false
 end
 
 counter=0
-
-block=newSuffixNode()
-block:add(newDNSName("ezdns.it."))
-block:add(newDNSName("xxx."))
+servers=getServers()
 
 -- called to pick a downstream server
-function pickServer(remote, qname, qtype) 
-        local servers
-                if(block:check(qname))
-                then 
-                   print("Sending to abuse pool: ",qname:tostring())   
-           servers=abuse 
-        else
-               servers=good
-        end
-
+function roundrobin(remote, qname, qtype) 
         counter=counter+1;
-        return servers[1 + (counter % #servers)]
+        return servers[1+(counter % #servers)]
 end
 
-
+-- setServerPolicy("roundrobin")