test-base32_cc.cc \
test-base64_cc.cc \
test-bindparser_cc.cc \
+ test-delaypipe_hh.cc \
test-distributor_hh.cc \
test-dns_random_hh.cc \
test-dnsname_cc.cc \
--- /dev/null
+#include "delaypipe.hh"
+#include "misc.hh"
+#include <thread>
+
+template<class T>
+ObjectPipe<T>::ObjectPipe()
+{
+ if(pipe(d_fds))
+ unixDie("pipe");
+}
+
+template<class T>
+ObjectPipe<T>::~ObjectPipe()
+{
+ ::close(d_fds[0]);
+ if(d_fds[1] >= 0)
+ ::close(d_fds[1]);
+}
+
+template<class T>
+void ObjectPipe<T>::close()
+{
+ if(d_fds[1] < 0)
+ return;
+ ::close(d_fds[1]); // the writing side
+ d_fds[1]=-1;
+}
+
+template<class T>
+void ObjectPipe<T>::write(T& t)
+{
+ auto ptr = new T(t);
+ if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr))
+ unixDie("write");
+}
+
+template<class T>
+bool ObjectPipe<T>::read(T* t)
+{
+ T* ptr;
+ int ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+
+ if(ret < 0)
+ unixDie("read");
+ if(ret==0)
+ return false;
+ *t=*ptr;
+ delete ptr;
+ return true;
+}
+
+template<class T>
+int ObjectPipe<T>::readTimeout(T* t, int msec)
+{
+ T* ptr;
+ int ret = waitForData(d_fds[0], 0, 1000*msec);
+ if(ret <0)
+ unixDie("waiting for data in object pipe");
+ if(ret == 0)
+ return -1;
+
+ ret = ::read(d_fds[0], &ptr, sizeof(ptr));
+
+ if(ret < 0)
+ unixDie("read");
+ if(ret==0)
+ return false;
+ *t=*ptr;
+ delete ptr;
+ return 1;
+}
+
+
+template<class T>
+DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this)
+{
+}
+
+template<class T>
+void DelayPipe<T>::submit(T& t, int msec)
+{
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ now.tv_nsec += msec*1e6;
+ if(now.tv_nsec > 1e9) {
+ now.tv_sec++;
+ now.tv_nsec-=1e9;
+ }
+ Combo c{t, now};
+ d_pipe.write(c);
+}
+
+template<class T>
+DelayPipe<T>::~DelayPipe()
+{
+ d_pipe.close();
+ d_thread.join();
+}
+
+template<class T>
+void DelayPipe<T>::worker()
+{
+ Combo c;
+ for(;;) {
+ int ret = d_pipe.readTimeout(&c, 10); // XXXX NEEDS TO BE DYNAMIC
+ if(ret > 0) { // we got an object
+ d_work.insert(make_pair(c.when, c.what));
+ }
+ else if(ret==0) { // timeout
+
+ break;
+ }
+ else {
+ // cout<<"Got a timeout"<<endl;
+ }
+ struct timespec now;
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ tscomp cmp;
+ for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful
+ if(cmp(iter->first, now)) {
+ iter->second();
+ d_work.erase(iter++);
+ }
+ else break;
+ }
+ }
+}
--- /dev/null
+#pragma once
+#include <map>
+#include <time.h>
+#include <thread>
+
+/**
+ General idea: many threads submit work to this class, but only one executes it. The work should therefore be entirely trivial.
+ The implementatin is that submitter threads create an object that represents the work, and it gets sent over a pipe
+ to the worker thread.
+
+ The worker thread meanwhile listens on this pipe (non-blocking), with a delay set to the next object that needs to be executed.
+ If meanwhile new work comes in, all objects who's time has come are executed, a new sleep time is calculated.
+*/
+
+
+/* ObjectPipe facilitates the type-safe passing of types over a pipe */
+
+template<class T>
+class ObjectPipe
+{
+public:
+ ObjectPipe();
+ ~ObjectPipe();
+ void write(T& t);
+ bool read(T* t); // returns false on EOF
+ int readTimeout(T* t, int msec); // -1 is timeout, 0 is no data, 1 is data
+ void close();
+private:
+ int d_fds[2];
+};
+
+template<class T>
+class DelayPipe
+{
+public:
+ DelayPipe();
+ ~DelayPipe();
+ void submit(T& t, int msec);
+
+private:
+ std::thread d_thread;
+ void worker();
+ struct Combo
+ {
+ T what;
+ struct timespec when;
+ };
+
+ ObjectPipe<Combo> d_pipe;
+ struct tscomp {
+ bool operator()(const struct timespec& a, const struct timespec& b) const
+ {
+ return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec);
+ }
+ };
+ std::multimap<struct timespec, T, tscomp> d_work;
+};
+
+#include "delaypipe.cc"
});
});
+ g_lua.writeFunction("addDelay", [](boost::variant<string,vector<pair<int, string>> > var, int msec) {
+ auto rule = makeRule(var);
+ g_rulactions.modify([msec,rule](decltype(g_rulactions)::value_type& rulactions) {
+ rulactions.push_back({rule,
+ std::make_shared<DelayAction>(msec)});
+ });
+ });
g_lua.writeFunction("showRules", []() {
#include "dnswriter.hh"
#include "base64.hh"
#include <fstream>
-
+#include "delaypipe.hh"
#include <unistd.h>
#include "sodcrypto.hh"
#include "dnsrulactions.hh"
g_stats.truncFail++;
}
+struct DelayedPacket
+{
+ int fd;
+ string packet;
+ ComboAddress destination;
+ void operator()()
+ {
+ sendto(fd, packet.c_str(), packet.size(), 0, (struct sockaddr*)&destination, destination.getSocklen());
+ }
+};
+
+DelayPipe<DelayedPacket> g_delay;
+
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
void* responderThread(std::shared_ptr<DownstreamState> state)
{
dh->id = ids->origID;
g_stats.responses++;
- if(ids->origDest.sin4.sin_family == 0)
- sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
- else
- sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote);
+
+ if(ids->delayMsec) {
+ DelayedPacket dp{ids->origFD, string(packet,len), ids->origRemote};
+ g_delay.submit(dp, ids->delayMsec);
+ }
+ else {
+ if(ids->origDest.sin4.sin_family == 0)
+ sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
+ else
+ sendfromto(ids->origFD, packet, len, 0, ids->origDest, ids->origRemote);
+ }
double udiff = ids->sentTime.udiff();
vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
return 0;
}
+bool operator<(const struct timespec&a, const struct timespec& b)
+{
+ return std::tie(a.tv_sec, a.tv_nsec) < std::tie(b.tv_sec, b.tv_nsec);
+}
+
+
DownstreamState::DownstreamState(const ComboAddress& remote_)
{
remote = remote_;
}
}
}
+ int delayMsec=0;
switch(action) {
case DNSAction::Action::Drop:
g_stats.ruleDrop++;
case DNSAction::Action::HeaderModify:
dh->qr=true;
break;
+
+ case DNSAction::Action::Delay:
+ delayMsec = atoi(ruleresult.c_str()); // sorry
+ break;
case DNSAction::Action::Allow:
case DNSAction::Action::None:
break;
ids->qname = qname;
ids->qtype = qtype;
ids->origDest.sin4.sin_family=0;
+ ids->delayMsec = delayMsec;
HarvestDestinationAddress(&msgh, &ids->origDest);
dh->id = idOffset;
// e is the exception that was thrown from inside the lambda
response+= string(e.what());
}
+ catch(const PDNSException& e) {
+ // e is the exception that was thrown from inside the lambda
+ response += string(e.reason);
+ }
}
response = sodEncryptSym(response, g_key, ours);
putMsgLen(fd, response.length());
// e is the exception that was thrown from inside the lambda
std::cerr << e.what() << std::endl;
}
+ catch(const PDNSException& e) {
+ // e is the exception that was thrown from inside the lambda
+ std::cerr << e.reason << std::endl;
+ }
}
catch(const std::exception& e) {
// e is the exception that was thrown from inside the lambda
struct IDState
{
- IDState() : origFD(-1) { origDest.sin4.sin_family = 0;}
+ IDState() : origFD(-1), delayMsec(0) { origDest.sin4.sin_family = 0;}
IDState(const IDState& orig)
{
origFD = orig.origFD;
origID = orig.origID;
origRemote = orig.origRemote;
origDest = orig.origDest;
+ delayMsec = orig.delayMsec;
age.store(orig.age.load());
}
std::atomic<uint16_t> age; // 4
uint16_t qtype; // 2
uint16_t origID; // 2
+ int delayMsec;
};
struct Rings {
class DNSAction
{
public:
- enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, None};
+ enum class Action { Drop, Nxdomain, Spoof, Allow, HeaderModify, Pool, Delay, None};
virtual Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const =0;
virtual string toString() const = 0;
};
QPSLimiter d_qps;
};
+class DelayAction : public DNSAction
+{
+public:
+ DelayAction(int msec) : d_msec(msec)
+ {}
+ DNSAction::Action operator()(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh, int len, string* ruleresult) const override
+ {
+ *ruleresult=std::to_string(d_msec);
+ return Action::Delay;
+ }
+ string toString() const override
+ {
+ return "delay by "+std::to_string(d_msec)+ " msec";
+ }
+private:
+ int d_msec;
+};
+
+
class PoolAction : public DNSAction
{
public:
--- /dev/null
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "delaypipe.hh"
+#include <boost/test/unit_test.hpp>
+
+BOOST_AUTO_TEST_SUITE(test_delaypipe_hh);
+
+BOOST_AUTO_TEST_CASE(test_object_pipe) {
+ ObjectPipe<int> op;
+ for(int n=0; n < 100; ++n)
+ op.write(n);
+
+ int i;
+ for(int n=0; n < 100; ++n) {
+ bool res=op.read(&i);
+ BOOST_CHECK_EQUAL(res, true);
+ BOOST_CHECK_EQUAL(n, i);
+ }
+
+ op.close();
+ BOOST_CHECK_EQUAL(op.read(&i), false);
+
+};
+
+int done=0;
+BOOST_AUTO_TEST_CASE(test_delay_pipe) {
+
+ struct Work
+ {
+ int i;
+ void operator()()
+ {
+ ++done;
+ }
+ };
+ DelayPipe<Work> dp;
+ int n;
+ for(n=0; n < 5; ++n) {
+ Work w{n};
+ dp.submit(w, 500);
+ }
+ BOOST_CHECK_EQUAL(done, 0);
+
+ for(; n < 10; ++n) {
+ Work w{n};
+ dp.submit(w, 1000);
+ }
+ sleep(1);
+ BOOST_CHECK_EQUAL(done, 5);
+ sleep(1);
+ BOOST_CHECK_EQUAL(done, n);
+
+};
+
+
+BOOST_AUTO_TEST_SUITE_END();