#!/usr/bin/env python
from datetime import datetime, timedelta
import os
+import time
import dns
from dnsdisttests import DNSDistTest
(_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
self.assertTrue(receivedResponse)
self.assertEquals(response, receivedResponse)
+
+class TestAdvancedQPS(DNSDistTest):
+
+ _config_template = """
+ addQPSLimit("qps.advanced.tests.powerdns.com", 10)
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedQPSLimit(self):
+ """
+ Advanced: QPS Limit
+
+ Send queries to "qps.advanced.tests.powerdns.com."
+ check that dnsdist drops queries when the max QPS has been reached.
+ """
+ maxQPS = 10
+ name = 'qps.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # we should now be dropped
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ time.sleep(1)
+
+ # again, over TCP this time
+ for _ in range(maxQPS):
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+class TestAdvancedQPSNone(DNSDistTest):
+
+ _config_template = """
+ addQPSLimit("qpsnone.advanced.tests.powerdns.com", 100)
+ addAction(AllRule(), RCodeAction(5))
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedQPSNone(self):
+ """
+ Advanced: Not matching QPS returns None, not Allow
+
+ Send queries to "qps.advanced.tests.powerdns.com."
+ check that the rule returns None when the QPS has not been
+ reached, not Allow.
+ """
+ name = 'qpsnone.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.REFUSED)
+
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)