]> granicus.if.org Git - pdns/commitdiff
dnsdist: Add a regression test for an invalid DoH query
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 May 2019 10:36:00 +0000 (12:36 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 May 2019 10:36:00 +0000 (12:36 +0200)
regression-tests.dnsdist/test_DOH.py

index f730821abaa00fe6bf967811a13728a1354a43fe..69c4ea53f922a9bef1966fe50e67899989c9945c 100644 (file)
@@ -12,8 +12,11 @@ import pycurl
 class DNSDistDOHTest(DNSDistTest):
 
     @classmethod
-    def getDOHGetURL(cls, baseurl, query):
-        wire = query.to_wire()
+    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
+        if rawQuery:
+            wire = query
+        else:
+            wire = query.to_wire()
         param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
         return baseurl + "?dns=" + param
 
@@ -27,8 +30,8 @@ class DNSDistDOHTest(DNSDistTest):
         return conn
 
     @classmethod
-    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True):
-        url = cls.getDOHGetURL(baseurl, query)
+    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
+        url = cls.getDOHGetURL(baseurl, query, rawQuery)
         conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
         #conn.setopt(pycurl.VERBOSE, True)
         conn.setopt(pycurl.URL, url)
@@ -220,6 +223,38 @@ class TestDOH(DNSDistDOHTest):
         (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
         self.assertEquals(receivedResponse, expectedResponse)
 
+    def testDOHInvalid(self):
+        """
+        DOH: Invalid query
+        """
+        name = 'invalid.doh.tests.powerdns.com.'
+        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        invalidQuery.id = 0
+        # first an invalid query
+        invalidQuery = invalidQuery.to_wire()
+        invalidQuery = invalidQuery[:-5]
+        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
+        self.assertEquals(receivedResponse, None)
+
+        # and now a valid one
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
 
 class TestDOHAddingECS(DNSDistDOHTest):