]> granicus.if.org Git - pdns/commitdiff
make pdns-distributes-queries use a hash so queries get sent to the same thread....
authorbert hubert <bert.hubert@netherlabs.nl>
Wed, 23 Apr 2014 20:33:18 +0000 (22:33 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Wed, 23 Apr 2014 20:33:18 +0000 (22:33 +0200)
Astoundingly effective, approximately halves CPU usage.

pdns/dns.cc
pdns/dns.hh
pdns/dnspcap.hh
pdns/misc.cc
pdns/misc.hh
pdns/pdns_recursor.cc
pdns/syncres.hh

index 34999b550a171dc585b42ff960438c95ec383765..db23944a286459cb17ee16e398f26fffed8954eb 100644 (file)
@@ -128,6 +128,25 @@ bool dnspacketLessThan(const std::string& a, const std::string& b)
   return boost::tie(aQtype, aQclass) < boost::tie(bQtype, bQclass);
 }
 
+// goal is to has based purely on the question name, and turn error into 'default'
+uint32_t hashQuestion(const char* packet, uint16_t len, uint32_t init)
+{
+  if(len < 12) 
+    return init;
+  
+  uint32_t ret=init;
+  const unsigned char* end = (const unsigned char*)packet+len;
+  const unsigned char* pos = (const unsigned char*)packet+12;
+
+  unsigned char labellen;
+  while((labellen=*pos++) && pos < end) { 
+    if(pos + labellen + 1> end)
+      return 0;
+    ret=burtle(pos, labellen+1, ret);
+    pos += labellen;
+  }
+  return ret;
+}
 
 string questionExpand(const char* packet, uint16_t len, uint16_t& type)
 {
index 7f7146f202dbeb247f0bee7e3dbd47ac2c8ca9b2..2e9d5ef6e1b2e2c867a2ad4d9414357a0ff709d4 100644 (file)
@@ -283,6 +283,7 @@ struct dnsheader {
 #define L theL()
 extern time_t s_starttime;
 std::string questionExpand(const char* packet, uint16_t len, uint16_t& type);
+uint32_t hashQuestion(const char* packet, uint16_t len, uint32_t init);
 bool dnspacketLessThan(const std::string& a, const std::string& b);
 
 /** helper function for both DNSPacket and addSOARecord() - converts a line into a struct, for easier parsing */
index 1abc40c5a8c9247529bd27d05d600b9b927d182a..d18b69a7f2d183b0aeb82fde689b7dfc598ccf6b 100644 (file)
@@ -83,7 +83,6 @@ public:
 
   bool getUDPPacket();
 
-
   ComboAddress getSource() const;
   ComboAddress getDest() const;
 
index 53e9802552d6fa2f68bd334266c896c4a453a549..db9ea9675165bbf3a231cec21498c340e05664ad 100644 (file)
@@ -847,3 +847,56 @@ void setFilenumLimit(unsigned int lim)
   if(setrlimit(RLIMIT_NOFILE, &rlim) < 0)
     unixDie("Setting number of available file descriptors");
 }
+
+#define burtlemix(a,b,c) \
+{ \
+  a -= b; a -= c; a ^= (c>>13); \
+  b -= c; b -= a; b ^= (a<<8); \
+  c -= a; c -= b; c ^= (b>>13); \
+  a -= b; a -= c; a ^= (c>>12);  \
+  b -= c; b -= a; b ^= (a<<16); \
+  c -= a; c -= b; c ^= (b>>5); \
+  a -= b; a -= c; a ^= (c>>3);  \
+  b -= c; b -= a; b ^= (a<<10); \
+  c -= a; c -= b; c ^= (b>>15); \
+}
+
+uint32_t burtle(const unsigned char* k, uint32_t length, uint32_t initval)
+{
+  uint32_t a,b,c,len;
+
+   /* Set up the internal state */
+  len = length;
+  a = b = 0x9e3779b9;  /* the golden ratio; an arbitrary value */
+  c = initval;         /* the previous hash value */
+
+  /*---------------------------------------- handle most of the key */
+  while (len >= 12) {
+    a += (k[0] +((uint32_t)k[1]<<8) +((uint32_t)k[2]<<16) +((uint32_t)k[3]<<24));
+    b += (k[4] +((uint32_t)k[5]<<8) +((uint32_t)k[6]<<16) +((uint32_t)k[7]<<24));
+    c += (k[8] +((uint32_t)k[9]<<8) +((uint32_t)k[10]<<16)+((uint32_t)k[11]<<24));
+    burtlemix(a,b,c);
+    k += 12; len -= 12;
+  }
+
+  /*------------------------------------- handle the last 11 bytes */
+  c += length;
+  switch(len) {             /* all the case statements fall through */
+  case 11: c+=((uint32_t)k[10]<<24);
+  case 10: c+=((uint32_t)k[9]<<16);
+  case 9 : c+=((uint32_t)k[8]<<8);
+    /* the first byte of c is reserved for the length */
+  case 8 : b+=((uint32_t)k[7]<<24);
+  case 7 : b+=((uint32_t)k[6]<<16);
+  case 6 : b+=((uint32_t)k[5]<<8);
+  case 5 : b+=k[4];
+  case 4 : a+=((uint32_t)k[3]<<24);
+  case 3 : a+=((uint32_t)k[2]<<16);
+  case 2 : a+=((uint32_t)k[1]<<8);
+  case 1 : a+=k[0];
+    /* case 0: nothing left to add */
+  }
+  burtlemix(a,b,c);
+  /*-------------------------------------------- report the result */
+  return c;
+}
index 917fd36ccff203ee5e0e674ed629c1af00a83e90..4712e28ac185e050831998487b5e9f45f2292c85 100644 (file)
@@ -528,4 +528,5 @@ void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, ComboAddress* source);
 unsigned int getFilenumLimit(bool hardOrSoft=0);
 void setFilenumLimit(unsigned int lim);
 bool readFileIfThere(const char* fname, std::string* line);
+uint32_t burtle(const unsigned char* k, uint32_t lengh, uint32_t init);
 #endif
index cd16f00f1a81ae392885b27a02c6ef0ca3ac90e0..595c674ee5dc45021e7785e3f21c1b70e02657ce 100644 (file)
@@ -947,7 +947,7 @@ void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
       else {
         string question(data, len);
         if(g_weDistributeQueries)
-          distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
+          distributeAsyncFunction(question, boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
         else
           doProcessUDPQuestion(question, fromaddr, fd);
       }
@@ -1272,11 +1272,13 @@ void broadcastFunction(const pipefunc_t& func, bool skipSelf)
     }
   }
 }
-void distributeAsyncFunction(const pipefunc_t& func)
+
+uint32_t g_disthashseed;
+void distributeAsyncFunction(const std::string& question, const pipefunc_t& func)
 {
-  static unsigned int counter;
-  unsigned int target = 1 + (++counter % (g_pipes.size()-1));
-  // cerr<<"Sending to: "<<target<<endl;
+  unsigned int hash = hashQuestion(question.c_str(), question.length(), g_disthashseed);
+  unsigned int target = 1 + (hash % (g_pipes.size()-1));
+
   if(target == t_id) {
     func();
     return;
@@ -1287,8 +1289,7 @@ void distributeAsyncFunction(const pipefunc_t& func)
   tmsg->wantAnswer = false;
   
   if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
-    unixDie("write to thread pipe returned wrong size or error");
-    
+    unixDie("write to thread pipe returned wrong size or error");    
 }
 
 void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
@@ -1759,6 +1760,8 @@ int serviceMain(int argc, char*argv[])
 
   showProductVersion();
   seedRandom(::arg()["entropy-source"]);
+  g_disthashseed=dns_random(0xffffffff);
+
   parseACLs();
   
   if(!::arg()["dont-query"].empty()) {
index 6a8840b83845266e011e9dfe9aae2de8a220c806..4c855b01d50bd639837df6bebe6a24f691a46e9a 100644 (file)
@@ -622,7 +622,7 @@ ComboAddress parseIPAndPort(const std::string& input, uint16_t port);
 ComboAddress getQueryLocalAddress(int family, uint16_t port);
 typedef boost::function<void*(void)> pipefunc_t;
 void broadcastFunction(const pipefunc_t& func, bool skipSelf = false);
-void distributeAsyncFunction(const pipefunc_t& func);
+void distributeAsyncFunction(const std::string& question, const pipefunc_t& func);
 
 int directResolve(const std::string& qname, const QType& qtype, int qclass, vector<DNSResourceRecord>& ret);