]> granicus.if.org Git - pdns/commitdiff
rec: Add some python regression tests for RPZ qname policies
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 11 Jan 2019 09:27:49 +0000 (10:27 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 11 Jan 2019 09:27:49 +0000 (10:27 +0100)
regression-tests.recursor-dnssec/test_RPZ.py

index 9c3c143468e14c9a1149f94f6b09d10cbe213567..cbaee35f22d8e7f7f34166fe16749ff6456773ce 100644 (file)
@@ -110,6 +110,8 @@ class RPZServer(object):
                     dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1', '192.0.2.2'),
                     dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
                     dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'),
+                    dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'),
+                    dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
                     dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
                     ]
 
@@ -244,13 +246,17 @@ e 3600 IN A 192.0.2.42
         query.flags |= dns.flags.CD
         if adQuery:
             query.flags |= dns.flags.AD
-        res = self.sendUDPQuery(query)
-        if shouldBeBlocked:
-            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
-        else:
-            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
 
-        self.assertRRsetInAnswer(res, expected)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            res = sender(query)
+            self.assertRcodeEqual(res, dns.rcode.NOERROR)
+            if shouldBeBlocked:
+                expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+            else:
+                expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+
+            self.assertRRsetInAnswer(res, expected)
 
     def checkNotBlocked(self, name, adQuery=False):
         self.checkBlocked(name, False, adQuery)
@@ -258,16 +264,45 @@ e 3600 IN A 192.0.2.42
     def checkCustom(self, qname, qtype, expected):
         query = dns.message.make_query(qname, qtype, want_dnssec=True)
         query.flags |= dns.flags.CD
-        res = self.sendUDPQuery(query)
-
-        self.assertRRsetInAnswer(res, expected)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            res = sender(query)
+            self.assertRcodeEqual(res, dns.rcode.NOERROR)
+            self.assertRRsetInAnswer(res, expected)
 
     def checkNoData(self, qname, qtype):
+        query = dns.message.make_query(qname, qtype, want_dnssec=True)
+        query.flags |= dns.flags.CD
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            res = sender(query)
+            self.assertRcodeEqual(res, dns.rcode.NOERROR)
+            self.assertEqual(len(res.answer), 0)
+
+    def checkTruncated(self, qname, qtype='A'):
         query = dns.message.make_query(qname, qtype, want_dnssec=True)
         query.flags |= dns.flags.CD
         res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD', 'TC'])
+        self.assertEqual(len(res.answer), 0)
+        self.assertEqual(len(res.authority), 0)
+        self.assertEqual(len(res.additional), 0)
 
+        res = self.sendTCPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
         self.assertEqual(len(res.answer), 0)
+        self.assertEqual(len(res.authority), 1)
+        self.assertEqual(len(res.additional), 0)
+
+    def checkDropped(self, qname, qtype='A'):
+        query = dns.message.make_query(qname, qtype, want_dnssec=True)
+        query.flags |= dns.flags.CD
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            res = sender(query)
+            self.assertEqual(res, None)
 
     def waitUntilCorrectSerialIsLoaded(self, serial, timeout=5):
         global rpzServer
@@ -357,7 +392,7 @@ e 3600 IN A 192.0.2.42
 
         # seventh zone, e should only have one A
         self.waitUntilCorrectSerialIsLoaded(7)
-        self.checkRPZStats(7, 2, 2, self._xfrDone)
+        self.checkRPZStats(7, 4, 2, self._xfrDone)
         self.checkNotBlocked('a.example.')
         self.checkNotBlocked('b.example.')
         self.checkNotBlocked('c.example.')
@@ -368,3 +403,6 @@ e 3600 IN A 192.0.2.42
         self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'))
         # check that the policy is disabled for AD=1 queries
         self.checkNotBlocked('e.example.', True)
+        # check non-custom policies
+        self.checkTruncated('tc.example.')
+        self.checkDropped('drop.example.')