#!/usr/bin/env python
import base64
+import json
+import requests
import time
import dns
from dnsdisttests import DNSDistTest, range
class DynBlocksTest(DNSDistTest):
- def doTestQRate(self, name):
+ _webTimeout = 2.0
+ _webServerPort = 8083
+ _webServerBasicAuthPassword = 'secret'
+ _webServerAPIKey = 'apisecret'
+
+ def doTestDynBlockViaAPI(self, range, reason, minSeconds, maxSeconds, minBlocks, maxBlocks):
+ headers = {'x-api-key': self._webServerAPIKey}
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=dynblocklist'
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+
+ content = r.json()
+ self.assertIsNotNone(content)
+ self.assertIn(range, content)
+
+ values = content[range]
+ for key in ['reason', 'seconds', 'blocks']:
+ self.assertIn(key, values)
+
+ self.assertEqual(values['reason'], reason)
+ self.assertGreater(values['seconds'], minSeconds)
+ self.assertLessEqual(values['seconds'], maxSeconds)
+ self.assertGreaterEqual(values['blocks'], minBlocks)
+ self.assertLessEqual(values['blocks'], maxBlocks)
+
+ def doTestQRate(self, name, testViaAPI=False):
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
self.assertEquals(receivedResponse, None)
+ if testViaAPI:
+ self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', self._dynBlockDuration - 4, self._dynBlockDuration, (sent-allowed)+1, (sent-allowed)+1)
+
# wait until we are not blocked anymore
time.sleep(self._dynBlockDuration + self._dynBlockPeriod)
_dynBlockQPS = 10
_dynBlockPeriod = 2
_dynBlockDuration = 5
- _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+ _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
_config_template = """
function maintenance()
addDynBlocks(exceedQRate(%d, %d), "Exceeded query rate", %d)
end
newServer{address="127.0.0.1:%s"}
+ webserver("127.0.0.1:%s", "%s", "%s")
"""
def testDynBlocksQRate(self):
Dyn Blocks: QRate
"""
name = 'qrate.dynblocks.tests.powerdns.com.'
- self.doTestQRate(name)
+ self.doTestQRate(name, testViaAPI=True)
class TestDynBlockGroupQPS(DynBlocksTest):
_dynBlockQPS = 10
_dynBlockPeriod = 2
_dynBlockDuration = 5
- _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+ _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
_config_template = """
local dbr = dynBlockRulesGroup()
dbr:setQueryRate(%d, %d, "Exceeded query rate", %d)
dbr:apply()
end
newServer{address="127.0.0.1:%s"}
+ webserver("127.0.0.1:%s", "%s", "%s")
"""
def testDynBlocksQRate(self):
Dyn Blocks (Group): QRate
"""
name = 'qrate.group.dynblocks.tests.powerdns.com.'
- self.doTestQRate(name)
+ self.doTestQRate(name, testViaAPI=True)
class TestDynBlockQPSRefused(DynBlocksTest):
name = 'qrateactionrefused.dynblocks.tests.powerdns.com.'
self.doTestQRateRCode(name, dns.rcode.REFUSED)
-class TestDynBlockQPSActionRefused(DynBlocksTest):
+class TestDynBlockGroupQPSActionRefused(DynBlocksTest):
_dynBlockQPS = 10
_dynBlockPeriod = 2