From: Remi Gacogne Date: Mon, 13 May 2019 10:36:00 +0000 (+0200) Subject: dnsdist: Add a regression test for an invalid DoH query X-Git-Tag: rec-4.2.0-rc1~22^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=472251170d18b2d3c2a5b7bdbf1c17167e2fcca4;p=pdns dnsdist: Add a regression test for an invalid DoH query --- diff --git a/regression-tests.dnsdist/test_DOH.py b/regression-tests.dnsdist/test_DOH.py index f730821ab..69c4ea53f 100644 --- a/regression-tests.dnsdist/test_DOH.py +++ b/regression-tests.dnsdist/test_DOH.py @@ -12,8 +12,11 @@ import pycurl class DNSDistDOHTest(DNSDistTest): @classmethod - def getDOHGetURL(cls, baseurl, query): - wire = query.to_wire() + def getDOHGetURL(cls, baseurl, query, rawQuery=False): + if rawQuery: + wire = query + else: + wire = query.to_wire() param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=') return baseurl + "?dns=" + param @@ -27,8 +30,8 @@ class DNSDistDOHTest(DNSDistTest): return conn @classmethod - def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True): - url = cls.getDOHGetURL(baseurl, query) + def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False): + url = cls.getDOHGetURL(baseurl, query, rawQuery) conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout) #conn.setopt(pycurl.VERBOSE, True) conn.setopt(pycurl.URL, url) @@ -220,6 +223,38 @@ class TestDOH(DNSDistDOHTest): (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False) self.assertEquals(receivedResponse, expectedResponse) + def testDOHInvalid(self): + """ + DOH: Invalid query + """ + name = 'invalid.doh.tests.powerdns.com.' + invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False) + invalidQuery.id = 0 + # first an invalid query + invalidQuery = invalidQuery.to_wire() + invalidQuery = invalidQuery[:-5] + (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True) + self.assertEquals(receivedResponse, None) + + # and now a valid one + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + query.id = 0 + expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096) + expectedQuery.id = 0 + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = expectedQuery.id + self.assertEquals(expectedQuery, receivedQuery) + self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery) + self.assertEquals(response, receivedResponse) class TestDOHAddingECS(DNSDistDOHTest):