name = 'servfailrate.dynblocks.tests.powerdns.com.'
self.doTestRCodeRate(name, dns.rcode.SERVFAIL)
+class TestDynBlockWhitelist(DynBlocksTest):
+
+ _dynBlockQPS = 10
+ _dynBlockPeriod = 2
+ _dynBlockDuration = 5
+ _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+ _config_template = """
+ whitelisted = false
+ function maintenance()
+ toBlock = exceedQRate(%d, %d)
+ for addr, count in pairs(toBlock) do
+ if addr:toString() == "127.0.0.1" then
+ whitelisted = true
+ toBlock[addr] = nil
+ end
+ end
+ addDynBlocks(toBlock, "Exceeded query rate", %d)
+ end
+
+ function spoofrule(dq)
+ if (whitelisted)
+ then
+ return DNSAction.Spoof, "192.0.2.42"
+ else
+ return DNSAction.None, ""
+ end
+ end
+ addAction("whitelisted-test.dynblocks.tests.powerdns.com.", LuaAction(spoofrule))
+
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testWhitelisted(self):
+ """
+ Dyn Blocks: Whitelisted from the dynamic blocks
+ """
+ name = 'whitelisted.dynblocks.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ allowed = 0
+ sent = 0
+ for _ in range((self._dynBlockQPS * self._dynBlockPeriod) + 1):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ sent = sent + 1
+ if receivedQuery:
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+ allowed = allowed + 1
+ else:
+ # the query has not reached the responder,
+ # let's clear the response queue
+ self.clearToResponderQueue()
+
+ # we should not have been blocked
+ self.assertEqual(allowed, sent)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # we should still not be blocked
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, receivedResponse)
+
+ # check that we would have been blocked without the whitelisting
+ name = 'whitelisted-test.dynblocks.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.42')
+ expectedResponse.answer.append(rrset)
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
class TestDynBlockGroupServFails(DynBlocksTest):
_dynBlockQPS = 10