--- /dev/null
+#!/usr/bin/env python2
+
+import errno
+import shutil
+import os
+import socket
+import struct
+import subprocess
+import sys
+import time
+import unittest
+import dns
+import dns.message
+
+class RecursorTest(unittest.TestCase):
+ """
+ Setup all recursors and auths required for the tests
+ """
+
+ _confdir = 'recursor'
+
+ _recursorStartupDelay = 2.0
+ _recursorPort = 5300
+
+ _recursor = None
+
+ _PREFIX = os.environ['PREFIX']
+
+ _config_template_default = """
+daemon=no
+trace=yes
+dont-query=
+local-address=127.0.0.1
+packetcache-ttl=0
+packetcache-servfail-ttl=0
+max-cache-ttl=15
+threads=1
+loglevel=9
+disable-syslog=yes
+"""
+ _config_template = """
+"""
+ _config_params = []
+ _lua_config_file = None
+ _roothints = """
+. 3600 IN NS ns.root.
+ns.root. 3600 IN A %s.8
+""" % _PREFIX
+ _root_DS = "63149 13 1 a59da3f5c1b97fcd5fa2b3b2b0ac91d38a60d33a"
+
+ # The default SOA for zones in the authoritative servers
+ _SOA = "ns1.example.net. hostmaster.example.net. 1 3600 1800 1209600 300"
+
+ # The definitions of the zones on the authoritative servers, the key is the
+ # zonename and the value is the zonefile content. several strings are replaced:
+ # - {soa} => value of _SOA
+ # - {prefix} value of _PREFIX
+ _zones = {
+ 'ROOT': """
+. 3600 IN SOA {soa}
+. 3600 IN NS ns.root.
+ns.root. 3600 IN A {prefix}.8
+
+example. 3600 IN NS ns1.example.
+example. 3600 IN NS ns2.example.
+example. 3600 IN DS 53174 13 1 50c9e913818767c236c06c2d8272723cb78cbf26
+
+ns1.example. 3600 IN A {prefix}.10
+ns2.example. 3600 IN A {prefix}.11
+ """,
+ 'example': """
+example. 3600 IN SOA {soa}
+example. 3600 IN NS ns1.nic.example.
+example. 3600 IN NS ns2.nic.example.
+ns1.example. 3600 IN A {prefix}.10
+ns2.example. 3600 IN A {prefix}.11
+
+secure.example. 3600 IN NS ns.secure.example.
+secure.example. 3600 IN DS 64723 13 1 53eb985040d3a89bacf29dbddb55a65834706f33
+ns.secure.example. 3600 IN A {prefix}.9
+
+bogus.example. 3600 IN NS ns.bogus.example.
+bogus.example. 3600 IN DS 65034 13 1 6df3bb50ea538e90eacdd7ae5419730783abb0ee
+ns.bogus.example. 3600 IN A {prefix}.12
+
+insecure.example. 3600 IN NS ns.insecure.example.
+ns.insecure.example. 3600 IN A {prefix}.13
+ """,
+ 'secure.example': """
+secure.example. 3600 IN SOA {soa}
+secure.example. 3600 IN NS ns.secure.example.
+ns.secure.example. 3600 IN A {prefix}.9
+
+host1.secure.example. 3600 IN A 192.0.2.2
+ """,
+ 'bogus.example': """
+bogus.example. 3600 IN SOA {soa}
+bogus.example. 3600 IN NS ns1.bogus.example.
+ns1.bogus.example. 3600 IN A {prefix}.12
+ted.bogus.example. 3600 IN A 192.0.2.1
+bill.bogus.example. 3600 IN AAAA 2001:db8:12::3
+ """,
+ 'insecure.example': """
+insecure.example. 3600 IN SOA {soa}
+insecure.example. 3600 IN NS ns1.insecure.example.
+ns1.insecure.example. 3600 IN A {prefix}.13
+
+node1.insecure.example. 3600 IN A 192.0.2.6
+ """
+ }
+
+ # The private keys for the zones (note that DS records should go into
+ # the zonecontent in _zones
+ _zone_keys = {
+ 'ROOT': """
+Private-key-format: v1.2
+Algorithm: 13 (ECDSAP256SHA256)
+PrivateKey: rhWuEydDz3QaIspSVj683B8Xq5q/ozzA38XUgzD4Fbo=
+ """,
+
+ 'example': """
+Private-key-format: v1.2
+Algorithm: 13 (ECDSAP256SHA256)
+PrivateKey: Lt0v0Gol3pRUFM7fDdcy0IWN0O/MnEmVPA+VylL8Y4U=
+ """,
+
+ 'secure.example': """
+Private-key-format: v1.2
+Algorithm: 13 (ECDSAP256SHA256)
+PrivateKey: 1G4WRoOFJJXk+fotDCHVORtJmIG2OUhKi8AO2jDPGZA=
+ """,
+
+ 'bogus.example': """
+Private-key-format: v1.2
+Algorithm: 13 (ECDSAP256SHA256)
+PrivateKey: f5jV7Q8kd5hDpMWObsuQ6SQda0ftf+JrO3uZwEg6nVw=
+ """,
+ }
+
+ # This dict is keyed with the suffix of the IP address and its value
+ # is a list of zones hosted on that IP. Note that delegations should
+ # go into the _zones's zonecontent
+ _auth_zones = {
+ '8': ['ROOT'],
+ '9': ['secure.example'],
+ '10': ['example'],
+ '11': ['example'],
+ '12': ['bogus.example'],
+ '13': ['insecure.example']
+ }
+
+ _auths = {}
+
+ @classmethod
+ def createConfigDir(cls, confdir):
+ try:
+ shutil.rmtree(confdir)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ os.mkdir(confdir, 0755)
+
+ @classmethod
+ def generateAuthZone(cls, confdir, zonename, zonecontent):
+ with open(os.path.join(confdir, '%s.zone' % zonename), 'w') as zonefile:
+ zonefile.write(zonecontent.format(prefix=cls._PREFIX, soa=cls._SOA))
+
+ @classmethod
+ def generateAuthNamedConf(cls, confdir, zones):
+ with open(os.path.join(confdir, 'named.conf'), 'w') as namedconf:
+ namedconf.write("""
+options {
+ directory "%s";
+};""" % confdir)
+ for zonename in zones:
+ zone = '.' if zonename == 'ROOT' else zonename
+
+ namedconf.write("""
+ zone "%s" {
+ type master;
+ file "%s.zone";
+ };""" % (zone, zonename))
+
+ @classmethod
+ def generateAuthConfig(cls, confdir):
+ bind_dnssec_db = os.path.join(confdir, 'bind-dnssec.sqlite3')
+
+ with open(os.path.join(confdir, 'pdns.conf'), 'w') as pdnsconf:
+ pdnsconf.write("""
+module-dir=../regression-tests/modules
+launch=bind
+daemon=no
+local-ipv6=
+bind-config={confdir}/named.conf
+bind-dnssec-db={bind_dnssec_db}
+socket-dir={confdir}
+cache-ttl=0
+negquery-cache-ttl=0
+query-cache-ttl=0
+log-dns-queries=yes
+log-dns-details=yes
+loglevel=9
+distributor-threads=1""".format(confdir=confdir,
+ bind_dnssec_db=bind_dnssec_db))
+
+ pdnsutilCmd = [os.environ['PDNSUTIL'],
+ '--config-dir=%s' % confdir,
+ 'create-bind-db',
+ bind_dnssec_db]
+
+ print ' '.join(pdnsutilCmd)
+ try:
+ subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ print e.output
+ raise
+
+ @classmethod
+ def secureZone(cls, confdir, zonename, key=None):
+ zone = '.' if zonename == 'ROOT' else zonename
+ if not key:
+ pdnsutilCmd = [os.environ['PDNSUTIL'],
+ '--config-dir=%s' % confdir,
+ 'secure-zone',
+ zone]
+ else:
+ keyfile = os.path.join(confdir, 'dnssec.key')
+ with open(keyfile, 'w') as fdKeyfile:
+ fdKeyfile.write(key)
+
+ pdnsutilCmd = [os.environ['PDNSUTIL'],
+ '--config-dir=%s' % confdir,
+ 'import-zone-key',
+ zone,
+ keyfile,
+ 'active',
+ 'ksk']
+
+ print ' '.join(pdnsutilCmd)
+ try:
+ subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ print e.output
+ raise
+
+ @classmethod
+ def generateAllAuthConfig(cls, confdir):
+ if cls._auth_zones:
+ for auth_suffix, zones in cls._auth_zones.items():
+ authconfdir = os.path.join(confdir, 'auth-%s' % auth_suffix)
+
+ os.mkdir(authconfdir)
+
+ cls.generateAuthConfig(authconfdir)
+ cls.generateAuthNamedConf(authconfdir, zones)
+
+ for zonename, content in cls._zones.items():
+ cls.generateAuthZone(authconfdir, zonename, content)
+ if cls._zone_keys.get(zonename, None):
+ cls.secureZone(authconfdir, zonename, cls._zone_keys.get(zonename))
+
+ @classmethod
+ def startAllAuth(cls, confdir):
+ if cls._auth_zones:
+ for auth_suffix, _ in cls._auth_zones.items():
+ authconfdir = os.path.join(confdir, 'auth-%s' % auth_suffix)
+ ipaddress = cls._PREFIX + '.' + auth_suffix
+ cls.startAuth(authconfdir, ipaddress)
+
+ @classmethod
+ def startAuth(cls, confdir, ipaddress):
+ print("Launching pdns_server..")
+ authcmd = ['authbind',
+ os.environ['PDNS'],
+ '--config-dir=%s' % confdir,
+ '--local-address=%s' % ipaddress]
+ print(' '.join(authcmd))
+
+ logFile = os.path.join(confdir, 'pdns.log')
+ with open(logFile, 'w') as fdLog:
+ cls._auths[ipaddress] = subprocess.Popen(authcmd, close_fds=True,
+ stdout=fdLog, stderr=fdLog)
+
+ time.sleep(2)
+
+ if cls._auths[ipaddress].poll() is not None:
+ try:
+ cls._auths[ipaddress].kill()
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
+ with open(logFile, 'r') as fdLog:
+ print fdLog.read()
+ sys.exit(cls._auths[ipaddress].returncode)
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ params = tuple([getattr(cls, param) for param in cls._config_params])
+ if len(params):
+ print(params)
+
+ recursorconf = os.path.join(confdir, 'recursor.conf')
+
+ with open(recursorconf, 'w') as conf:
+ conf.write("# Autogenerated by recursortests.py\n")
+ conf.write(cls._config_template_default)
+ conf.write(cls._config_template % params)
+ conf.write("\n")
+ conf.write("socket-dir=%s\n" % confdir)
+ if cls._lua_config_file or cls._root_DS:
+ luaconfpath = os.path.join(confdir, 'conffile.lua')
+ with open(luaconfpath, 'w') as luaconf:
+ if cls._root_DS:
+ luaconf.write("addDS('.', '%s')" % cls._root_DS)
+ if cls._lua_config_file:
+ luaconf.write(cls._lua_config_file)
+ conf.write("lua-config-file=%s\n" % luaconfpath)
+ if cls._roothints:
+ roothintspath = os.path.join(confdir, 'root.hints')
+ with open(roothintspath, 'w') as roothints:
+ roothints.write(cls._roothints)
+ conf.write("hint-file=%s\n" % roothintspath)
+
+ @classmethod
+ def startRecursor(cls, confdir, port):
+ print("Launching pdns_recursor..")
+ recursorcmd = [os.environ['PDNSRECURSOR'],
+ '--config-dir=%s' % confdir,
+ '--local-port=%s' % port]
+ print(' '.join(recursorcmd))
+
+ logFile = os.path.join(confdir, 'recursor.log')
+ with open(logFile, 'w') as fdLog:
+ cls._recursor = subprocess.Popen(recursorcmd, close_fds=True,
+ stdout=fdLog, stderr=fdLog)
+
+ if 'PDNSRECURSOR_FAST_TESTS' in os.environ:
+ delay = 0.5
+ else:
+ delay = cls._recursorStartupDelay
+
+ time.sleep(delay)
+
+ if cls._recursor.poll() is not None:
+ try:
+ cls._recursor.kill()
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
+ with open(logFile, 'r') as fdLog:
+ print fdLog.read()
+ sys.exit(cls._recursor.returncode)
+
+ @classmethod
+ def wipeRecursorCache(cls, confdir):
+ rec_controlCmd = [os.environ['RECCONTROL'],
+ '--config-dir=%s' % confdir,
+ 'wipe-cache',
+ '.$']
+ try:
+ subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as e:
+ print e.output
+ raise
+
+ @classmethod
+ def setUpSockets(cls):
+ print("Setting up UDP socket..")
+ cls._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ cls._sock.settimeout(2.0)
+ cls._sock.connect(("127.0.0.1", cls._recursorPort))
+
+ @classmethod
+ def setUpClass(cls):
+ cls.setUpSockets()
+ confdir = os.path.join('configs', cls._confdir)
+ cls.createConfigDir(confdir)
+ cls.generateAllAuthConfig(confdir)
+ cls.startAllAuth(confdir)
+
+ cls.generateRecursorConfig(confdir)
+ cls.startRecursor(confdir, cls._recursorPort)
+
+ print("Launching tests..")
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tearDownRecursor()
+ cls.tearDownAuth()
+
+ @classmethod
+ def tearDownAuth(cls):
+ if 'PDNSRECURSOR_FAST_TESTS' in os.environ:
+ delay = 0.1
+ else:
+ delay = 1.0
+
+ for _, auth in cls._auths.items():
+ try:
+ auth.terminate()
+ if auth.poll() is None:
+ time.sleep(delay)
+ if auth.poll() is None:
+ auth.kill()
+ auth.wait()
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
+
+ @classmethod
+ def tearDownRecursor(cls):
+ if 'PDNSRECURSOR_FAST_TESTS' in os.environ:
+ delay = 0.1
+ else:
+ delay = 1.0
+ try:
+ if cls._recursor:
+ cls._recursor.terminate()
+ if cls._recursor.poll() is None:
+ time.sleep(delay)
+ if cls._recursor.poll() is None:
+ cls._recursor.kill()
+ cls._recursor.wait()
+ except OSError as e:
+ # There is a race-condition with the poll() and
+ # kill() statements, when the process is dead on the
+ # kill(), this is fine
+ if e.errno != errno.ESRCH:
+ raise
+
+
+ @classmethod
+ def sendUDPQuery(cls, query, timeout=2.0):
+ if timeout:
+ cls._sock.settimeout(timeout)
+
+ try:
+ cls._sock.send(query.to_wire())
+ data = cls._sock.recv(4096)
+ except socket.timeout:
+ data = None
+ finally:
+ if timeout:
+ cls._sock.settimeout(None)
+
+ message = None
+ if data:
+ message = dns.message.from_wire(data)
+ return message
+
+ @classmethod
+ def sendTCPQuery(cls, query, timeout=2.0):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if timeout:
+ sock.settimeout(timeout)
+
+ sock.connect(("127.0.0.1", cls._recursorPort))
+
+ try:
+ wire = query.to_wire()
+ sock.send(struct.pack("!H", len(wire)))
+ sock.send(wire)
+ data = sock.recv(2)
+ if data:
+ (datalen,) = struct.unpack("!H", data)
+ data = sock.recv(datalen)
+ except socket.timeout as e:
+ print("Timeout: %s" % (str(e)))
+ data = None
+ except socket.error as e:
+ print("Network error: %s" % (str(e)))
+ data = None
+ finally:
+ sock.close()
+
+ message = None
+ if data:
+ message = dns.message.from_wire(data)
+ return message
+
+ def setUp(self):
+ # This function is called before every tests
+ return
+
+ ## Functions for comparisons
+ def assertMessageHasFlags(self, msg, flags, ednsflags=[]):
+ """Asserts that msg has all the flags from flags set
+
+ @param msg: the dns.message.Message to check
+ @param flags: a list of strings with flag mnemonics (like ['RD', 'RA'])
+ @param ednsflags: a list of strings with edns-flag mnemonics (like ['DO'])"""
+
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ if isinstance(flags, list):
+ for elem in flags:
+ if not isinstance(elem, str):
+ raise TypeError("flags is not a list of strings")
+ else:
+ raise TypeError("flags is not a list of strings")
+
+ if isinstance(ednsflags, list):
+ for elem in ednsflags:
+ if not isinstance(elem, str):
+ raise TypeError("ednsflags is not a list of strings")
+ else:
+ raise TypeError("ednsflags is not a list of strings")
+
+ msgFlags = dns.flags.to_text(msg.flags).split()
+ missingFlags = [flag for flag in flags if flag not in msgFlags]
+
+ msgEdnsFlags = dns.flags.edns_to_text(msg.flags).split()
+ missingEdnsFlags = [ednsflag for ednsflag in ednsflags if ednsflag not in msgEdnsFlags]
+
+ if len(missingFlags) or len(missingEdnsFlags) or len(msgFlags) > len(flags):
+ raise AssertionError("Expected flags '%s' (EDNS: '%s'), found '%s' (EDNS: '%s') in query %s" %
+ (' '.join(flags), ' '.join(ednsflags),
+ ' '.join(msgFlags), ' '.join(msgEdnsFlags),
+ msg.question[0]))
+
+ def assertMessageIsAuthenticated(self, msg):
+ """Asserts that the message has the AD bit set
+
+ @param msg: the dns.message.Message to check"""
+
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ msgFlags = dns.flags.to_text(msg.flags)
+ self.assertTrue('AD' in msgFlags, "No AD flag found in the message for %s" % msg.question[0].name)
+
+ def assertRRsetInAnswer(self, msg, rrset):
+ """Asserts the rrset (without comparing TTL) exists in the
+ answer section of msg
+
+ @param msg: the dns.message.Message to check
+ @param rrset: a dns.rrset.RRset object"""
+
+ ret = ''
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ if not isinstance(rrset, dns.rrset.RRset):
+ raise TypeError("rrset is not a dns.rrset.RRset")
+
+ found = False
+ for ans in msg.answer:
+ ret += "%s\n" % ans.to_text()
+ if ans.match(rrset.name, rrset.rdclass, rrset.rdtype, 0, None):
+ self.assertEqual(ans, rrset)
+ found = True
+
+ if not found:
+ raise AssertionError("RRset not found in answer")
+
+ def assertMatchingRRSIGInAnswer(self, msg, coveredRRset, keys=None):
+ """Looks for coveredRRset in the answer section and if there is an RRSIG RRset
+ that covers that RRset. If keys is not None, this function will also try to
+ validate the RRset against the RRSIG
+
+ @param msg: The dns.message.Message to check
+ @param coveredRRset: The RRSet to check for
+ @param keys: a dictionary keyed by dns.name.Name with node or rdataset values to use for validation"""
+
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ if not isinstance(coveredRRset, dns.rrset.RRset):
+ raise TypeError("coveredRRset is not a dns.rrset.RRset")
+
+ msgRRsigRRSet = None
+ msgRRSet = None
+
+ ret = ''
+ for ans in msg.answer:
+ ret += ans.to_text() + "\n"
+
+ if ans.match(coveredRRset.name, coveredRRset.rdclass, coveredRRset.rdtype, 0, None):
+ msgRRSet = ans
+ if ans.match(coveredRRset.name, dns.rdataclass.IN, dns.rdatatype.RRSIG, coveredRRset.rdtype, None):
+ msgRRsigRRSet = ans
+ if msgRRSet and msgRRsigRRSet:
+ break
+
+ if not msgRRSet:
+ raise AssertionError("RRset for '%s' not found in answer" % msg.question[0].to_text())
+
+ if not msgRRsigRRSet:
+ raise AssertionError("No RRSIGs found in answer for %s:\nFull answer:\n%s" % (msg.question[0].to_text(), ret))
+
+ if keys:
+ try:
+ dns.dnssec.validate(msgRRSet, msgRRsigRRSet.to_rdataset(), keys)
+ except dns.dnssec.ValidationFailure as e:
+ raise AssertionError("Signature validation failed for %s:\n%s" % (msg.question[0].to_text(), e))
+
+ def assertNoRRSIGsInAnswer(self, msg):
+ """Checks if there are _no_ RRSIGs in the answer section of msg"""
+
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message")
+
+ ret = ""
+ for ans in msg.answer:
+ if ans.rdtype == dns.rdatatype.RRSIG:
+ ret += ans.name.to_text() + "\n"
+
+ if len(ret):
+ raise AssertionError("RRSIG found in answers for:\n%s" % ret)
+
+ def assertAnswerEmpty(self, msg):
+ self.assertTrue(len(msg.answer) == 0, "Data found in the the answer section for %s:\n%s" % (msg.question[0].to_text(), '\n'.join([i.to_text() for i in msg.answer])))
+
+ def assertRcodeEqual(self, msg, rcode):
+ if not isinstance(msg, dns.message.Message):
+ raise TypeError("msg is not a dns.message.Message but a %s" % type(msg))
+
+ if not isinstance(rcode, int):
+ if isinstance(rcode, str):
+ rcode = dns.rcode.from_text(rcode)
+ else:
+ raise TypeError("rcode is neither a str nor int")
+
+ if msg.rcode() != rcode:
+ msgRcode = dns.rcode._by_value[msg.rcode()]
+ wantedRcode = dns.rcode._by_value[rcode]
+
+ raise AssertionError("Rcode for %s is %s, expected %s." % (msg.question[0].to_text(), msgRcode, wantedRcode))
--- /dev/null
+import os
+import socket
+import unittest
+
+import dns
+from recursortests import RecursorTest
+
+class TestFlags(RecursorTest):
+ _confdir = 'Flags'
+ _config_template = """dnssec=%s"""
+ _config_params = ['_dnssec_setting']
+ _dnssec_setting = None
+ _recursors = {}
+
+ _dnssec_setting_ports = {'off': 5300, 'process': 5301, 'validate': 5302}
+
+ @classmethod
+ def setUp(cls):
+ for setting in cls._dnssec_setting_ports:
+ confdir = os.path.join('configs', cls._confdir, setting)
+ cls.wipeRecursorCache(confdir)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.setUpSockets()
+ confdir = os.path.join('configs', cls._confdir)
+ cls.createConfigDir(confdir)
+
+ cls.generateAllAuthConfig(confdir)
+ cls.startAllAuth(confdir)
+
+ for dnssec_setting, port in cls._dnssec_setting_ports.items():
+ cls._dnssec_setting = dnssec_setting
+ recConfdir = os.path.join(confdir, dnssec_setting)
+ cls.createConfigDir(recConfdir)
+ cls.generateRecursorConfig(recConfdir)
+ cls.startRecursor(recConfdir, port)
+ cls._recursors[dnssec_setting] = cls._recursor
+
+ @classmethod
+ def setUpSockets(cls):
+ cls._sock = {}
+ for dnssec_setting, port in cls._dnssec_setting_ports.items():
+ print("Setting up UDP socket..")
+ cls._sock[dnssec_setting] = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ cls._sock[dnssec_setting].settimeout(2.0)
+ cls._sock[dnssec_setting].connect(("127.0.0.1", port))
+
+ @classmethod
+ def sendUDPQuery(cls, query, dnssec_setting, timeout=2.0):
+ if timeout:
+ cls._sock[dnssec_setting].settimeout(timeout)
+
+ try:
+ cls._sock[dnssec_setting].send(query.to_wire())
+ data = cls._sock[dnssec_setting].recv(4096)
+ except socket.timeout:
+ data = None
+ finally:
+ if timeout:
+ cls._sock[dnssec_setting].settimeout(None)
+
+ msg = None
+ if data:
+ msg = dns.message.from_wire(data)
+ return msg
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tearDownAuth()
+ for _, recursor in cls._recursors.items():
+ cls._recursor = recursor
+ cls.tearDownRecursor()
+
+ def createQuery(self, name, rdtype, flags, ednsflags):
+ """Helper function that creates the query with the specified flags.
+ The flags need to be strings (no checking is performed atm)"""
+ msg = dns.message.make_query(name, rdtype)
+ msg.flags = dns.flags.from_text(flags)
+ msg.flags += dns.flags.from_text('RD')
+ msg.use_edns(edns=0, ednsflags=dns.flags.edns_from_text(ednsflags))
+ return msg
+
+ def getQueryForSecure(self, flags='', ednsflags=''):
+ return self.createQuery('ns1.example.', 'A', flags, ednsflags)
+
+ ##
+ # -AD -CD -DO
+ ##
+ def testOff_Secure_None(self):
+ msg = self.getQueryForSecure()
+ res = self.sendUDPQuery(msg, 'off')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ def testProcess_Secure_None(self):
+ msg = self.getQueryForSecure()
+ res = self.sendUDPQuery(msg, 'process')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testValidate_Secure_None(self):
+ msg = self.getQueryForSecure()
+ res = self.sendUDPQuery(msg, 'validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ ##
+ # +AD -CD -DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Secure_AD(self):
+ msg = self.getQueryForSecure('AD')
+ res = self.sendUDPQuery(msg, 'off')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+
+ # Raises because #3682
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Secure_AD(self):
+ msg = self.getQueryForSecure('AD')
+ res = self.sendUDPQuery(msg, 'process')
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testValidate_Secure_AD(self):
+ msg = self.getQueryForSecure('AD')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'RD', 'RA', 'QR'])
+ # Raises because #3682
+ self.assertNoRRSIGsInAnswer(res)
+
+ ##
+ # +AD -CD +DO
+ ##
+ def testOff_Secure_ADDO(self):
+ msg = self.getQueryForSecure('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Secure_ADDO(self):
+ msg = self.getQueryForSecure('AD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ def testValidate_Secure_ADDO(self):
+ msg = self.getQueryForSecure('AD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # +AD +CD +DO
+ ##
+ def testOff_Secure_ADDOCD(self):
+ msg = self.getQueryForSecure('AD CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+
+ def testProcess_Secure_ADDOCD(self):
+ msg = self.getQueryForSecure('AD CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ def testValidate_Secure_ADDOCD(self):
+ msg = self.getQueryForSecure('AD CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageIsAuthenticated(res)
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # -AD -CD +DO
+ ##
+ def testOff_Secure_DO(self):
+ msg = self.getQueryForSecure('', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Secure_DO(self):
+ msg = self.getQueryForSecure('', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ @unittest.skip("See #3682")
+ def testValidate_Secure_DO(self):
+ msg = self.getQueryForSecure('', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # -AD +CD +DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Secure_DOCD(self):
+ msg = self.getQueryForSecure('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Secure_DOCD(self):
+ msg = self.getQueryForSecure('CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ @unittest.skip("See #3682")
+ def testValidate_Secure_DOCD(self):
+ msg = self.getQueryForSecure('CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # -AD +CD -DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Secure_CD(self):
+ msg = self.getQueryForSecure('CD')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Secure_CD(self):
+ msg = self.getQueryForSecure('CD')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testValidate_Secure_CD(self):
+ msg = self.getQueryForSecure('CD')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+
+ ### Bogus
+ def getQueryForBogus(self, flags='', ednsflags=''):
+ return self.createQuery('ted.bogus.example.', 'A', flags, ednsflags)
+
+ ##
+ # -AD -CD -DO
+ ##
+ def testOff_Bogus_None(self):
+ msg = self.getQueryForBogus()
+ res = self.sendUDPQuery(msg, 'off')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Bogus_None(self):
+ msg = self.getQueryForBogus()
+ res = self.sendUDPQuery(msg, 'process')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Bogus_None(self):
+ msg = self.getQueryForBogus()
+ res = self.sendUDPQuery(msg, 'validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
+ ##
+ # +AD -CD -DO
+ ##
+ def testOff_Bogus_AD(self):
+ msg = self.getQueryForBogus('AD')
+ res = self.sendUDPQuery(msg, 'off')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ @unittest.skip("See #3682")
+ def testProcess_Bogus_AD(self):
+ msg = self.getQueryForBogus('AD')
+ res = self.sendUDPQuery(msg, 'process')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ # These asserts trigger because of #3682
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
+ def testValidate_Bogus_AD(self):
+ msg = self.getQueryForBogus('AD')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['RD', 'RA', 'QR'])
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
+ ##
+ # +AD -CD +DO
+ ##
+ def testOff_Bogus_ADDO(self):
+ msg = self.getQueryForBogus('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Bogus_ADDO(self):
+ msg = self.getQueryForBogus('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ # This assert triggers because of #3682
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
+ def testValidate_Bogus_ADDO(self):
+ msg = self.getQueryForBogus('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+ ##
+ # +AD +CD +DO
+ ##
+ def testOff_Bogus_ADDOCD(self):
+ msg = self.getQueryForBogus('AD CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Bogus_ADDOCD(self):
+ msg = self.getQueryForBogus('AD CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ def testValidate_Bogus_ADDOCD(self):
+ msg = self.getQueryForBogus('AD CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # -AD -CD +DO
+ ##
+ def testOff_Bogus_DO(self):
+ msg = self.getQueryForBogus('', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Bogus_DO(self):
+ msg = self.getQueryForBogus('', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ def testValidate_Bogus_DO(self):
+ msg = self.getQueryForBogus('', 'DO')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertAnswerEmpty(res)
+
+ ##
+ # -AD +CD +DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Bogus_DOCD(self):
+ msg = self.getQueryForBogus('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ def testProcess_Bogus_DOCD(self):
+ msg = self.getQueryForBogus('CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ def testValidate_Bogus_DOCD(self):
+ msg = self.getQueryForBogus('CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
+ ##
+ # -AD +CD -DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Bogus_CD(self):
+ msg = self.getQueryForBogus('CD')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testProcess_Bogus_CD(self):
+ msg = self.getQueryForBogus('CD')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+ @unittest.skip("See #3682")
+ def testValidate_Bogus_CD(self):
+ msg = self.getQueryForBogus('CD')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
+
+ ## Insecure
+ def getQueryForInsecure(self, flags='', ednsflags=''):
+ return self.createQuery('node1.insecure.example.', 'A', flags, ednsflags)
+
+ ##
+ # -AD -CD -DO
+ ##
+ def testOff_Insecure_None(self):
+ msg = self.getQueryForInsecure()
+ res = self.sendUDPQuery(msg, 'off')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ def testProcess_Insecure_None(self):
+ msg = self.getQueryForInsecure()
+ res = self.sendUDPQuery(msg, 'process')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ def testValidate_Insecure_None(self):
+ msg = self.getQueryForInsecure()
+ res = self.sendUDPQuery(msg, 'validate')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
+ ##
+ # +AD -CD -DO
+ ##
+ def testOff_Insecure_AD(self):
+ msg = self.getQueryForInsecure('AD')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_AD(self):
+ msg = self.getQueryForInsecure('AD')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_AD(self):
+ msg = self.getQueryForInsecure('AD')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['RD', 'RA', 'QR'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ ##
+ # +AD -CD +DO
+ ##
+ def testOff_Insecure_ADDO(self):
+ msg = self.getQueryForInsecure('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_ADDO(self):
+ msg = self.getQueryForInsecure('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_ADDO(self):
+ msg = self.getQueryForInsecure('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ ##
+ # +AD +CD +DO
+ ##
+ def testOff_Insecure_ADDOCD(self):
+ msg = self.getQueryForInsecure('AD CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_ADDOCD(self):
+ msg = self.getQueryForInsecure('AD CD', 'DO')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_ADDOCD(self):
+ msg = self.getQueryForInsecure('AD CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ ##
+ # -AD -CD +DO
+ ##
+ def testOff_Insecure_DO(self):
+ msg = self.getQueryForInsecure('', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_DO(self):
+ msg = self.getQueryForInsecure('', 'DO')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_DO(self):
+ msg = self.getQueryForInsecure('', 'DO')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ ##
+ # -AD +CD +DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Insecure_DOCD(self):
+ msg = self.getQueryForInsecure('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_DOCD(self):
+ msg = self.getQueryForInsecure('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_DOCD(self):
+ msg = self.getQueryForInsecure('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ ##
+ # -AD +CD -DO
+ ##
+ @unittest.skip("See #3682")
+ def testOff_Insecure_CD(self):
+ msg = self.getQueryForInsecure('CD')
+ res = self.sendUDPQuery(msg, 'off')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testProcess_Insecure_CD(self):
+ msg = self.getQueryForInsecure('CD')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
+ def testValidate_Insecure_CD(self):
+ msg = self.getQueryForInsecure('CD')
+ res = self.sendUDPQuery(msg, 'validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)