From 7b3865cdb4d928ef9d9fe47287b5838329cf0aa4 Mon Sep 17 00:00:00 2001 From: bert hubert Date: Fri, 5 Jun 2015 16:52:50 +0200 Subject: [PATCH] delaypipe, plus addDelay() command --- pdns/Makefile.am | 1 + pdns/delaypipe.cc | 127 ++++++++++++++++++++++++++++++++++++++ pdns/delaypipe.hh | 59 ++++++++++++++++++ pdns/dnsdist-lua.cc | 7 +++ pdns/dnsdist.cc | 50 +++++++++++++-- pdns/dnsdist.hh | 6 +- pdns/dnsrulactions.hh | 19 ++++++ pdns/test-delaypipe_hh.cc | 59 ++++++++++++++++++ 8 files changed, 321 insertions(+), 7 deletions(-) create mode 100644 pdns/delaypipe.cc create mode 100644 pdns/delaypipe.hh create mode 100644 pdns/test-delaypipe_hh.cc diff --git a/pdns/Makefile.am b/pdns/Makefile.am index 7691bc4b4..2a51ea565 100644 --- a/pdns/Makefile.am +++ b/pdns/Makefile.am @@ -995,6 +995,7 @@ testrunner_SOURCES = \ test-base32_cc.cc \ test-base64_cc.cc \ test-bindparser_cc.cc \ + test-delaypipe_hh.cc \ test-distributor_hh.cc \ test-dns_random_hh.cc \ test-dnsname_cc.cc \ diff --git a/pdns/delaypipe.cc b/pdns/delaypipe.cc new file mode 100644 index 000000000..d0798a532 --- /dev/null +++ b/pdns/delaypipe.cc @@ -0,0 +1,127 @@ +#include "delaypipe.hh" +#include "misc.hh" +#include + +template +ObjectPipe::ObjectPipe() +{ + if(pipe(d_fds)) + unixDie("pipe"); +} + +template +ObjectPipe::~ObjectPipe() +{ + ::close(d_fds[0]); + if(d_fds[1] >= 0) + ::close(d_fds[1]); +} + +template +void ObjectPipe::close() +{ + if(d_fds[1] < 0) + return; + ::close(d_fds[1]); // the writing side + d_fds[1]=-1; +} + +template +void ObjectPipe::write(T& t) +{ + auto ptr = new T(t); + if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) + unixDie("write"); +} + +template +bool ObjectPipe::read(T* t) +{ + T* ptr; + int ret = ::read(d_fds[0], &ptr, sizeof(ptr)); + + if(ret < 0) + unixDie("read"); + if(ret==0) + return false; + *t=*ptr; + delete ptr; + return true; +} + +template +int ObjectPipe::readTimeout(T* t, int msec) +{ + T* ptr; + int ret = waitForData(d_fds[0], 0, 1000*msec); + if(ret <0) + unixDie("waiting for data in object pipe"); + if(ret == 0) + return -1; + + ret = ::read(d_fds[0], &ptr, sizeof(ptr)); + + if(ret < 0) + unixDie("read"); + if(ret==0) + return false; + *t=*ptr; + delete ptr; + return 1; +} + + +template +DelayPipe::DelayPipe() : d_thread(&DelayPipe::worker, this) +{ +} + +template +void DelayPipe::submit(T& t, int msec) +{ + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + now.tv_nsec += msec*1e6; + if(now.tv_nsec > 1e9) { + now.tv_sec++; + now.tv_nsec-=1e9; + } + Combo c{t, now}; + d_pipe.write(c); +} + +template +DelayPipe::~DelayPipe() +{ + d_pipe.close(); + d_thread.join(); +} + +template +void DelayPipe::worker() +{ + Combo c; + for(;;) { + int ret = d_pipe.readTimeout(&c, 10); // XXXX NEEDS TO BE DYNAMIC + if(ret > 0) { // we got an object + d_work.insert(make_pair(c.when, c.what)); + } + else if(ret==0) { // timeout + + break; + } + else { + // cout<<"Got a timeout"<first, now)) { + iter->second(); + d_work.erase(iter++); + } + else break; + } + } +} diff --git a/pdns/delaypipe.hh b/pdns/delaypipe.hh new file mode 100644 index 000000000..cfc72e74b --- /dev/null +++ b/pdns/delaypipe.hh @@ -0,0 +1,59 @@ +#pragma once +#include +#include +#include + +/** + General idea: many threads submit work to this class, but only one executes it. The work should therefore be entirely trivial. + The implementatin is that submitter threads create an object that represents the work, and it gets sent over a pipe + to the worker thread. + + The worker thread meanwhile listens on this pipe (non-blocking), with a delay set to the next object that needs to be executed. + If meanwhile new work comes in, all objects who's time has come are executed, a new sleep time is calculated. +*/ + + +/* ObjectPipe facilitates the type-safe passing of types over a pipe */ + +template +class ObjectPipe +{ +public: + ObjectPipe(); + ~ObjectPipe(); + void write(T& t); + bool read(T* t); // returns false on EOF + int readTimeout(T* t, int msec); // -1 is timeout, 0 is no data, 1 is data + void close(); +private: + int d_fds[2]; +}; + +template +class DelayPipe +{ +public: + DelayPipe(); + ~DelayPipe(); + void submit(T& t, int msec); + +private: + std::thread d_thread; + void worker(); + struct Combo + { + T what; + struct timespec when; + }; + + ObjectPipe d_pipe; + struct tscomp { + bool operator()(const struct timespec& a, const struct timespec& b) const + { + return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec); + } + }; + std::multimap d_work; +}; + +#include "delaypipe.cc" diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index b27d85f36..48b17ffc2 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -374,6 +374,13 @@ vector> setupLua(bool client, const std::string& confi }); }); + g_lua.writeFunction("addDelay", [](boost::variant> > var, int msec) { + auto rule = makeRule(var); + g_rulactions.modify([msec,rule](decltype(g_rulactions)::value_type& rulactions) { + rulactions.push_back({rule, + std::make_shared(msec)}); + }); + }); g_lua.writeFunction("showRules", []() { diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index b72e55e2e..8fb6d84d1 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -32,7 +32,7 @@ #include "dnswriter.hh" #include "base64.hh" #include - +#include "delaypipe.hh" #include #include "sodcrypto.hh" #include "dnsrulactions.hh" @@ -118,6 +118,19 @@ catch(...) g_stats.truncFail++; } +struct DelayedPacket +{ + int fd; + string packet; + ComboAddress destination; + void operator()() + { + sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen()); + } +}; + +DelayPipe g_delay; + // listens on a dedicated socket, lobs answers from downstream servers to original requestors void* responderThread(std::shared_ptr state) { @@ -145,10 +158,17 @@ void* responderThread(std::shared_ptr state) dh->id = ids->origID; g_stats.responses++; - if(ids->origDest.sin4.sin_family == 0) - sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen()); - else - sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote); + + if(ids->delayMsec) { + DelayedPacket dp{ids->origFD, string(packet,len), ids->origRemote}; + g_delay.submit(dp, ids->delayMsec); + } + else { + if(ids->origDest.sin4.sin_family == 0) + sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen()); + else + sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote); + } double udiff = ids->sentTime.udiff(); vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff); @@ -181,6 +201,12 @@ void* responderThread(std::shared_ptr state) return 0; } +bool operator<(const struct timespec&a, const struct timespec& b) +{ + return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec); +} + + DownstreamState::DownstreamState(const ComboAddress& remote_) { remote = remote_; @@ -412,6 +438,7 @@ try } } } + int delayMsec=0; switch(action) { case DNSAction::Action::Drop: g_stats.ruleDrop++; @@ -430,6 +457,10 @@ try case DNSAction::Action::HeaderModify: dh->qr=true; break; + + case DNSAction::Action::Delay: + delayMsec = atoi(ruleresult.c_str()); // sorry + break; case DNSAction::Action::Allow: case DNSAction::Action::None: break; @@ -480,6 +511,7 @@ try ids->qname = qname; ids->qtype = qtype; ids->origDest.sin4.sin_family=0; + ids->delayMsec = delayMsec; HarvestDestinationAddress(&msgh, &ids->origDest); dh->id = idOffset; @@ -636,6 +668,10 @@ try // e is the exception that was thrown from inside the lambda response+= string(e.what()); } + catch(const PDNSException& e) { + // e is the exception that was thrown from inside the lambda + response += string(e.reason); + } } response = sodEncryptSym(response, g_key, ours); putMsgLen(fd, response.length()); @@ -803,6 +839,10 @@ void doConsole() // e is the exception that was thrown from inside the lambda std::cerr << e.what() << std::endl; } + catch(const PDNSException& e) { + // e is the exception that was thrown from inside the lambda + std::cerr << e.reason << std::endl; + } } catch(const std::exception& e) { // e is the exception that was thrown from inside the lambda diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 20a3b1331..fce1b0b46 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -143,13 +143,14 @@ private: struct IDState { - IDState() : origFD(-1) { origDest.sin4.sin_family = 0;} + IDState() : origFD(-1), delayMsec(0) { origDest.sin4.sin_family = 0;} IDState(const IDState& orig) { origFD = orig.origFD; origID = orig.origID; origRemote = orig.origRemote; origDest = orig.origDest; + delayMsec = orig.delayMsec; age.store(orig.age.load()); } @@ -162,6 +163,7 @@ struct IDState std::atomic age; // 4 uint16_t qtype; // 2 uint16_t origID; // 2 + int delayMsec; }; struct Rings { @@ -284,7 +286,7 @@ public: class DNSAction { public: - enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, None}; + enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, Delay, None}; virtual Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const =0; virtual string toString() const = 0; }; diff --git a/pdns/dnsrulactions.hh b/pdns/dnsrulactions.hh index 49d11690b..b5a7a505c 100644 --- a/pdns/dnsrulactions.hh +++ b/pdns/dnsrulactions.hh @@ -110,6 +110,25 @@ private: QPSLimiter d_qps; }; +class DelayAction : public DNSAction +{ +public: + DelayAction(int msec) : d_msec(msec) + {} + DNSAction::Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const override + { + *ruleresult=std::to_string(d_msec); + return Action::Delay; + } + string toString() const override + { + return "delay by "+std::to_string(d_msec)+ " msec"; + } +private: + int d_msec; +}; + + class PoolAction : public DNSAction { public: diff --git a/pdns/test-delaypipe_hh.cc b/pdns/test-delaypipe_hh.cc new file mode 100644 index 000000000..94287a935 --- /dev/null +++ b/pdns/test-delaypipe_hh.cc @@ -0,0 +1,59 @@ +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_NO_MAIN +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "delaypipe.hh" +#include + +BOOST_AUTO_TEST_SUITE(test_delaypipe_hh); + +BOOST_AUTO_TEST_CASE(test_object_pipe) { + ObjectPipe op; + for(int n=0; n < 100; ++n) + op.write(n); + + int i; + for(int n=0; n < 100; ++n) { + bool res=op.read(&i); + BOOST_CHECK_EQUAL(res, true); + BOOST_CHECK_EQUAL(n, i); + } + + op.close(); + BOOST_CHECK_EQUAL(op.read(&i), false); + +}; + +int done=0; +BOOST_AUTO_TEST_CASE(test_delay_pipe) { + + struct Work + { + int i; + void operator()() + { + ++done; + } + }; + DelayPipe dp; + int n; + for(n=0; n < 5; ++n) { + Work w{n}; + dp.submit(w, 500); + } + BOOST_CHECK_EQUAL(done, 0); + + for(; n < 10; ++n) { + Work w{n}; + dp.submit(w, 1000); + } + sleep(1); + BOOST_CHECK_EQUAL(done, 5); + sleep(1); + BOOST_CHECK_EQUAL(done, n); + +}; + + +BOOST_AUTO_TEST_SUITE_END(); -- 2.40.0