]> granicus.if.org Git - pdns/commitdiff
make dnsdist spread load to remote with least amount of queue, add primitive timeout...
authorbert hubert <bert.hubert@netherlabs.nl>
Tue, 25 Jun 2013 19:18:38 +0000 (21:18 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Tue, 25 Jun 2013 19:18:38 +0000 (21:18 +0200)
pdns/dnsdist.cc
pdns/misc.hh

index 4e0dee9479b09a70337fe062ccfcbb09a285bbaf..24724146b063a1543ed97cdac558cb2186f24c3d 100644 (file)
     along with this program; if not, write to the Free Software
     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
-#include "dnsparser.hh"
 #include "sstuff.hh"
 #include "misc.hh"
-#include "dnswriter.hh"
-#include "dnsrecords.hh"
 #include "statbag.hh"
 #include <netinet/tcp.h>
 #include <boost/array.hpp>
 #include <boost/program_options.hpp>
 #include <boost/foreach.hpp>
+#include <limits>
 
 /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
    Added downstream server 8.8.8.8:53
@@ -93,6 +91,7 @@ struct IDState
   int origFD;  // set to <0 to indicate this state is empty
   uint16_t origID;
   ComboAddress origRemote;
+  AtomicCounter age;
   bool used;
 };
 
@@ -104,6 +103,9 @@ struct DownstreamState
   vector<IDState> idStates;
   AtomicCounter idOffset;
   AtomicCounter sendErrors;
+  AtomicCounter outstanding;
+  AtomicCounter reuseds;
+  AtomicCounter queries;
 };
 
 DownstreamState* g_dstates;
@@ -130,6 +132,9 @@ void* responderThread(void *p)
     IDState* ids = &state->idStates[dh->id];
     if(ids->origFD < 0)
       continue;
+    else
+      --state->outstanding;  // you'd think you could game this, but we're using connected socket
+
     dh->id = ids->origID;
     sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
     if(g_verbose)
@@ -146,6 +151,20 @@ struct ClientState
   int fd;
 };
 
+DownstreamState& getBestDownstream()
+{
+  unsigned int lowest = std::numeric_limits<unsigned int>::max();
+  unsigned int chosen = 0;
+  for(unsigned int n = 0; n < g_numremotes; ++n) {
+    if(g_dstates[n].outstanding < lowest) {
+      chosen = n;
+      lowest=g_dstates[n].outstanding;
+    }
+  }
+      
+  return g_dstates[chosen];
+}
+
 // listens to incoming queries, sends out to downstream servers, noting the intended return path 
 void* clientThread(void* p)
 {
@@ -167,10 +186,19 @@ void* clientThread(void* p)
       continue;
     g_numQueries++;
     /* right now, this is our simple round robin downstream selector */
-    DownstreamState& ss = g_dstates[(g_pos++) % g_numremotes]; 
+    DownstreamState& ss = getBestDownstream();
+    ss.queries++;
+
     unsigned int idOffset = (ss.idOffset++) % g_maxOutstanding;
     IDState* ids = &ss.idStates[idOffset];
+
+    if(ids->origFD < 0) // if we are reusing, no change in outstanding
+      ss.outstanding++;
+    else
+      ss.reuseds++;
+
     ids->origFD = cs->fd;
+    ids->age = AtomicCounter();
     ids->origID = dh->id;
     ids->origRemote = remote;
     ids->used = true;
@@ -192,19 +220,31 @@ void* statThread(void*)
   if(!interval)
     return 0;
   uint32_t lastQueries=0;
+  vector<DownstreamState> prev;
+  prev.resize(g_numremotes);
+
   for(;;) {
     sleep(interval);
     
     unsigned int outstanding=0;
     for(unsigned int n=0; n < g_numremotes; ++n) {
-      const DownstreamState& dss = g_dstates[n];
+      DownstreamState& dss = g_dstates[n];
+      cout<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
+      outstanding += dss.outstanding;
+      prev[n].queries = dss.queries;
+
       for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
-       const IDState& ids = dss.idStates[i];
-       if(ids.used && ids.origFD >=0)
-         outstanding++;
+       IDState& ids = dss.idStates[i];
+       if(ids.origFD >=0 && ids.age++ > 2) {
+         ids.age = AtomicCounter();
+         ids.origFD = -1;
+         dss.reuseds++;
+         --dss.outstanding;
+       }
+         
       }
     }
-    cout<<outstanding<<" outstanding queries, " << (g_numQueries - lastQueries)/interval <<" qps"<<endl;
+    cout<<outstanding<<" outstanding queries, " <<(g_numQueries - lastQueries)/interval <<" qps"<<endl;
     lastQueries=g_numQueries;
   }
   return 0;
index 00a0e1caf3c50554fb2aa50e6ef0a741269e56ac..34cc02f94fba24e73e837b672519f156a8522a8f 100644 (file)
@@ -355,7 +355,6 @@ public:
       return atomic_exchange_and_add( &value_, +1 );
     }
 
-
     unsigned int operator--()
     {
       return atomic_exchange_and_add( &value_, -1 ) - 1;
@@ -366,10 +365,11 @@ public:
       return atomic_exchange_and_add( &value_, 0);
     }
 
-private:
-    AtomicCounter(AtomicCounter const &);
-    AtomicCounter &operator=(AtomicCounter const &);
+    AtomicCounter(AtomicCounter const &rhs) : value_(rhs)
+    {
+    }
 
+private:
     mutable unsigned int value_;
     
     // the below is necessary because __sync_fetch_and_add is not universally available on i386.. I 3> RHEL5.