From: Remi Gacogne Date: Tue, 5 Jun 2018 11:49:07 +0000 (+0200) Subject: dnsdist: Test the content of dynamic blocks using the API X-Git-Tag: dnsdist-1.3.1~43^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d3473b8c62305852e1569a40f8cf176d9e77d2d5;p=pdns dnsdist: Test the content of dynamic blocks using the API --- diff --git a/regression-tests.dnsdist/test_DynBlocks.py b/regression-tests.dnsdist/test_DynBlocks.py index fbb914965..a7f883cd0 100644 --- a/regression-tests.dnsdist/test_DynBlocks.py +++ b/regression-tests.dnsdist/test_DynBlocks.py @@ -1,12 +1,40 @@ #!/usr/bin/env python import base64 +import json +import requests import time import dns from dnsdisttests import DNSDistTest, range class DynBlocksTest(DNSDistTest): - def doTestQRate(self, name): + _webTimeout = 2.0 + _webServerPort = 8083 + _webServerBasicAuthPassword = 'secret' + _webServerAPIKey = 'apisecret' + + def doTestDynBlockViaAPI(self, range, reason, minSeconds, maxSeconds, minBlocks, maxBlocks): + headers = {'x-api-key': self._webServerAPIKey} + url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=dynblocklist' + r = requests.get(url, headers=headers, timeout=self._webTimeout) + self.assertTrue(r) + self.assertEquals(r.status_code, 200) + + content = r.json() + self.assertIsNotNone(content) + self.assertIn(range, content) + + values = content[range] + for key in ['reason', 'seconds', 'blocks']: + self.assertIn(key, values) + + self.assertEqual(values['reason'], reason) + self.assertGreater(values['seconds'], minSeconds) + self.assertLessEqual(values['seconds'], maxSeconds) + self.assertGreaterEqual(values['blocks'], minBlocks) + self.assertLessEqual(values['blocks'], maxBlocks) + + def doTestQRate(self, name, testViaAPI=False): query = dns.message.make_query(name, 'A', 'IN') response = dns.message.make_response(query) rrset = dns.rrset.from_text(name, @@ -43,6 +71,9 @@ class DynBlocksTest(DNSDistTest): (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) self.assertEquals(receivedResponse, None) + if testViaAPI: + self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', self._dynBlockDuration - 4, self._dynBlockDuration, (sent-allowed)+1, (sent-allowed)+1) + # wait until we are not blocked anymore time.sleep(self._dynBlockDuration + self._dynBlockPeriod) @@ -414,12 +445,13 @@ class TestDynBlockQPS(DynBlocksTest): _dynBlockQPS = 10 _dynBlockPeriod = 2 _dynBlockDuration = 5 - _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort'] + _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey'] _config_template = """ function maintenance() addDynBlocks(exceedQRate(%d, %d), "Exceeded query rate", %d) end newServer{address="127.0.0.1:%s"} + webserver("127.0.0.1:%s", "%s", "%s") """ def testDynBlocksQRate(self): @@ -427,14 +459,14 @@ class TestDynBlockQPS(DynBlocksTest): Dyn Blocks: QRate """ name = 'qrate.dynblocks.tests.powerdns.com.' - self.doTestQRate(name) + self.doTestQRate(name, testViaAPI=True) class TestDynBlockGroupQPS(DynBlocksTest): _dynBlockQPS = 10 _dynBlockPeriod = 2 _dynBlockDuration = 5 - _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort'] + _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey'] _config_template = """ local dbr = dynBlockRulesGroup() dbr:setQueryRate(%d, %d, "Exceeded query rate", %d) @@ -443,6 +475,7 @@ class TestDynBlockGroupQPS(DynBlocksTest): dbr:apply() end newServer{address="127.0.0.1:%s"} + webserver("127.0.0.1:%s", "%s", "%s") """ def testDynBlocksQRate(self): @@ -450,7 +483,7 @@ class TestDynBlockGroupQPS(DynBlocksTest): Dyn Blocks (Group): QRate """ name = 'qrate.group.dynblocks.tests.powerdns.com.' - self.doTestQRate(name) + self.doTestQRate(name, testViaAPI=True) class TestDynBlockQPSRefused(DynBlocksTest): @@ -519,7 +552,7 @@ class TestDynBlockQPSActionRefused(DynBlocksTest): name = 'qrateactionrefused.dynblocks.tests.powerdns.com.' self.doTestQRateRCode(name, dns.rcode.REFUSED) -class TestDynBlockQPSActionRefused(DynBlocksTest): +class TestDynBlockGroupQPSActionRefused(DynBlocksTest): _dynBlockQPS = 10 _dynBlockPeriod = 2