From: Remi Gacogne Date: Fri, 11 Jan 2019 09:27:49 +0000 (+0100) Subject: rec: Add some python regression tests for RPZ qname policies X-Git-Tag: rec-4.2.0-alpha1~35^2~1 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d13c4d181184db1a676da8b3f58063ee2a0d53ae;p=pdns rec: Add some python regression tests for RPZ qname policies --- diff --git a/regression-tests.recursor-dnssec/test_RPZ.py b/regression-tests.recursor-dnssec/test_RPZ.py index 9c3c14346..cbaee35f2 100644 --- a/regression-tests.recursor-dnssec/test_RPZ.py +++ b/regression-tests.recursor-dnssec/test_RPZ.py @@ -110,6 +110,8 @@ class RPZServer(object): dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1', '192.0.2.2'), dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial), dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'), + dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'), + dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'), dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial) ] @@ -244,13 +246,17 @@ e 3600 IN A 192.0.2.42 query.flags |= dns.flags.CD if adQuery: query.flags |= dns.flags.AD - res = self.sendUDPQuery(query) - if shouldBeBlocked: - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1') - else: - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') - self.assertRRsetInAnswer(res, expected) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + res = sender(query) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + if shouldBeBlocked: + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1') + else: + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') + + self.assertRRsetInAnswer(res, expected) def checkNotBlocked(self, name, adQuery=False): self.checkBlocked(name, False, adQuery) @@ -258,16 +264,45 @@ e 3600 IN A 192.0.2.42 def checkCustom(self, qname, qtype, expected): query = dns.message.make_query(qname, qtype, want_dnssec=True) query.flags |= dns.flags.CD - res = self.sendUDPQuery(query) - - self.assertRRsetInAnswer(res, expected) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + res = sender(query) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + self.assertRRsetInAnswer(res, expected) def checkNoData(self, qname, qtype): + query = dns.message.make_query(qname, qtype, want_dnssec=True) + query.flags |= dns.flags.CD + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + res = sender(query) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + self.assertEqual(len(res.answer), 0) + + def checkTruncated(self, qname, qtype='A'): query = dns.message.make_query(qname, qtype, want_dnssec=True) query.flags |= dns.flags.CD res = self.sendUDPQuery(query) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD', 'TC']) + self.assertEqual(len(res.answer), 0) + self.assertEqual(len(res.authority), 0) + self.assertEqual(len(res.additional), 0) + res = self.sendTCPQuery(query) + self.assertRcodeEqual(res, dns.rcode.NXDOMAIN) + self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD']) self.assertEqual(len(res.answer), 0) + self.assertEqual(len(res.authority), 1) + self.assertEqual(len(res.additional), 0) + + def checkDropped(self, qname, qtype='A'): + query = dns.message.make_query(qname, qtype, want_dnssec=True) + query.flags |= dns.flags.CD + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + res = sender(query) + self.assertEqual(res, None) def waitUntilCorrectSerialIsLoaded(self, serial, timeout=5): global rpzServer @@ -357,7 +392,7 @@ e 3600 IN A 192.0.2.42 # seventh zone, e should only have one A self.waitUntilCorrectSerialIsLoaded(7) - self.checkRPZStats(7, 2, 2, self._xfrDone) + self.checkRPZStats(7, 4, 2, self._xfrDone) self.checkNotBlocked('a.example.') self.checkNotBlocked('b.example.') self.checkNotBlocked('c.example.') @@ -368,3 +403,6 @@ e 3600 IN A 192.0.2.42 self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.')) # check that the policy is disabled for AD=1 queries self.checkNotBlocked('e.example.', True) + # check non-custom policies + self.checkTruncated('tc.example.') + self.checkDropped('drop.example.')