:query server_id: The name of the server
:query domain: The domainname to flush for
+ :query subtree: If set to `true`, also flush the whole subtree (default=`false`)
**Example Response:**
throw HttpMethodNotAllowedException();
DNSName canon = apiNameToDNSName(req->getvars["domain"]);
+ bool subtree = (req->getvars["subtree"].compare("true") == 0);
- int count = broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeCache, canon, false));
- count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipePacketCache, canon, false));
- count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeAndCountNegCache, canon, false));
+ int count = broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeCache, canon, subtree));
+ count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipePacketCache, canon, subtree));
+ count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeAndCountNegCache, canon, subtree));
resp->setBody(Json::object {
{ "count", count },
{ "result", "Flushed cache." }
python -V
pip install -r requirements.txt
+export SDIG=$(type -P sdig)
+
set -e
if [ "${PDNS_DEBUG}" = "YES" ]; then
set -x
SQLITE_DB = 'pdns.sqlite3'
WEBPORT = '5556'
+DNSPORT = 5555
APIKEY = '1234567890abcdefghijklmnopq-key'
PDNSUTIL_CMD = ["../pdns/pdnsutil", "--config-dir=."]
shutil.rmtree(name)
os.mkdir(name)
+def format_call_args(cmd):
+ return "$ '%s'" % ("' '".join(cmd))
+
+def run_check_call(cmd, *args, **kwargs):
+ print format_call_args(cmd)
+ subprocess.check_call(cmd, *args, **kwargs)
wait = ('--wait' in sys.argv)
if wait:
pdns_recursor = os.environ.get("PDNSRECURSOR", "../pdns/recursordist/pdns_recursor")
+# Take sdig if it exists (recursor in travis), otherwise build it from Authoritative source.
+sdig = os.environ.get("SDIG", "")
+if sdig:
+ sdig = os.path.abspath(sdig)
+if not sdig or not os.path.exists(sdig):
+ run_check_call(["make", "-C", "../pdns", "sdig"])
+ sdig = "../pdns/sdig"
+
if daemon == 'authoritative':
# Prepare sqlite DB with some zones.
'DAEMON': daemon,
'SQLITE_DB': SQLITE_DB,
'PDNSUTIL_CMD': ' '.join(PDNSUTIL_CMD),
+ 'SDIG': sdig,
+ 'DNSPORT': str(DNSPORT)
})
try:
-from test_helper import ApiTestCase, is_auth, is_recursor
+from test_helper import ApiTestCase, is_auth, is_recursor, sdig
class Servers(ApiTestCase):
data = r.json()
self.assertIn('count', data)
+ def test_flush_count(self):
+ sdig("ns1.example.com", 'A')
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=ns1.example.com."))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ self.assertEquals(1, data['count'])
+
+ def test_flush_subtree(self):
+ sdig("ns1.example.com", 'A')
+ sdig("ns2.example.com", 'A')
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=example.com.&subtree=false"))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ # Yes, this is 0 in 4.1.x, 1 in master, because in master we send a query for example.com "to create statistic data"
+ self.assertEquals(0, data['count'])
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=example.com.&subtree=true"))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ self.assertEquals(2, data['count'])
+
def test_flush_root(self):
r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=."))
self.assert_success_json(r)
DAEMON = os.environ.get('DAEMON', 'authoritative')
PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
-
+SDIG = os.environ.get('SDIG', 'sdig')
+DNSPORT = os.environ.get('DNSPORT', '53')
class ApiTestCase(unittest.TestCase):
def pdnsutil_rectify(zonename):
"""Run pdnsutil rectify-zone on the given zone."""
subprocess.check_call(PDNSUTIL_CMD + ['rectify-zone', zonename])
+
+def sdig(*args):
+ try:
+ return subprocess.check_call([SDIG, '127.0.0.1', str(DNSPORT)] + list(args))
+ except subprocess.CalledProcessError as except_inst:
+ raise RuntimeError("sdig %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))