]> granicus.if.org Git - pdns/commitdiff
delaypipe, plus addDelay() command
authorbert hubert <bert.hubert@netherlabs.nl>
Fri, 5 Jun 2015 14:52:50 +0000 (16:52 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Tue, 9 Jun 2015 11:14:26 +0000 (13:14 +0200)
pdns/Makefile.am
pdns/delaypipe.cc [new file with mode: 0644]
pdns/delaypipe.hh [new file with mode: 0644]
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsrulactions.hh
pdns/test-delaypipe_hh.cc [new file with mode: 0644]

index 7691bc4b43ec562ec13a861588d50038a926ba75..2a51ea565bc4c93f1580513ed8379ca8900d113d 100644 (file)
@@ -995,6 +995,7 @@ testrunner_SOURCES = \
        test-base32_cc.cc \
        test-base64_cc.cc \
        test-bindparser_cc.cc \
+       test-delaypipe_hh.cc \
        test-distributor_hh.cc \
        test-dns_random_hh.cc \
        test-dnsname_cc.cc \
diff --git a/pdns/delaypipe.cc b/pdns/delaypipe.cc
new file mode 100644 (file)
index 0000000..d0798a5
--- /dev/null
@@ -0,0 +1,127 @@
+#include "delaypipe.hh"
+#include "misc.hh"
+#include <thread>
+
+template<class T>
+ObjectPipe<T>::ObjectPipe()
+{
+  if(pipe(d_fds))
+    unixDie("pipe");
+}
+
+template<class T>
+ObjectPipe<T>::~ObjectPipe()
+{
+  ::close(d_fds[0]);
+  if(d_fds[1] >= 0)
+    ::close(d_fds[1]);
+}
+
+template<class T>
+void ObjectPipe<T>::close()
+{
+  if(d_fds[1] < 0)
+    return;
+  ::close(d_fds[1]); // the writing side
+  d_fds[1]=-1;
+}
+
+template<class T>
+void ObjectPipe<T>::write(T& t)
+{
+  auto ptr = new T(t);
+  if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr))
+    unixDie("write");
+}
+
+template<class T>
+bool ObjectPipe<T>::read(T* t)
+{
+  T* ptr;
+  int ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+
+  if(ret < 0)
+    unixDie("read");
+  if(ret==0)
+    return false;
+  *t=*ptr;
+  delete ptr;
+  return true;
+}
+
+template<class T>
+int ObjectPipe<T>::readTimeout(T* t, int msec)
+{
+  T* ptr;
+  int ret = waitForData(d_fds[0], 0, 1000*msec);
+  if(ret <0)
+    unixDie("waiting for data in object pipe");
+  if(ret == 0) 
+    return -1;
+
+  ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+
+  if(ret < 0)
+    unixDie("read");
+  if(ret==0)
+    return false;
+  *t=*ptr;
+  delete ptr;
+  return 1;
+}
+
+
+template<class T>
+DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this)
+{
+}
+
+template<class T>
+void DelayPipe<T>::submit(T& t, int msec)
+{
+  struct timespec now;
+  clock_gettime(CLOCK_MONOTONIC, &now);
+  now.tv_nsec += msec*1e6;
+  if(now.tv_nsec > 1e9) {
+    now.tv_sec++;
+    now.tv_nsec-=1e9;
+  }
+  Combo c{t, now};
+  d_pipe.write(c);
+}
+
+template<class T>
+DelayPipe<T>::~DelayPipe()
+{
+  d_pipe.close();
+  d_thread.join();
+}
+
+template<class T>
+void DelayPipe<T>::worker()
+{
+  Combo c;
+  for(;;) {
+    int ret = d_pipe.readTimeout(&c, 10); // XXXX NEEDS TO BE DYNAMIC
+    if(ret > 0) {  // we got an object
+      d_work.insert(make_pair(c.when, c.what));
+    }
+    else if(ret==0) { // timeout
+
+      break;
+    }
+    else {
+      //      cout<<"Got a timeout"<<endl;
+    }
+    struct timespec now;
+    clock_gettime(CLOCK_MONOTONIC, &now);
+    tscomp cmp;
+    for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful
+      if(cmp(iter->first, now)) {
+       iter->second();
+       d_work.erase(iter++);
+      }
+      else break;
+    }
+  }
+}
diff --git a/pdns/delaypipe.hh b/pdns/delaypipe.hh
new file mode 100644 (file)
index 0000000..cfc72e7
--- /dev/null
@@ -0,0 +1,59 @@
+#pragma once
+#include <map>
+#include <time.h>
+#include <thread>
+
+/**
+   General idea: many threads submit work to this class, but only one executes it. The work should therefore be entirely trivial.
+   The implementatin is that submitter threads create an object that represents the work, and it gets sent over a pipe 
+   to the worker thread.
+
+   The worker thread meanwhile listens on this pipe (non-blocking), with a delay set to the next object that needs to be executed.
+   If meanwhile new work comes in, all objects who's time has come are executed, a new sleep time is calculated.
+*/
+
+
+/* ObjectPipe facilitates the type-safe passing of types over a pipe */
+
+template<class T>
+class ObjectPipe
+{
+public:
+  ObjectPipe();
+  ~ObjectPipe();
+  void write(T& t);
+  bool read(T* t); // returns false on EOF
+  int readTimeout(T* t, int msec); // -1 is timeout, 0 is no data, 1 is data
+  void close(); 
+private:
+  int d_fds[2];
+};
+
+template<class T>
+class DelayPipe
+{
+public:
+  DelayPipe();
+  ~DelayPipe();
+  void submit(T& t, int msec);
+
+private:
+  std::thread d_thread;
+  void worker();
+  struct Combo
+  {
+    T what;
+    struct timespec when;
+  };
+
+  ObjectPipe<Combo> d_pipe;
+  struct tscomp {
+    bool operator()(const struct timespec& a, const struct timespec& b) const
+    {
+      return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec);
+    }
+  };
+  std::multimap<struct timespec, T, tscomp> d_work;
+};
+
+#include "delaypipe.cc"
index b27d85f36cf667b31e8ae7e55ad8afbc16e0ca5b..48b17ffc22087378271c101ad90ddd12e22876e9 100644 (file)
@@ -374,6 +374,13 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
        });
     });
    
+  g_lua.writeFunction("addDelay", [](boost::variant<string,vector<pair<int, string>> > var, int msec) {
+      auto rule = makeRule(var);
+      g_rulactions.modify([msec,rule](decltype(g_rulactions)::value_type& rulactions) {
+         rulactions.push_back({rule, 
+               std::make_shared<DelayAction>(msec)});
+       });
+    });
 
 
   g_lua.writeFunction("showRules", []() {
index b72e55e2e4a74de693afbeb957684f0d3ab4b80d..8fb6d84d19e6d863014a2fdeeca5aff42f83bf45 100644 (file)
@@ -32,7 +32,7 @@
 #include "dnswriter.hh"
 #include "base64.hh"
 #include <fstream>
-
+#include "delaypipe.hh"
 #include <unistd.h>
 #include "sodcrypto.hh"
 #include "dnsrulactions.hh"
@@ -118,6 +118,19 @@ catch(...)
   g_stats.truncFail++;
 }
 
+struct DelayedPacket
+{
+  int fd;
+  string packet;
+  ComboAddress destination;
+  void operator()()
+  {
+    sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
+  }
+};
+
+DelayPipe<DelayedPacket> g_delay;
+
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
 void* responderThread(std::shared_ptr<DownstreamState> state)
 {
@@ -145,10 +158,17 @@ void* responderThread(std::shared_ptr<DownstreamState> state)
 
     dh->id = ids->origID;
     g_stats.responses++;
-    if(ids->origDest.sin4.sin_family == 0)
-      sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
-    else
-      sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote);
+
+    if(ids->delayMsec) {
+      DelayedPacket dp{ids->origFD, string(packet,len), ids->origRemote};
+      g_delay.submit(dp, ids->delayMsec);
+    }
+    else {
+      if(ids->origDest.sin4.sin_family == 0)
+       sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
+      else
+       sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote);
+    }
     double udiff = ids->sentTime.udiff();
     vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
 
@@ -181,6 +201,12 @@ void* responderThread(std::shared_ptr<DownstreamState> state)
   return 0;
 }
 
+bool operator<(const struct timespec&a, const struct timespec& b) 
+{ 
+  return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec); 
+}
+
+
 DownstreamState::DownstreamState(const ComboAddress& remote_)
 {
   remote = remote_;
@@ -412,6 +438,7 @@ try
          }
        }
       }
+      int delayMsec=0;
       switch(action) {
       case DNSAction::Action::Drop:
        g_stats.ruleDrop++;
@@ -430,6 +457,10 @@ try
       case DNSAction::Action::HeaderModify:
        dh->qr=true;
        break;
+
+      case DNSAction::Action::Delay:
+       delayMsec = atoi(ruleresult.c_str()); // sorry
+       break;
       case DNSAction::Action::Allow:
       case DNSAction::Action::None:
        break;
@@ -480,6 +511,7 @@ try
       ids->qname = qname;
       ids->qtype = qtype;
       ids->origDest.sin4.sin_family=0;
+      ids->delayMsec = delayMsec;
       HarvestDestinationAddress(&msgh, &ids->origDest);
       
       dh->id = idOffset;
@@ -636,6 +668,10 @@ try
         // e is the exception that was thrown from inside the lambda
         response+= string(e.what());
       }
+      catch(const PDNSException& e) {
+        // e is the exception that was thrown from inside the lambda
+        response += string(e.reason);
+      }
     }
     response = sodEncryptSym(response, g_key, ours);
     putMsgLen(fd, response.length());
@@ -803,6 +839,10 @@ void doConsole()
         // e is the exception that was thrown from inside the lambda
         std::cerr << e.what() << std::endl;      
       }
+      catch(const PDNSException& e) {
+        // e is the exception that was thrown from inside the lambda
+        std::cerr << e.reason << std::endl;      
+      }
     }
     catch(const std::exception& e) {
       // e is the exception that was thrown from inside the lambda
index 20a3b13310b8b8ed7fcaab1cb9d41b3f8326dc42..fce1b0b462ee18fae226261ebe52957cb45a5fee 100644 (file)
@@ -143,13 +143,14 @@ private:
 
 struct IDState
 {
-  IDState() : origFD(-1) { origDest.sin4.sin_family = 0;}
+  IDState() : origFD(-1), delayMsec(0) { origDest.sin4.sin_family = 0;}
   IDState(const IDState& orig)
   {
     origFD = orig.origFD;
     origID = orig.origID;
     origRemote = orig.origRemote;
     origDest = orig.origDest;
+    delayMsec = orig.delayMsec;
     age.store(orig.age.load());
   }
 
@@ -162,6 +163,7 @@ struct IDState
   std::atomic<uint16_t> age;                                  // 4
   uint16_t qtype;                                             // 2
   uint16_t origID;                                            // 2
+  int delayMsec;
 };
 
 struct Rings {
@@ -284,7 +286,7 @@ public:
 class DNSAction
 {
 public:
-  enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, None};
+  enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, Delay, None};
   virtual Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const =0;
   virtual string toString() const = 0;
 };
index 49d11690bcf04accb0ee20092f789858de9411f2..b5a7a505cbc825b84e24885405ab9cf655f2b5b6 100644 (file)
@@ -110,6 +110,25 @@ private:
   QPSLimiter d_qps;
 };
 
+class DelayAction : public DNSAction
+{
+public:
+  DelayAction(int msec) : d_msec(msec)
+  {}
+  DNSAction::Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const override
+  {
+    *ruleresult=std::to_string(d_msec);
+    return Action::Delay;
+  }
+  string toString() const override
+  {
+    return "delay by "+std::to_string(d_msec)+ " msec";
+  }
+private:
+  int d_msec;
+};
+
+
 class PoolAction : public DNSAction
 {
 public:
diff --git a/pdns/test-delaypipe_hh.cc b/pdns/test-delaypipe_hh.cc
new file mode 100644 (file)
index 0000000..94287a9
--- /dev/null
@@ -0,0 +1,59 @@
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "delaypipe.hh"
+#include <boost/test/unit_test.hpp>
+
+BOOST_AUTO_TEST_SUITE(test_delaypipe_hh);
+
+BOOST_AUTO_TEST_CASE(test_object_pipe) {
+  ObjectPipe<int> op;
+  for(int n=0; n < 100; ++n)
+    op.write(n);
+
+  int i;
+  for(int n=0; n < 100; ++n) {
+    bool res=op.read(&i);
+    BOOST_CHECK_EQUAL(res, true);
+    BOOST_CHECK_EQUAL(n, i);
+  }
+
+  op.close();
+  BOOST_CHECK_EQUAL(op.read(&i), false);
+
+};
+
+int done=0;
+BOOST_AUTO_TEST_CASE(test_delay_pipe) {
+  
+  struct Work
+  {
+    int i;
+    void operator()()
+    {
+      ++done;
+    }
+  };
+  DelayPipe<Work> dp;
+  int n;
+  for(n=0; n < 5; ++n) {
+    Work w{n};
+    dp.submit(w, 500);
+  }
+  BOOST_CHECK_EQUAL(done, 0);
+
+  for(; n < 10; ++n) {
+    Work w{n};
+    dp.submit(w, 1000);
+  }
+  sleep(1);
+  BOOST_CHECK_EQUAL(done, 5);
+  sleep(1);
+  BOOST_CHECK_EQUAL(done, n);
+
+};
+
+
+BOOST_AUTO_TEST_SUITE_END();