]> granicus.if.org Git - pdns/commitdiff
dnsdist: Allow the use of stale cache entries if no backend are available
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 3 Mar 2016 09:54:27 +0000 (10:54 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 3 Mar 2016 09:54:27 +0000 (10:54 +0100)
And add more documentation and regression tests.

pdns/README-dnsdist.md
pdns/dnsdist-cache.cc
pdns/dnsdist-cache.hh
pdns/dnsdist-lua2.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/test-dnsdistpacketcache_cc.cc
regression-tests.dnsdist/dnsdisttests.py
regression-tests.dnsdist/test_Caching.py

index 1e6704e4ab1ce2bc91577382a5d15609cf766f85..b6810da226e99aca874597bdead1f32ae78c3d71 100644 (file)
@@ -712,15 +712,56 @@ The first step is to define a cache, then to assign that cache to the chosen poo
 the default one being represented by the empty string:
 
 ```
-pc = newPacketCache(10000, 86400, 600, 60)
+pc = newPacketCache(10000, 86400, 600, 60, 60)
 getPool(""):setCache(pc)
 ```
 
-The first parameter is the maximum number of entries stored in the cache, the
-second one, optional, is the maximum lifetime of an entry in the cache, in seconds,
-the third one, optional too, is the minimum TTL an entry should have to be considered
-for insertion in the cache, and the last one, still optional, is the TTL used for a
-Server Failure response.
+The first parameter is the maximum number of entries stored in the cache, and is the
+only one required. All the others parameters are optional and in seconds.
+The second one is the maximum lifetime of an entry in the cache, the third one is
+the minimum TTL an entry should have to be considered for insertion in the cache,
+the fourth one is the TTL used for a Server Failure response. The last one is the
+TTL that will be used when a stale cache entry is returned.
+
+The `setStaleCacheEntriesTTL(n)` directive can be used to allow `dnsdist` to use
+expired entries from the cache when no backend is available. Only entries that have
+expired for less than `n` seconds will be used, and the returned TTL can be set
+when creating a new cache with `newPacketCache()`.
+
+A reference to the cache affected to a specific pool can be retrieved with:
+
+```
+getPool("poolname"):getCache()
+```
+
+Cache usage stats (hits, misses, deferred inserts and lookups, collisions)
+can be displayed by using the `printStats()` method:
+
+```
+getPool("poolname"):getCache():printStats()
+```
+
+Expired cached entries can be removed from a cache using the `purgeExpired(n)`
+method, which will remove expired entries from the cache until at least `n`
+entries remain in the cache. For example, to remove all expired entries:
+
+```
+getPool("poolname"):getCache():purgeExpired(0)
+```
+
+Specific entries can also be removed using the `expungeByName(DNSName [, qtype=ANY])`
+method.
+
+```
+getPool("poolname"):getCache():expungeByName(newDNSName("powerdns.com"), dnsdist.A)
+```
+
+Finally, the `expunge(n)` method will remove all entries until at most `n`
+entries remain in the cache:
+
+```
+getPool("poolname"):getCache():expunge(0)
+```
 
 
 Performance tuning
@@ -1036,7 +1077,7 @@ instantiate a server with additional parameters
     * `expunge(n)`: remove entries from the cache, leaving at most `n` entries
     * `expungeByName(DNSName [, qtype=ANY])`: remove entries matching the supplied DNSName and type from the cache
     * `isFull()`: return true if the cache has reached the maximum number of entries
-    * `newPacketCache(maxEntries[, maxTTL=86400, minTTL=60, servFailTTL=60])`: return a new PacketCache
+    * `newPacketCache(maxEntries[, maxTTL=86400, minTTL=60, servFailTTL=60, stateTTL=60])`: return a new PacketCache
     * `printStats()`: print the cache stats (hits, misses, deferred lookups and deferred inserts)
     * `purgeExpired(n)`: remove expired entries from the cache until there is at most `n` entries remaining in the cache
     * `toString()`: return the number of entries in the Packet Cache, and the maximum number of entries
@@ -1091,6 +1132,7 @@ instantiate a server with additional parameters
     * `setMaxTCPClientThreads(n)`: set the maximum of TCP client threads, handling TCP connections
     * `setMaxUDPOutstanding(n)`: set the maximum number of outstanding UDP queries to a given backend server. This can only be set at configuration time and defaults to 10240
     * `setCacheCleaningDelay(n)`: set the interval in seconds between two runs of the cache cleaning algorithm, removing expired entries
+    * `setStaleCacheEntriesTTL(n)`: allows using cache entries expired for at most `n` seconds when no backend available to answer for a query
  * DNSCrypt related:
     * `addDNSCryptBind("127.0.0.1:8443", "provider name", "/path/to/resolver.cert", "/path/to/resolver.key", [false]):` listen to incoming DNSCrypt queries on 127.0.0.1 port 8443, with a provider name of "provider name", using a resolver certificate and associated key stored respectively in the `resolver.cert` and `resolver.key` files. The last optional parameter sets SO_REUSEPORT when available
     * `generateDNSCryptProviderKeys("/path/to/providerPublic.key", "/path/to/providerPrivate.key"):` generate a new provider keypair
index 3508962be39a41bde6f2d70839733da9bbb4a263..8231fae4e35fd412d23f8c2bbc524fef1ac763fc 100644 (file)
@@ -1,8 +1,9 @@
+#include "dnsdist.hh"
 #include "dolog.hh"
-#include "dnsdist-cache.hh"
 #include "dnsparser.hh"
+#include "dnsdist-cache.hh"
 
-DNSDistPacketCache::DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL, uint32_t minTTL, uint32_t servFailTTL): d_maxEntries(maxEntries), d_maxTTL(maxTTL), d_servFailTTL(servFailTTL), d_minTTL(minTTL)
+DNSDistPacketCache::DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL, uint32_t minTTL, uint32_t servFailTTL, uint32_t staleTTL): d_maxEntries(maxEntries), d_maxTTL(maxTTL), d_servFailTTL(servFailTTL), d_minTTL(minTTL), d_staleTTL(staleTTL)
 {
   pthread_rwlock_init(&d_lock, 0);
   /* we reserve maxEntries + 1 to avoid rehashing from occuring
@@ -98,14 +99,15 @@ void DNSDistPacketCache::insert(uint32_t key, const DNSName& qname, uint16_t qty
   }
 }
 
-bool DNSDistPacketCache::get(const unsigned char* query, uint16_t queryLen, const DNSName& qname, uint16_t qtype, uint16_t qclass, uint16_t consumed, uint16_t queryId, char* response, uint16_t* responseLen, bool tcp, uint32_t* keyOut, bool skipAging)
+bool DNSDistPacketCache::get(const DNSQuestion& dq, uint16_t consumed, uint16_t queryId, char* response, uint16_t* responseLen, uint32_t* keyOut, uint32_t allowExpired, bool skipAging)
 {
-  uint32_t key = getKey(qname, consumed, query, queryLen, tcp);
+  uint32_t key = getKey(*dq.qname, consumed, (const unsigned char*)dq.dh, dq.len, dq.tcp);
   if (keyOut)
     *keyOut = key;
 
   time_t now = time(NULL);
   time_t age;
+  bool stale = false;
   {
     TryReadLock r(&d_lock);
     if (!r.gotIt()) {
@@ -121,8 +123,13 @@ bool DNSDistPacketCache::get(const unsigned char* query, uint16_t queryLen, cons
 
     const CacheValue& value = it->second;
     if (value.validity < now) {
-      d_misses++;
-      return false;
+      if ((value.validity + allowExpired) < now) {
+        d_misses++;
+        return false;
+      }
+      else {
+        stale = true;
+      }
     }
 
     if (*responseLen < value.len) {
@@ -130,23 +137,30 @@ bool DNSDistPacketCache::get(const unsigned char* query, uint16_t queryLen, cons
     }
 
     /* check for collision */
-    if (!cachedValueMatches(value, qname, qtype, qclass, tcp)) {
+    if (!cachedValueMatches(value, *dq.qname, dq.qtype, dq.qclass, dq.tcp)) {
       d_misses++;
       d_lookupCollisions++;
       return false;
     }
 
-    string dnsQName(qname.toDNSString());
+    string dnsQName(dq.qname->toDNSString());
     memcpy(response, &queryId, sizeof(queryId));
     memcpy(response + sizeof(queryId), value.value.c_str() + sizeof(queryId), sizeof(dnsheader) - sizeof(queryId));
     memcpy(response + sizeof(dnsheader), dnsQName.c_str(), dnsQName.length());
     memcpy(response + sizeof(dnsheader) + dnsQName.length(), value.value.c_str() + sizeof(dnsheader) + dnsQName.length(), value.value.length() - (sizeof(dnsheader) + dnsQName.length()));
     *responseLen = value.len;
-    age = now - value.added;
+    if (!stale) {
+      age = now - value.added;
+    }
+    else {
+      age = (value.validity - value.added) - d_staleTTL;
+    }
   }
 
-  if (!skipAging)
+  if (!skipAging) {
     ageDNSPacket(response, *responseLen, age);
+  }
+
   d_hits++;
   return true;
 }
index 0ee1b136cb322e5aa4876f1e545eb75e36b5237e..011a151c4f773988e3cb5882fdae5d9a2ca4328f 100644 (file)
@@ -4,14 +4,16 @@
 #include <unordered_map>
 #include "lock.hh"
 
+struct DNSQuestion;
+
 class DNSDistPacketCache : boost::noncopyable
 {
 public:
-  DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL=86400, uint32_t minTTL=60, uint32_t servFailTTL=60);
+  DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL=86400, uint32_t minTTL=60, uint32_t servFailTTL=60, uint32_t staleTTL=60);
   ~DNSDistPacketCache();
 
   void insert(uint32_t key, const DNSName& qname, uint16_t qtype, uint16_t qclass, const char* response, uint16_t responseLen, bool tcp, bool servFail=false);
-  bool get(const unsigned char* query, uint16_t queryLen, const DNSName& qname, uint16_t qtype, uint16_t qclass, uint16_t consumed, uint16_t queryId, char* response, uint16_t* responseLen, bool tcp, uint32_t* keyOut, bool skipAging=false);
+  bool get(const DNSQuestion& dq, uint16_t consumed, uint16_t queryId, char* response, uint16_t* responseLen, uint32_t* keyOut, uint32_t allowExpired=0, bool skipAging=false);
   void purgeExpired(size_t upTo=0);
   void expunge(size_t upTo=0);
   void expungeByName(const DNSName& name, uint16_t qtype=QType::ANY);
@@ -57,4 +59,5 @@ private:
   uint32_t d_maxTTL;
   uint32_t d_servFailTTL;
   uint32_t d_minTTL;
+  uint32_t d_staleTTL;
 };
index 82b35927fd0a7d72098e237252673c005b5e9d96..72ed238ab1192aedea17a73792d70ef0de37f72b 100644 (file)
@@ -531,8 +531,8 @@ void moreLua(bool client)
     });
     g_lua.registerFunction("getCache", &ServerPool::getCache);
 
-    g_lua.writeFunction("newPacketCache", [client](size_t maxEntries, boost::optional<uint32_t> maxTTL, boost::optional<uint32_t> minTTL, boost::optional<uint32_t> servFailTTL) {
-        return std::make_shared<DNSDistPacketCache>(maxEntries, maxTTL ? *maxTTL : 86400, minTTL ? *minTTL : 60, servFailTTL ? *servFailTTL : 60);
+    g_lua.writeFunction("newPacketCache", [client](size_t maxEntries, boost::optional<uint32_t> maxTTL, boost::optional<uint32_t> minTTL, boost::optional<uint32_t> servFailTTL, boost::optional<uint32_t> staleTTL) {
+        return std::make_shared<DNSDistPacketCache>(maxEntries, maxTTL ? *maxTTL : 86400, minTTL ? *minTTL : 60, servFailTTL ? *servFailTTL : 60, staleTTL ? *staleTTL : 60);
       });
     g_lua.registerFunction("toString", &DNSDistPacketCache::toString);
     g_lua.registerFunction("isFull", &DNSDistPacketCache::isFull);
@@ -561,4 +561,5 @@ void moreLua(bool client)
       });
 
     g_lua.writeFunction("setVerboseHealthChecks", [](bool verbose) { g_verboseHealthChecks=verbose; });
+    g_lua.writeFunction("setStaleCacheEntriesTTL", [](uint32_t ttl) { g_staleCacheEntriesTTL = ttl; });
 }
index 275f8d0e6e29c7453f6e2089c4c36c31eba94954..f4ed14d7416b13b1f33a94c6698cb708f2567106 100644 (file)
@@ -330,12 +330,8 @@ void* tcpClientThread(int pipefd)
          ds = localPolicy->policy(serverPool->servers, &dq);
          packetCache = serverPool->packetCache;
        }
-       if(!ds) {
-         g_stats.noPolicy++;
-         break;
-       }
 
-        if (ds->useECS) {
+        if (ds && ds->useECS) {
           uint16_t newLen = dq.len;
           handleEDNSClientSubnet(queryBuffer, dq.size, consumed, &newLen, largerQuery, &ednsAdded, ci.remote);
           if (largerQuery.empty() == false) {
@@ -351,7 +347,8 @@ void* tcpClientThread(int pipefd)
         if (packetCache && !dq.skipCache) {
           char cachedResponse[4096];
           uint16_t cachedResponseSize = sizeof cachedResponse;
-          if (packetCache->get((unsigned char*) query, dq.len, *dq.qname, dq.qtype, dq.qclass, consumed, dq.dh->id, cachedResponse, &cachedResponseSize, true, &cacheKey)) {
+          uint32_t allowExpired = ds ? 0 : g_staleCacheEntriesTTL;
+          if (packetCache->get(dq, consumed, dq.dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
             if (putNonBlockingMsgLen(ci.fd, cachedResponseSize, g_tcpSendTimeout))
               writen2WithTimeout(ci.fd, cachedResponse, cachedResponseSize, g_tcpSendTimeout);
             g_stats.cacheHits++;
@@ -360,6 +357,11 @@ void* tcpClientThread(int pipefd)
           g_stats.cacheMisses++;
         }
 
+       if(!ds) {
+         g_stats.noPolicy++;
+         break;
+       }
+
        int dsock;
        if(sockets.count(ds->remote) == 0) {
          dsock=sockets[ds->remote]=setupTCPDownstream(ds);
index d86c36081804ecee295343916708e8317f8af555..d46e1b416e9255a11ed77525d101cfed4b3a20a8 100644 (file)
@@ -62,6 +62,7 @@ struct DNSDistStats g_stats;
 uint16_t g_maxOutstanding{10240};
 bool g_console;
 bool g_verboseHealthChecks{false};
+uint32_t g_staleCacheEntriesTTL{0};
 
 GlobalStateHolder<NetmaskGroup> g_ACL;
 string g_outputBuffer;
@@ -818,13 +819,8 @@ try
        packetCache = serverPool->packetCache;
       }
 
-      if(!ss) {
-       g_stats.noPolicy++;
-       continue;
-      }
-
       bool ednsAdded = false;
-      if (ss->useECS) {
+      if (ss && ss->useECS) {
         handleEDNSClientSubnet(query, dq.size, consumed, &dq.len, largerQuery, &(ednsAdded), remote);
       }
 
@@ -832,7 +828,8 @@ try
       if (packetCache && !dq.skipCache) {
         char cachedResponse[4096];
         uint16_t cachedResponseSize = sizeof cachedResponse;
-        if (packetCache->get((unsigned char*) query, dq.len, *dq.qname, dq.qtype, dq.qclass, consumed, dh->id, cachedResponse, &cachedResponseSize, false, &cacheKey)) {
+        uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
+        if (packetCache->get(dq, consumed, dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
           ComboAddress dest;
           if(HarvestDestinationAddress(&msgh, &dest))
             sendfromto(cs->udpFD, cachedResponse, cachedResponseSize, 0, dest, remote);
@@ -846,6 +843,11 @@ try
         g_stats.cacheMisses++;
       }
 
+      if(!ss) {
+       g_stats.noPolicy++;
+       continue;
+      }
+
       ss->queries++;
 
       unsigned int idOffset = (ss->idOffset++) % ss->idStates.size();
index 61ef35daf1f9f84fbac0ce8589666dccf4732b6b..ca0edddaeb465fdb6a8d2b01accf2e81754041c8 100644 (file)
@@ -13,6 +13,7 @@
 #include "sholder.hh"
 #include "dnscrypt.hh"
 #include "dnsdist-cache.hh"
+
 void* carbonDumpThread();
 uint64_t uptimeOfProcess(const std::string& str);
 
@@ -469,6 +470,7 @@ extern uint16_t g_ECSSourcePrefixV4;
 extern uint16_t g_ECSSourcePrefixV6;
 extern bool g_ECSOverride;
 extern bool g_verboseHealthChecks;
+extern uint32_t g_staleCacheEntriesTTL;
 
 struct dnsheader;
 
index fbf0cf4cb78c8b51a77e2a4ed79694413ecae651..825dc4a812159bca655b99c071567e762580b98d 100644 (file)
@@ -3,9 +3,10 @@
 
 #include <boost/test/unit_test.hpp>
 
+#include "dnsdist.hh"
 #include "iputils.hh"
-#include "dnsdist-cache.hh"
 #include "dnswriter.hh"
+#include "dnsdist-cache.hh"
 
 BOOST_AUTO_TEST_SUITE(dnsdistpacketcache_cc)
 
@@ -16,6 +17,7 @@ BOOST_AUTO_TEST_CASE(test_PacketCacheSimple) {
 
   size_t counter=0;
   size_t skipped=0;
+  ComboAddress remote;
   try {
     for(counter = 0; counter < 100000; ++counter) {
       DNSName a=DNSName("hello ")+DNSName(std::to_string(counter));
@@ -39,12 +41,13 @@ BOOST_AUTO_TEST_CASE(test_PacketCacheSimple) {
       char responseBuf[4096];
       uint16_t responseBufSize = sizeof(responseBuf);
       uint32_t key = 0;
-      bool found = PC.get((const unsigned char*) query.data(), query.size(), a, QType::A, QClass::IN, a.wirelength(), 0, responseBuf, &responseBufSize, false, &key);
+      DNSQuestion dq(&a, QType::A, QClass::IN, &remote, &remote, (struct dnsheader*) query.data(), query.size(), query.size(), false);
+      bool found = PC.get(dq, a.wirelength(), 0, responseBuf, &responseBufSize, &key);
       BOOST_CHECK_EQUAL(found, false);
 
       PC.insert(key, a, QType::A, QClass::IN, (const char*) response.data(), responseLen, false);
 
-      found = PC.get((const unsigned char*) query.data(), query.size(), a, QType::A, QClass::IN, a.wirelength(), pwR.getHeader()->id, responseBuf, &responseBufSize, false, &key, true);
+      found = PC.get(dq, a.wirelength(), pwR.getHeader()->id, responseBuf, &responseBufSize, &key, 0, true);
       if (found == true) {
         BOOST_CHECK_EQUAL(responseBufSize, responseLen);
         int match = memcmp(responseBuf, response.data(), responseLen);
@@ -68,7 +71,8 @@ BOOST_AUTO_TEST_CASE(test_PacketCacheSimple) {
       char responseBuf[4096];
       uint16_t responseBufSize = sizeof(responseBuf);
       uint32_t key = 0;
-      bool found = PC.get((const unsigned char*) query.data(), query.size(), a, QType::A, QClass::IN, a.wirelength(), 0, responseBuf, &responseBufSize, false, &key);
+      DNSQuestion dq(&a, QType::A, QClass::IN, &remote, &remote, (struct dnsheader*) query.data(), query.size(), query.size(), false);
+      bool found = PC.get(dq, a.wirelength(), 0, responseBuf, &responseBufSize, &key);
       if (found == true) {
         PC.expungeByName(a);
         deleted++;
@@ -88,7 +92,8 @@ BOOST_AUTO_TEST_CASE(test_PacketCacheSimple) {
       uint32_t key = 0;
       char response[4096];
       uint16_t responseSize = sizeof(response);
-      if(PC.get(query.data(), len, a, QType::A, QClass::IN, a.wirelength(), pwQ.getHeader()->id, response, &responseSize, false, &key)) {
+      DNSQuestion dq(&a, QType::A, QClass::IN, &remote, &remote, (struct dnsheader*) query.data(), len, query.size(), false);
+      if(PC.get(dq, a.wirelength(), pwQ.getHeader()->id, response, &responseSize, &key)) {
        matches++;
       }
     }
@@ -105,6 +110,7 @@ static DNSDistPacketCache PC(500000);
 static void *threadMangler(void* a)
 {
   try {
+    ComboAddress remote;
     unsigned int offset=(unsigned int)(unsigned long)a;
     for(unsigned int counter=0; counter < 100000; ++counter) {
       DNSName a=DNSName("hello ")+DNSName(std::to_string(counter+offset));
@@ -126,7 +132,8 @@ static void *threadMangler(void* a)
       char responseBuf[4096];
       uint16_t responseBufSize = sizeof(responseBuf);
       uint32_t key = 0;
-      PC.get((const unsigned char*) query.data(), query.size(), a, QType::A, QClass::IN, a.wirelength(), 0, responseBuf, &responseBufSize, false, &key);
+      DNSQuestion dq(&a, QType::A, QClass::IN, &remote, &remote, (struct dnsheader*) query.data(), query.size(), query.size(), false);
+      PC.get(dq, a.wirelength(), 0, responseBuf, &responseBufSize, &key);
 
       PC.insert(key, a, QType::A, QClass::IN, (const char*) response.data(), responseLen, false);
     }
@@ -146,6 +153,7 @@ static void *threadReader(void* a)
   {
     unsigned int offset=(unsigned int)(unsigned long)a;
     vector<DNSResourceRecord> entry;
+    ComboAddress remote;
     for(unsigned int counter=0; counter < 100000; ++counter) {
       DNSName a=DNSName("hello ")+DNSName(std::to_string(counter+offset));
       vector<uint8_t> query;
@@ -155,7 +163,8 @@ static void *threadReader(void* a)
       char responseBuf[4096];
       uint16_t responseBufSize = sizeof(responseBuf);
       uint32_t key = 0;
-      bool found = PC.get((const unsigned char*) query.data(), query.size(), a, QType::A, QClass::IN, a.wirelength(), 0, responseBuf, &responseBufSize, false, &key);
+      DNSQuestion dq(&a, QType::A, QClass::IN, &remote, &remote, (struct dnsheader*) query.data(), query.size(), query.size(), false);
+      bool found = PC.get(dq, a.wirelength(), 0, responseBuf, &responseBufSize, &key);
       if (!found) {
        g_missing++;
       }
index 816b61d683b57e9b3ef17f8269e3b68f4e4a38ee..a9ef2be97ef83a156eac2e7731ada18c33b5c7ea 100644 (file)
@@ -12,7 +12,8 @@ import time
 import unittest
 import dns
 import dns.message
-
+import libnacl
+import libnacl.utils
 
 class DNSDistTest(unittest.TestCase):
     """
@@ -36,6 +37,8 @@ class DNSDistTest(unittest.TestCase):
     """
     _config_params = ['_testServerPort']
     _acl = ['127.0.0.1/32']
+    _consolePort = 5199
+    _consoleKey = None
 
     @classmethod
     def startResponders(cls):
@@ -259,3 +262,40 @@ class DNSDistTest(unittest.TestCase):
 
         while not self._fromResponderQueue.empty():
             self._fromResponderQueue.get(False)
+
+    @staticmethod
+    def generateConsoleKey():
+        return libnacl.utils.salsa_key()
+
+    @classmethod
+    def _encryptConsole(cls, command, nonce):
+        if cls._consoleKey is None:
+            return command
+        return libnacl.crypto_secretbox(command, nonce, cls._consoleKey)
+
+    @classmethod
+    def _decryptConsole(cls, command, nonce):
+        if cls._consoleKey is None:
+            return command
+        return libnacl.crypto_secretbox_open(command, nonce, cls._consoleKey)
+
+    @classmethod
+    def sendConsoleCommand(cls, command, timeout=1.0):
+        ourNonce = libnacl.utils.rand_nonce()
+        theirNonce = None
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        if timeout:
+            sock.settimeout(timeout)
+
+        sock.connect(("127.0.0.1", cls._consolePort))
+        sock.send(ourNonce)
+        theirNonce = sock.recv(len(ourNonce))
+
+        msg = cls._encryptConsole(command, ourNonce)
+        sock.send(struct.pack("!I", len(msg)))
+        sock.send(msg)
+        data = sock.recv(4)
+        (responseLen,) = struct.unpack("!I", data)
+        data = sock.recv(responseLen)
+        response = cls._decryptConsole(data, theirNonce)
+        return response
index 7adcd70893bafd63bca5d28399eff93c5669c20f..1e1cde1be33155a3d2617acb61d5227449335218 100644 (file)
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+import base64
 import time
 import dns
 from dnsdisttests import DNSDistTest
@@ -480,3 +481,341 @@ class TestCachingCacheFull(DNSDistTest):
             total += TestCachingCacheFull._responsesCounter[key]
 
         self.assertEquals(total, misses)
+
+class TestCachingNoStale(DNSDistTest):
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey)
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+    _config_template = """
+    pc = newPacketCache(100, 86400, 1)
+    getPool(""):setCache(pc)
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    newServer{address="127.0.0.1:%s"}
+    """
+    def testCacheNoStale(self):
+        """
+        Cache: Cache entry, set backend down, we should not get a stale entry
+
+        """
+        ttl = 1
+        name = 'nostale.cache.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    1,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+        # next queries should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # ok, we mark the backend as down
+        self.sendConsoleCommand("getServer(0):setDown()")
+        # and we wait for the entry to expire
+        time.sleep(ttl + 1)
+
+        # we should NOT get a cached, stale, entry
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, None)
+
+
+class TestCachingStale(DNSDistTest):
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey)
+    _staleCacheTTL = 60
+    _config_params = ['_staleCacheTTL', '_consoleKeyB64', '_consolePort', '_testServerPort']
+    _config_template = """
+    pc = newPacketCache(100, 86400, 1, %s)
+    getPool(""):setCache(pc)
+    setStaleCacheEntriesTTL(600)
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    newServer{address="127.0.0.1:%s"}
+    """
+    def testCacheStale(self):
+        """
+        Cache: Cache entry, set backend down, get stale entry
+
+        """
+        misses = 0
+        ttl = 1
+        name = 'stale.cache.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    ttl,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # ok, we mark the backend as down
+        self.sendConsoleCommand("getServer(0):setDown()")
+        # and we wait for the entry to expire
+        time.sleep(ttl + 1)
+
+        # we should get a cached, stale, entry
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+        for an in receivedResponse.answer:
+            self.assertEquals(an.ttl, self._staleCacheTTL)
+
+        total = 0
+        for key in TestCachingCacheFull._responsesCounter:
+            total += TestCachingCacheFull._responsesCounter[key]
+
+        self.assertEquals(total, misses)
+
+class TestCacheManagement(DNSDistTest):
+
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey)
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+    _config_template = """
+    pc = newPacketCache(100, 86400, 1)
+    getPool(""):setCache(pc)
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    newServer{address="127.0.0.1:%s"}
+    """
+    def testCacheExpunge(self):
+        """
+        Cache: Expunge
+
+        """
+        misses = 0
+        ttl = 600
+        name = 'expunge.cache.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    ttl,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # remove cached entries
+        self.sendConsoleCommand("getPool(\"\"):getCache():expunge(0)")
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries should hit the cache again
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        total = 0
+        for key in TestCachingCacheFull._responsesCounter:
+            total += TestCachingCacheFull._responsesCounter[key]
+
+        self.assertEquals(total, misses)
+
+    def testCacheExpungeByName(self):
+        """
+        Cache: Expunge by name
+
+        """
+        misses = 0
+        ttl = 600
+        name = 'expungebyname.cache.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    ttl,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        name2 = 'expungebynameother.cache.tests.powerdns.com.'
+        query2 = dns.message.make_query(name2, 'A', 'IN')
+        response2 = dns.message.make_response(query2)
+        rrset2 = dns.rrset.from_text(name2,
+                                     ttl,
+                                     dns.rdataclass.IN,
+                                     dns.rdatatype.A,
+                                     '127.0.0.1')
+        response2.answer.append(rrset2)
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # cache another entry
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query2, response2)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query2.id
+        self.assertEquals(query2, receivedQuery)
+        self.assertEquals(response2, receivedResponse)
+        misses += 1
+
+        # queries for name and name 2 should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        (_, receivedResponse) = self.sendUDPQuery(query2, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response2)
+
+        # remove cached entries from name
+        self.sendConsoleCommand("getPool(\"\"):getCache():expungeByName(newDNSName(\"" + name + "\"))")
+
+        # Miss for name
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries for name should hit the cache again
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # queries for name2 should still hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query2, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response2)
+
+        total = 0
+        for key in TestCachingCacheFull._responsesCounter:
+            total += TestCachingCacheFull._responsesCounter[key]
+
+        self.assertEquals(total, misses)
+
+    def testCacheExpungeByNameAndType(self):
+        """
+        Cache: Expunge by name and type
+
+        """
+        misses = 0
+        ttl = 600
+        name = 'expungebynameandtype.cache.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    ttl,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        query2 = dns.message.make_query(name, 'AAAA', 'IN')
+        response2 = dns.message.make_response(query2)
+        rrset2 = dns.rrset.from_text(name,
+                                     ttl,
+                                     dns.rdataclass.IN,
+                                     dns.rdatatype.AAAA,
+                                     '::1')
+        response2.answer.append(rrset2)
+
+        # Miss
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # cache another entry
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query2, response2)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query2.id
+        self.assertEquals(query2, receivedQuery)
+        self.assertEquals(response2, receivedResponse)
+        misses += 1
+
+        # queries for name A and AAAA should hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        (_, receivedResponse) = self.sendUDPQuery(query2, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response2)
+
+        # remove cached entries from name A
+        self.sendConsoleCommand("getPool(\"\"):getCache():expungeByName(newDNSName(\"" + name + "\"), dnsdist.A)")
+
+        # Miss for name A
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        misses += 1
+
+        # next queries for name A should hit the cache again
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response)
+
+        # queries for name AAAA should still hit the cache
+        (_, receivedResponse) = self.sendUDPQuery(query2, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, response2)
+
+        total = 0
+        for key in TestCachingCacheFull._responsesCounter:
+            total += TestCachingCacheFull._responsesCounter[key]
+
+        self.assertEquals(total, misses)