import threading
import time
import unittest
+import clientsubnetoption
import dns
import dns.message
import libnacl
def startResponders(cls):
print("Launching responders..")
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort])
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()
cls._responsesCounter[threading.currentThread().name] = 1
@classmethod
- def _getResponse(cls, request):
+ def _getResponse(cls, request, fromQueue, toQueue):
response = None
if len(request.question) != 1:
print("Skipping query with question count %d" % (len(request.question)))
healthcheck = not str(request.question[0].name).endswith('tests.powerdns.com.')
if not healthcheck:
cls._ResponderIncrementCounter()
- if not cls._toResponderQueue.empty():
- response = cls._toResponderQueue.get(True, cls._queueTimeout)
+ if not fromQueue.empty():
+ response = fromQueue.get(True, cls._queueTimeout)
if response:
response = copy.copy(response)
response.id = request.id
- cls._fromResponderQueue.put(request, True, cls._queueTimeout)
+ toQueue.put(request, True, cls._queueTimeout)
if not response:
# unexpected query, or health check
return response
@classmethod
- def UDPResponder(cls, port, ignoreTrailing=False):
+ def UDPResponder(cls, port, fromQueue, toQueue, ignoreTrailing=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(("127.0.0.1", port))
while True:
data, addr = sock.recvfrom(4096)
request = dns.message.from_wire(data, ignore_trailing=ignoreTrailing)
- response = cls._getResponse(request)
+ response = cls._getResponse(request, fromQueue, toQueue)
if not response:
continue
sock.close()
@classmethod
- def TCPResponder(cls, port, ignoreTrailing=False, multipleResponses=False):
+ def TCPResponder(cls, port, fromQueue, toQueue, ignoreTrailing=False, multipleResponses=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
(datalen,) = struct.unpack("!H", data)
data = conn.recv(datalen)
request = dns.message.from_wire(data, ignore_trailing=ignoreTrailing)
- response = cls._getResponse(request)
+ response = cls._getResponse(request, fromQueue, toQueue)
if not response:
conn.close()
conn.send(wire)
while multipleResponses:
- if cls._toResponderQueue.empty():
+ if fromQueue.empty():
break
- response = cls._toResponderQueue.get(True, cls._queueTimeout)
+ response = fromQueue.get(True, cls._queueTimeout)
if not response:
break
data = sock.recv(responseLen)
response = cls._decryptConsole(data, readingNonce)
return response
+
+ def compareOptions(self, a, b):
+ self.assertEquals(len(a), len(b))
+ for idx in xrange(len(a)):
+ self.assertEquals(a[idx], b[idx])
+
+ def checkMessageNoEDNS(self, expected, received):
+ self.assertEquals(expected, received)
+ self.assertEquals(received.edns, -1)
+ self.assertEquals(len(received.options), 0)
+
+ def checkMessageEDNSWithoutECS(self, expected, received, withCookies=0):
+ self.assertEquals(expected, received)
+ self.assertEquals(received.edns, 0)
+ self.assertEquals(len(received.options), withCookies)
+ if withCookies:
+ for option in received.options:
+ self.assertEquals(option.otype, 10)
+
+ def checkMessageEDNSWithECS(self, expected, received):
+ self.assertEquals(expected, received)
+ self.assertEquals(received.edns, 0)
+ self.assertEquals(len(received.options), 1)
+ self.assertEquals(received.options[0].otype, clientsubnetoption.ASSIGNED_OPTION_CODE)
+ self.compareOptions(expected.options, received.options)
+
+ def checkQueryEDNSWithECS(self, expected, received):
+ self.checkMessageEDNSWithECS(expected, received)
+
+ def checkResponseEDNSWithECS(self, expected, received):
+ self.checkMessageEDNSWithECS(expected, received)
+
+ def checkQueryEDNSWithoutECS(self, expected, received):
+ self.checkMessageEDNSWithoutECS(expected, received)
+
+ def checkResponseEDNSWithoutECS(self, expected, received, withCookies=0):
+ self.checkMessageEDNSWithoutECS(expected, received, withCookies)
+
+ def checkQueryNoEDNS(self, expected, received):
+ self.checkMessageNoEDNS(expected, received)
+
+ def checkResponseNoEDNS(self, expected, received):
+ self.checkMessageNoEDNS(expected, received)
def startResponders(cls):
print("Launching responders..")
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, False, True])
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, True])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()
import cookiesoption
from dnsdisttests import DNSDistTest
-class TestEdnsClientSubnet(DNSDistTest):
- def compareOptions(self, a, b):
- self.assertEquals(len(a), len(b))
- for idx in xrange(len(a)):
- self.assertEquals(a[idx], b[idx])
-
- def checkMessageNoEDNS(self, expected, received):
- self.assertEquals(expected, received)
- self.assertEquals(received.edns, -1)
- self.assertEquals(len(received.options), 0)
-
- def checkMessageEDNSWithoutECS(self, expected, received, withCookies=0):
- self.assertEquals(expected, received)
- self.assertEquals(received.edns, 0)
- self.assertEquals(len(received.options), withCookies)
- if withCookies:
- for option in received.options:
- self.assertEquals(option.otype, 10)
-
- def checkMessageEDNSWithECS(self, expected, received):
- self.assertEquals(expected, received)
- self.assertEquals(received.edns, 0)
- self.assertEquals(len(received.options), 1)
- self.assertEquals(received.options[0].otype, clientsubnetoption.ASSIGNED_OPTION_CODE)
- self.compareOptions(expected.options, received.options)
-
- def checkQueryEDNSWithECS(self, expected, received):
- self.checkMessageEDNSWithECS(expected, received)
-
- def checkResponseEDNSWithECS(self, expected, received):
- self.checkMessageEDNSWithECS(expected, received)
-
- def checkQueryEDNSWithoutECS(self, expected, received):
- self.checkMessageEDNSWithoutECS(expected, received)
-
- def checkResponseEDNSWithoutECS(self, expected, received, withCookies=0):
- self.checkMessageEDNSWithoutECS(expected, received, withCookies)
-
- def checkQueryNoEDNS(self, expected, received):
- self.checkMessageNoEDNS(expected, received)
-
- def checkResponseNoEDNS(self, expected, received):
- self.checkMessageNoEDNS(expected, received)
-
-class TestEdnsClientSubnetNoOverride(TestEdnsClientSubnet):
+class TestEdnsClientSubnetNoOverride(DNSDistTest):
"""
dnsdist is configured to add the EDNS0 Client Subnet
option, but only if it's not already present in the
self.checkResponseEDNSWithoutECS(expectedResponse, receivedResponse, withCookies=2)
-class TestEdnsClientSubnetOverride(TestEdnsClientSubnet):
+class TestEdnsClientSubnetOverride(DNSDistTest):
"""
dnsdist is configured to add the EDNS0 Client Subnet
option, overwriting any existing value.
self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
self.checkResponseEDNSWithECS(response, receivedResponse)
-class TestECSDisabledByRuleOrLua(TestEdnsClientSubnet):
+class TestECSDisabledByRuleOrLua(DNSDistTest):
"""
dnsdist is configured to add the EDNS0 Client Subnet
option, but we disable it via DisableECSAction()
self.checkQueryNoEDNS(query, receivedQuery)
self.checkResponseNoEDNS(response, receivedResponse)
-class TestECSOverrideSetByRuleOrLua(TestEdnsClientSubnet):
+class TestECSOverrideSetByRuleOrLua(DNSDistTest):
"""
dnsdist is configured to set the EDNS0 Client Subnet
option without overriding an existing one, but we
self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
self.checkResponseEDNSWithECS(response, receivedResponse)
-class TestECSPrefixLengthSetByRuleOrLua(TestEdnsClientSubnet):
+class TestECSPrefixLengthSetByRuleOrLua(DNSDistTest):
"""
dnsdist is configured to set the EDNS0 Client Subnet
option with a prefix length of 24 for IPv4 and 56 for IPv6,
@classmethod
def startResponders(cls):
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort])
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()
@classmethod
def startResponders(cls):
print("Launching responders..")
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port])
+ cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder2.setDaemon(True)
cls._UDPResponder2.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort])
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()
- cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port])
+ cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder2.setDaemon(True)
cls._TCPResponder2.start()
@classmethod
def startResponders(cls):
print("Launching responders..")
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port])
+ cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
cls._UDPResponder2.setDaemon(True)
cls._UDPResponder2.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort])
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()
- cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port])
+ cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
cls._TCPResponder2.setDaemon(True)
cls._TCPResponder2.start()
--- /dev/null
+#!/usr/bin/env python
+import base64
+import Queue
+import threading
+import clientsubnetoption
+import dns
+from dnsdisttests import DNSDistTest
+
+class TestTeeAction(DNSDistTest):
+
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey)
+ _teeServerPort = 5390
+ _toTeeQueue = Queue.Queue()
+ _fromTeeQueue = Queue.Queue()
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ newServer{address="127.0.0.1:%d"}
+ addAction(QTypeRule(dnsdist.A), TeeAction("127.0.0.1:%d", true))
+ addAction(QTypeRule(dnsdist.AAAA), TeeAction("127.0.0.1:%d", false))
+ """
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_teeServerPort', '_teeServerPort']
+ @classmethod
+ def startResponders(cls):
+ print("Launching responders..")
+
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
+ cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.start()
+
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, True])
+ cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.start()
+
+ cls._TeeResponder = threading.Thread(name='Tee Responder', target=cls.UDPResponder, args=[cls._teeServerPort, cls._toTeeQueue, cls._fromTeeQueue])
+ cls._TeeResponder.setDaemon(True)
+ cls._TeeResponder.start()
+
+ def testTeeWithECS(self):
+ """
+ TeeAction: ECS
+ """
+ name = 'ecs.tee.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ numberOfQueries = 10
+ for _ in range(numberOfQueries):
+ # push the response to the Tee server
+ self._toTeeQueue.put(response, True, 2.0)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # retrieve the query from the Tee server
+ teedQuery = self._fromTeeQueue.get(True, 2.0)
+ ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
+ expectedQuery.id = query.id
+ self.checkQueryEDNSWithECS(expectedQuery, teedQuery)
+
+ # check the TeeAction stats
+ stats = self.sendConsoleCommand("getAction(0):printStats()")
+ self.assertEquals(stats, """refuseds\t0
+nxdomains\t0
+noerrors\t%d
+servfails\t0
+recv-errors\t0
+tcp-drops\t0
+responses\t%d
+other-rcode\t0
+send-errors\t0
+queries\t%d
+""" % (numberOfQueries, numberOfQueries, numberOfQueries))
+
+ def testTeeWithoutECS(self):
+ """
+ TeeAction: No ECS
+ """
+ name = 'noecs.tee.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
+ response = dns.message.make_response(query)
+
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.AAAA,
+ '2001:DB8::1')
+ response.answer.append(rrset)
+
+ numberOfQueries = 10
+ for _ in range(numberOfQueries):
+ # push the response to the Tee server
+ self._toTeeQueue.put(response, True, 2.0)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # retrieve the query from the Tee server
+ teedQuery = self._fromTeeQueue.get(True, 2.0)
+ ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
+ expectedQuery = dns.message.make_query(name, 'AAAA', 'IN', use_edns=True, options=[ecso], payload=512)
+ expectedQuery.id = query.id
+ self.checkMessageNoEDNS(expectedQuery, teedQuery)
+
+ # check the TeeAction stats
+ stats = self.sendConsoleCommand("getAction(0):printStats()")
+ self.assertEquals(stats, """refuseds\t0
+nxdomains\t0
+noerrors\t%d
+servfails\t0
+recv-errors\t0
+tcp-drops\t0
+responses\t%d
+other-rcode\t0
+send-errors\t0
+queries\t%d
+""" % (numberOfQueries, numberOfQueries, numberOfQueries))
def startResponders(cls):
print("Launching responders..")
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, True])
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, True])
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
- cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, True])
+
+ cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, True])
cls._TCPResponder.setDaemon(True)
cls._TCPResponder.start()