// the serial of this SOA record is the serial of the
// zone before the removals and updates of this sequence
if (sr->d_st.serial == masterSOA->d_st.serial) {
+ if (records.size() == 2) {
+ // if the entire update is two SOAs records with the same
+ // serial, this is actually an empty AXFR!
+ return {{remove, records}};
+ }
+
// if it's the final SOA, there is nothing for us to see
break;
}
auto ret = processIXFRRecords(master, zone, records, std::dynamic_pointer_cast<SOARecordContent>(masterSOA));
- BOOST_CHECK_EQUAL(ret.size(), 0);
+ // this is actually an empty AXFR
+ BOOST_CHECK_EQUAL(ret.size(), 1);
+ // nothing in the deletion part then
+ BOOST_CHECK_EQUAL(ret.at(0).first.size(), 0);
+ // and the two SOAs in the addition part
+ BOOST_CHECK_EQUAL(ret.at(0).second.size(), 2);
+ BOOST_CHECK_EQUAL(ret.at(0).second.at(0).d_type, QType(QType::SOA).getCode());
+ BOOST_CHECK_EQUAL(ret.at(0).second.at(1).d_type, QType(QType::SOA).getCode());
}
BOOST_AUTO_TEST_CASE(test_ixfr_invalid_no_records) {
dns.rrset.from_text('drop.example.zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.CNAME, 'rpz-drop.'),
dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
]
+ elif newSerial == 8:
+ # this one is a bit special too, we are answering with a full AXFR and the new zone is empty
+ records = [
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial),
+ dns.rrset.from_text('zone.rpz.', 60, dns.rdataclass.IN, dns.rdatatype.SOA, 'ns.zone.rpz. hostmaster.zone.rpz. %d 3600 3600 3600 1' % newSerial)
+ ]
response.answer = records
return (newSerial, response)
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
+log-rpz-changes=yes
""" % (_confdir, _wsPort, _wsPassword, _apiKey)
@classmethod
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(len(res.answer), 0)
- def checkNXD(self, qname, qtype):
+ def checkNXD(self, qname, qtype='A'):
query = dns.message.make_query(qname, qtype, want_dnssec=True)
query.flags |= dns.flags.CD
for method in ("sendUDPQuery", "sendTCPQuery"):
self.checkTruncated('tc.example.')
self.checkDropped('drop.example.')
+ # eighth zone, all entries should be gone
+ self.waitUntilCorrectSerialIsLoaded(8)
+ self.checkRPZStats(8, 0, 3, self._xfrDone)
+ self.checkNotBlocked('a.example.')
+ self.checkNotBlocked('b.example.')
+ self.checkNotBlocked('c.example.')
+ self.checkNotBlocked('d.example.')
+ self.checkNotBlocked('e.example.')
+ self.checkNXD('f.example.')
+ self.checkNXD('tc.example.')
+ self.checkNXD('drop.example.')
+
class RPZFileRecursorTest(RPZRecursorTest):
"""
This test makes sure that we correctly load RPZ zones from a file