dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1', '192.0.2.2'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'),
+ dns.rrset.from_text('tc.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-tcp-only.'),
+ dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
query.flags |= dns.flags.CD
if adQuery:
query.flags |= dns.flags.AD
- res = self.sendUDPQuery(query)
- if shouldBeBlocked:
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
- else:
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- self.assertRRsetInAnswer(res, expected)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ if shouldBeBlocked:
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ else:
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+
+ self.assertRRsetInAnswer(res, expected)
def checkNotBlocked(self, name, adQuery=False):
self.checkBlocked(name, False, adQuery)
def checkCustom(self, qname, qtype, expected):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
- res = self.sendUDPQuery(query)
-
- self.assertRRsetInAnswer(res, expected)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
def checkNoData(self, qname, qtype):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(len(res.answer), 0)
+
+ def checkTruncated(self, qname, qtype='A'):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD', 'TC'])
+ self.assertEqual(len(res.answer), 0)
+ self.assertEqual(len(res.authority), 0)
+ self.assertEqual(len(res.additional), 0)
+ res = self.sendTCPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
self.assertEqual(len(res.answer), 0)
+ self.assertEqual(len(res.authority), 1)
+ self.assertEqual(len(res.additional), 0)
+
+ def checkDropped(self, qname, qtype='A'):
+ query = dns.message.make_query(qname, qtype, want_dnssec=True)
+ query.flags |= dns.flags.CD
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ res = sender(query)
+ self.assertEqual(res, None)
def waitUntilCorrectSerialIsLoaded(self, serial, timeout=5):
global rpzServer
# seventh zone, e should only have one A
self.waitUntilCorrectSerialIsLoaded(7)
- self.checkRPZStats(7, 2, 2, self._xfrDone)
+ self.checkRPZStats(7, 4, 2, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
self.checkCustom('f.example.', 'A', dns.rrset.from_text('f.example.', 0, dns.rdataclass.IN, 'CNAME', 'e.example.'))
# check that the policy is disabled for AD=1 queries
self.checkNotBlocked('e.example.', True)
+ # check non-custom policies
+ self.checkTruncated('tc.example.')
+ self.checkDropped('drop.example.')