confdir = os.path.join('configs', cls._confdir)
cls.wipeRecursorCache(confdir)
- @classmethod
- def sendQuery(self, name, rdtype, useTCP=False):
- """Helper function that creates the query"""
- msg = dns.message.make_query(name, rdtype, want_dnssec=True)
- msg.flags |= dns.flags.AD
-
- if useTCP:
- return self.sendTCPQuery(msg)
- return self.sendUDPQuery(msg)
-
def testSecureAnswer(self):
res = self.sendQuery('ns.secure.example.', 'A')
expected = dns.rrset.from_text('ns.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
print(expectedResponse)
print(response)
self.assertEquals(response, expectedResponse)
+
+ @classmethod
+ def sendQuery(cls, name, rdtype, useTCP=False):
+ """Helper function that creates the query"""
+ msg = dns.message.make_query(name, rdtype, want_dnssec=True)
+ msg.flags |= dns.flags.AD
+
+ if useTCP:
+ return cls.sendTCPQuery(msg)
+ return cls.sendUDPQuery(msg)
+
+ def createQuery(self, name, rdtype, flags, ednsflags):
+ """Helper function that creates the query with the specified flags.
+ The flags need to be strings (no checking is performed atm)"""
+ msg = dns.message.make_query(name, rdtype)
+ msg.flags = dns.flags.from_text(flags)
+ msg.flags += dns.flags.from_text('RD')
+ msg.use_edns(edns=0, ednsflags=dns.flags.edns_from_text(ednsflags))
+ return msg
cls._recursor = recursor
cls.tearDownRecursor()
- def createQuery(self, name, rdtype, flags, ednsflags):
- """Helper function that creates the query with the specified flags.
- The flags need to be strings (no checking is performed atm)"""
- msg = dns.message.make_query(name, rdtype)
- msg.flags = dns.flags.from_text(flags)
- msg.flags += dns.flags.from_text('RD')
- msg.use_edns(edns=0, ednsflags=dns.flags.edns_from_text(ednsflags))
- return msg
-
def getQueryForSecure(self, flags='', ednsflags=''):
return self.createQuery('ns1.example.', 'A', flags, ednsflags)
--- /dev/null
+import dns
+import os
+from recursortests import RecursorTest
+
+class testBogusMaxTTL(RecursorTest):
+ _confdir = 'BogusMaxTTL'
+
+ _config_template = """dnssec=validate
+max-cache-bogus-ttl=5"""
+
+ @classmethod
+ def setUp(cls):
+ confdir = os.path.join('configs', cls._confdir)
+ cls.wipeRecursorCache(confdir)
+
+ def testBogusCheckDisabled(self):
+ # first query with CD=0, so we should get a ServFail
+ query = self.createQuery('ted.bogus.example.', 'A', 'AD', 'DO')
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+
+ # then with CD=1 so we should get the A + RRSIG
+ # check that we correctly applied the maximum TTL when caching Bogus entries
+ query = self.createQuery('ted.bogus.example.', 'A', 'AD CD', 'DO')
+ res = self.sendUDPQuery(query)
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEquals(len(res.answer), 2)
+ for ans in res.answer:
+ self.assertLessEqual(ans.ttl, 5)