import dns
+import json
import os
+import requests
import socket
import struct
import sys
dns.rrset.from_text('c.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
+ elif newSerial == 5:
+ # this one is a bit special, we are answering with a full AXFR
+ records = [
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('d.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
+ elif newSerial == 6:
+ # back to IXFR
+ records = [
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % oldSerial),
+ dns.rrset.from_text('d.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('e.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
response.answer = records
return (newSerial, response)
_lua_config_file = """
rpzMaster('127.0.0.1:%d', 'zone.rpz.', { refresh=1 })
""" % (rpzServerPort)
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
_confdir = 'RPZ'
_config_template = """
-auth-zones=example=configs/%s/example.zone""" % _confdir
-
+auth-zones=example=configs/%s/example.zone
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+api-key=%s
+""" % (_confdir, _wsPort, _wsPassword, _apiKey)
+ _xfrDone = 0
@classmethod
def generateRecursorConfig(cls, confdir):
a 3600 IN A 192.0.2.42
b 3600 IN A 192.0.2.42
c 3600 IN A 192.0.2.42
+d 3600 IN A 192.0.2.42
+e 3600 IN A 192.0.2.42
""".format(soa=cls._SOA))
super(RPZRecursorTest, cls).generateRecursorConfig(confdir)
if currentSerial > serial:
raise AssertionError("Expected serial %d, got %d" % (serial, currentSerial))
if currentSerial == serial:
+ self._xfrDone = self._xfrDone + 1
return
attempts = attempts + 1
raise AssertionError("Waited %d seconds for the serial to be updated to %d but the serial is still %d" % (timeout, serial, currentSerial))
+ def checkRPZStats(self, serial, recordsCount, fullXFRCount, totalXFRCount):
+ headers = {'x-api-key': self._apiKey}
+ url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/rpzstatistics'
+ r = requests.get(url, headers=headers, timeout=self._wsTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+ self.assertTrue(r.json())
+ content = r.json()
+ self.assertIn('zone.rpz.', content)
+ zone = content['zone.rpz.']
+ for key in ['last_update', 'records', 'serial', 'transfers_failed', 'transfers_full', 'transfers_success']:
+ self.assertIn(key, zone)
+
+ self.assertEquals(zone['serial'], serial)
+ self.assertEquals(zone['records'], recordsCount)
+ self.assertEquals(zone['transfers_full'], fullXFRCount)
+ self.assertEquals(zone['transfers_success'], totalXFRCount)
+
def testRPZ(self):
# first zone, only a should be blocked
self.waitUntilCorrectSerialIsLoaded(1)
+ self.checkRPZStats(1, 1, 1, self._xfrDone)
self.checkBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkNotBlocked('c.example.')
# second zone, a and b should be blocked
self.waitUntilCorrectSerialIsLoaded(2)
+ self.checkRPZStats(2, 2, 1, self._xfrDone)
self.checkBlocked('a.example.')
self.checkBlocked('b.example.')
self.checkNotBlocked('c.example.')
# third zone, only b should be blocked
self.waitUntilCorrectSerialIsLoaded(3)
+ self.checkRPZStats(3, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkBlocked('b.example.')
self.checkNotBlocked('c.example.')
# fourth zone, only c should be blocked
self.waitUntilCorrectSerialIsLoaded(4)
+ self.checkRPZStats(4, 1, 1, self._xfrDone)
self.checkNotBlocked('a.example.')
self.checkNotBlocked('b.example.')
self.checkBlocked('c.example.')
+
+ # fifth zone, we should get a full AXFR this time, and only d should be blocked
+ self.waitUntilCorrectSerialIsLoaded(5)
+ self.checkRPZStats(5, 1, 2, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkBlocked('d.example.')
+
+ # sixth zone, only e should be blocked
+ self.waitUntilCorrectSerialIsLoaded(6)
+ self.checkRPZStats(6, 1, 2, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkBlocked('e.example.')