From: Chris Hofstaedtler Date: Tue, 6 Mar 2018 07:40:53 +0000 (+0100) Subject: api tests: fix up for py3k and other cleanup X-Git-Tag: dnsdist-1.3.0~62^2~2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=541bb91be67d81ce3cd4475c05c8edd3e87028ef;p=pdns api tests: fix up for py3k and other cleanup --- diff --git a/regression-tests.api/runtests.py b/regression-tests.api/runtests.py index 8332fa465..9d2fb6c60 100755 --- a/regression-tests.api/runtests.py +++ b/regression-tests.api/runtests.py @@ -2,6 +2,7 @@ # # Shell-script style. +from __future__ import print_function import os import requests import shutil @@ -70,7 +71,7 @@ def format_call_args(cmd): def run_check_call(cmd, *args, **kwargs): - print format_call_args(cmd) + print(format_call_args(cmd)) subprocess.check_call(cmd, *args, **kwargs) @@ -86,7 +87,7 @@ tests = [opt.split('=', 1)[1] for opt in tests] daemon = (len(sys.argv) == 2) and sys.argv[1] or None if daemon not in ('authoritative', 'recursor'): - print "Usage: ./runtests (authoritative|recursor)" + print("Usage: ./runtests (authoritative|recursor)") sys.exit(2) daemon = sys.argv[1] @@ -151,11 +152,11 @@ else: # Now run pdns and the tests. -print "Launching server..." -print format_call_args(servercmd) +print("Launching server...") +print(format_call_args(servercmd)) serverproc = subprocess.Popen(servercmd, close_fds=True) -print "Waiting for webserver port to become available..." +print("Waiting for webserver port to become available...") available = False for try_number in range(0, 10): try: @@ -166,15 +167,15 @@ for try_number in range(0, 10): time.sleep(0.5) if not available: - print "Webserver port not reachable after 10 tries, giving up." + print("Webserver port not reachable after 10 tries, giving up.") serverproc.terminate() serverproc.wait() sys.exit(2) -print "Query for example.com/A to create statistic data..." +print("Query for example.com/A to create statistic data...") run_check_call([sdig, "127.0.0.1", str(DNSPORT), "example.com", "A"]) -print "Running tests..." +print("Running tests...") returncode = 0 test_env = {} test_env.update(os.environ) @@ -187,13 +188,13 @@ test_env.update({ }) try: - print "" + print("") run_check_call(["nosetests", "--with-xunit", "-v"] + tests, env=test_env) except subprocess.CalledProcessError as ex: returncode = ex.returncode finally: if wait: - print "Waiting as requested, press ENTER to stop." + print("Waiting as requested, press ENTER to stop.") raw_input() serverproc.terminate() serverproc.wait() diff --git a/regression-tests.api/test_Basics.py b/regression-tests.api/test_Basics.py index a3d4d8136..eca56505d 100644 --- a/regression-tests.api/test_Basics.py +++ b/regression-tests.api/test_Basics.py @@ -20,16 +20,16 @@ class TestBasics(ApiTestCase): print("Sending request") for part in parts: print("Sending %s" % part) - s.sendall(part) + s.sendall(part.encode('ascii')) time.sleep(0.5) resp = s.recv(4096, socket.MSG_WAITALL) s.close() - print "response", repr(resp) + print("response", repr(resp)) status = resp.splitlines(0)[0] - if '400' in status: + if b'400' in status: raise Exception('Got unwanted response: %s' % status) def test_cors(self): @@ -41,4 +41,4 @@ class TestBasics(ApiTestCase): self.assertEquals(r.headers['access-control-allow-headers'], 'Content-Type, X-API-Key') self.assertEquals(r.headers['access-control-allow-methods'], 'GET, POST, PUT, PATCH, DELETE, OPTIONS') - print "response", repr(r.headers) + print("response", repr(r.headers)) diff --git a/regression-tests.api/test_Servers.py b/regression-tests.api/test_Servers.py index 10bd99d88..400a6e47f 100644 --- a/regression-tests.api/test_Servers.py +++ b/regression-tests.api/test_Servers.py @@ -42,7 +42,7 @@ class Servers(ApiTestCase): data = r.json() self.assertIn('uptime', [e['name'] for e in data]) if is_auth(): - print data + print(data) qtype_stats, respsize_stats, queries_stats = None, None, None for elem in data: if elem['type'] == 'MapStatisticItem' and elem['name'] == 'queries-by-qtype': diff --git a/regression-tests.api/test_Zones.py b/regression-tests.api/test_Zones.py index ece4a0df2..6a456d901 100644 --- a/regression-tests.api/test_Zones.py +++ b/regression-tests.api/test_Zones.py @@ -1,3 +1,4 @@ +from __future__ import print_function import json import time import unittest @@ -23,23 +24,23 @@ def get_first_rec(data, qname, qtype): def eq_zone_rrsets(rrsets, expected): data_got = {} data_expected = {} - for type_, expected_records in expected.iteritems(): + for type_, expected_records in expected.items(): type_ = str(type_) data_got[type_] = set() data_expected[type_] = set() uses_name = any(['name' in expected_record for expected_record in expected_records]) # minify + convert received data for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]: - print rrset + print(rrset) for r in rrset['records']: data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content'])) # minify expected data for r in expected_records: data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content'])) - print "eq_zone_rrsets: got:" + print("eq_zone_rrsets: got:") pprint(data_got) - print "eq_zone_rrsets: expected:" + print("eq_zone_rrsets: expected:") pprint(data_expected) assert data_got == data_expected, "%r != %r" % (data_got, data_expected) @@ -79,7 +80,7 @@ class AuthZonesHelperMixin(object): del payload[k] else: payload[k] = v - print "sending", payload + print("sending", payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -87,7 +88,7 @@ class AuthZonesHelperMixin(object): self.assert_success_json(r) self.assertEquals(r.status_code, 201) reply = r.json() - print "reply", reply + print("reply", reply) return name, payload, reply @@ -120,7 +121,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): if k in payload: self.assertEquals(data[k], payload[k]) # generated EPOCH serial surely is > fixed serial we passed in - print data + print(data) self.assertGreater(data['serial'], payload['serial']) soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2]) self.assertGreater(soa_serial, payload['serial']) @@ -129,7 +130,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): def test_create_zone_with_account(self): # soa_edit_api wins over serial name, payload, data = self.create_zone(account='anaccount', serial=10) - print data + print(data) for k in ('account', ): self.assertIn(k, data) if k in payload: @@ -137,12 +138,12 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): def test_create_zone_default_soa_edit_api(self): name, payload, data = self.create_zone() - print data + print(data) self.assertEquals(data['soa_edit_api'], 'DEFAULT') def test_create_zone_with_soa_edit(self): name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE') - print data + print(data) self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT') self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE') soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2] @@ -222,7 +223,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'kind': 'Native', 'nameservers': ['uncanon.example.com'] } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -236,7 +237,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'name': '', 'kind': 'Native', } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -268,7 +269,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'kind': 'Native', 'nameservers': ['ns1.example.com.'] } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -283,7 +284,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'kind': 'Native', 'nameservers': ['ns1.example.com'] } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -308,7 +309,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'nameservers': ['ns1.example.com.'], 'rrsets': [rrset], } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -333,7 +334,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'nameservers': ['ns1.example.com.'], 'rrsets': [rrset], } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -360,7 +361,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): 'kind': 'Native', 'nameservers': [{'a': 'ns1.example.com'}] # invalid } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -385,7 +386,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): keys = r.json() - print keys + print(keys) self.assertEquals(r.status_code, 200) self.assertEquals(len(keys), 1) @@ -436,7 +437,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): data = r.json() - print data + print(data) self.assertEquals(r.status_code, 200) self.assertEquals(len(data['metadata']), 1) @@ -463,7 +464,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): data = r.json() - print data + print(data) self.assertEquals(r.status_code, 200) self.assertEquals(len(data['metadata']), 1) @@ -540,17 +541,17 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): for k in ('name', 'masters', 'kind'): self.assertIn(k, data) self.assertEquals(data[k], payload[k]) - print "payload:", payload - print "data:", data + print("payload:", payload) + print("data:", data) # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list. r = self.session.get(self.url("/api/v1/servers/localhost/zones")) zonelist = r.json() - print "zonelist:", zonelist + print("zonelist:", zonelist) self.assertIn(payload['name'], [zone['name'] for zone in zonelist]) # Also test that fetching the zone works. r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])) data = r.json() - print "zone (fetched):", data + print("zone (fetched):", data) for k in ('name', 'masters', 'kind'): self.assertIn(k, data) self.assertEquals(data[k], payload[k]) @@ -564,21 +565,21 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): def test_retrieve_slave_zone(self): name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2']) - print "payload:", payload - print "data:", data + print("payload:", payload) + print("data:", data) r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve")) data = r.json() - print "status for axfr-retrieve:", data + print("status for axfr-retrieve:", data) self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] + '\' from master 127.0.0.2') def test_notify_master_zone(self): name, payload, data = self.create_zone(kind='Master') - print "payload:", payload - print "data:", data + print("payload:", payload) + print("data:", data) r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify")) data = r.json() - print "status for notify:", data + print("status for notify:", data) self.assertEqual(data['result'], 'Notification queued') def test_get_zone_with_symbols(self): @@ -1172,7 +1173,7 @@ fred IN A 192.168.0.4 self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload), headers={'content-type': 'application/json'}) - print r.content + print(r.content) self.assert_success(r) # succeed so users can fix their wrong, old data def test_zone_delete(self): @@ -1209,7 +1210,7 @@ fred IN A 192.168.0.4 # records are still present data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() serverset = get_rrset(data, name, 'NS') - print serverset + print(serverset) self.assertNotEquals(serverset['records'], []) self.assertNotEquals(serverset['comments'], []) # verify that modified_at has been set by pdns @@ -1235,7 +1236,7 @@ fred IN A 192.168.0.4 # make sure the NS records are still present data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() serverset = get_rrset(data, name, 'NS') - print serverset + print(serverset) self.assertNotEquals(serverset['records'], []) self.assertEquals(serverset['comments'], []) @@ -1283,7 +1284,7 @@ fred IN A 192.168.0.4 # make sure the comments still exist data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() serverset = get_rrset(data, name, 'NS') - print serverset + print(serverset) self.assertEquals(serverset['records'], rrset2['records']) self.assertEquals(serverset['comments'], rrset['comments']) @@ -1306,7 +1307,7 @@ fred IN A 192.168.0.4 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records']) r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json() revsets = [s for s in r['rrsets'] if s['type'] == 'PTR'] - print revsets + print(revsets) self.assertEquals(revsets, [{ u'name': u'44.4.2.192.in-addr.arpa.', u'ttl': 3600, @@ -1345,7 +1346,7 @@ fred IN A 192.168.0.4 self.assert_success(r) r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json() revsets = [s for s in r['rrsets'] if s['type'] == 'PTR'] - print revsets + print(revsets) self.assertEquals(revsets, [{ u'name': u'2.0.2.192.in-addr.arpa.', u'ttl': 3600, @@ -1385,7 +1386,7 @@ fred IN A 192.168.0.4 self.assert_success(r) r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json() revsets = [s for s in r['rrsets'] if s['type'] == 'PTR'] - print revsets + print(revsets) self.assertEquals(revsets, [{ u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.', u'ttl': 3600, @@ -1404,7 +1405,7 @@ fred IN A 192.168.0.4 self.create_zone(name=name, serial=22, soa_edit_api='') r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.'))) self.assert_success_json(r) - print r.json() + print(r.json()) self.assertEquals(r.json(), [ {u'object_type': u'zone', u'name': name, u'zone_id': name}, {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600', @@ -1419,25 +1420,27 @@ fred IN A 192.168.0.4 ]) def test_search_rr_substring(self): - name = 'search-rr-zone.name.' + name = unique_zone_name() + search = name[5:-5] self.create_zone(name=name) - r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-zone*")) + r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search)) self.assert_success_json(r) - print r.json() + print(r.json()) # should return zone, SOA, ns1, ns2 self.assertEquals(len(r.json()), 4) def test_search_rr_case_insensitive(self): - name = 'search-rr-insenszone.name.' + name = unique_zone_name()+'testsuffix.' self.create_zone(name=name) - r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-insensZONE*")) + r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*")) self.assert_success_json(r) - print r.json() + print(r.json()) # should return zone, SOA, ns1, ns2 self.assertEquals(len(r.json()), 4) def test_search_after_rectify_with_ent(self): - name = 'search-rectified.name.' + name = unique_zone_name() + search = name.split('.')[0] rrset = { "name": 'sub.sub.' + name, "type": "A", @@ -1449,9 +1452,9 @@ fred IN A 192.168.0.4 } self.create_zone(name=name, rrsets=[rrset]) pdnsutil_rectify(name) - r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*search-rectified*")) + r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search)) self.assert_success_json(r) - print r.json() + print(r.json()) # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT) self.assertEquals(len(r.json()), 5) @@ -1466,7 +1469,7 @@ fred IN A 192.168.0.4 self.url("/api/v1/servers/localhost/zones?rrsets=false"), data=json.dumps(payload), headers={'content-type': 'application/json'}) - print r.json() + print(r.json()) self.assert_success_json(r) self.assertEquals(r.status_code, 201) self.assertEquals(r.json().get('rrsets'), None) @@ -1476,7 +1479,7 @@ fred IN A 192.168.0.4 self.create_zone(name=name, kind='Native') r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false")) self.assert_success_json(r) - print r.json() + print(r.json()) self.assertEquals(r.json().get('rrsets'), None) def test_rrset_true_parameter(self): @@ -1484,7 +1487,7 @@ fred IN A 192.168.0.4 self.create_zone(name=name, kind='Native') r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true")) self.assert_success_json(r) - print r.json() + print(r.json()) self.assertEquals(len(r.json().get('rrsets')), 2) def test_wrong_rrset_parameter(self): @@ -1518,13 +1521,13 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin): ) # Regression test: verify zone list works zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json() - print "zonelist:", zonelist + print("zonelist:", zonelist) self.assertIn(payload['name'], [zone['name'] for zone in zonelist]) # Also test that fetching the zone works. - print "id:", data['id'] + print("id:", data['id']) self.assertEquals(data['id'], '=2E') data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json() - print "zone (fetched):", data + print("zone (fetched):", data) for k in ('name', 'kind'): self.assertIn(k, data) self.assertEquals(data[k], payload[k]) @@ -1599,7 +1602,7 @@ class RecursorZones(ApiTestCase): 'servers': ['8.8.8.8'], 'recursion_desired': False, } - print payload + print(payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones"), data=json.dumps(payload), @@ -1658,7 +1661,7 @@ class RecursorZones(ApiTestCase): self.create_zone(name=name, kind='Native') r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name)) self.assert_success_json(r) - print r.json() + print(r.json()) self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}]) def test_search_rr_substring(self): @@ -1666,7 +1669,7 @@ class RecursorZones(ApiTestCase): self.create_zone(name=name, kind='Native') r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone")) self.assert_success_json(r) - print r.json() + print(r.json()) # should return zone, SOA self.assertEquals(len(r.json()), 2) diff --git a/regression-tests.api/test_cryptokeys.py b/regression-tests.api/test_cryptokeys.py index 49c4b400d..e2ce4de8b 100644 --- a/regression-tests.api/test_cryptokeys.py +++ b/regression-tests.api/test_cryptokeys.py @@ -3,33 +3,39 @@ import json import unittest import os -from test_helper import ApiTestCase, is_auth +from test_helper import ApiTestCase, is_auth, pdnsutil, unique_zone_name @unittest.skipIf(not is_auth(), "Not applicable") class Cryptokeys(ApiTestCase): - def __init__(self, *args, **kwds): - super(Cryptokeys, self).__init__(*args, **kwds) + def setUp(self): + super(Cryptokeys, self).setUp() self.keyid = 0 - self.zone = "cryptokeys.org" + self.zone = unique_zone_name() + self.zone_nodot = self.zone[:-1] + payload = { + 'name': self.zone, + 'kind': 'Native', + 'nameservers': ['ns1.example.com.', 'ns2.example.com.'] + } + r = self.session.post( + self.url("/api/v1/servers/localhost/zones"), + data=json.dumps(payload), + headers={'content-type': 'application/json'}) + self.assert_success_json(r) + self.assertEquals(r.status_code, 201) def tearDown(self): - super(Cryptokeys,self).tearDown() + super(Cryptokeys, self).tearDown() self.remove_zone_key(self.keyid) # Adding a key to self.zone using the pdnsutil command def add_zone_key(self, status='inactive'): - try: - return subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "add-zone-key", self.zone, "ksk", status], stderr=open(os.devnull, 'wb')) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil add-zone-key failed: "+e.output) + return pdnsutil("add-zone-key", self.zone_nodot, "ksk", status) # Removes a key from self.zone by id using the pdnsutil command def remove_zone_key(self, key_id): - try: - subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "remove-zone-key", self.zone, str(key_id)]) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil remove-zone-key failed: "+e.output) + return pdnsutil("remove-zone-key", self.zone_nodot, str(key_id)) # This method tests the DELETE api call. def test_delete(self): @@ -38,14 +44,11 @@ class Cryptokeys(ApiTestCase): #checks the status code. I don't know how to test explicit that the backend fail removing a key. r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid)) self.assertEquals(r.status_code, 200) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # Check that the key is actually deleted - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone]) - self.assertNotIn(self.zone, out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil list-keys failed: " + e.output) + out = pdnsutil("list-keys", self.zone) + self.assertNotIn(self.zone, out) def test_get_wrong_zone(self): self.keyid = self.add_zone_key() @@ -64,25 +67,21 @@ class Cryptokeys(ApiTestCase): #checks for key is gone. Its ok even if no key had to be deleted. Or something went wrong with the backend. r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid)) self.assertEquals(r.status_code, 200) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # Prepares the json object for Post and sends it to the server - def add_key(self, content='', type='ksk', active='true' , algo='', bits=0): - if algo == '': - payload = { - 'keytype': type, - 'active' : active - } - else: - payload = { - 'keytype': type, - 'active' : active, - 'algorithm' : algo - } - if bits > 0: + def add_key(self, content='', type='ksk', active='true', algo='', bits=None): + payload = { + 'keytype': type, + 'active': active, + } + if algo: + payload['algorithm'] = algo + if bits is not None: payload['bits'] = bits if content != '': payload['content'] = content + print("create key with payload:", payload) r = self.session.post( self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys"), data=json.dumps(payload), @@ -91,7 +90,7 @@ class Cryptokeys(ApiTestCase): return r # Test POST for a positive result and delete the added key - def post_helper(self,content='', algo='', bits=0): + def post_helper(self, content='', algo='', bits=None): r = self.add_key(content=content, algo=algo, bits=bits) self.assert_success_json(r) self.assertEquals(r.status_code, 201) @@ -100,11 +99,8 @@ class Cryptokeys(ApiTestCase): self.assertEquals(response['keytype'], 'csk') self.keyid = response['id'] # Check if the key is actually added - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone]) - self.assertIn(self.zone, out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil list-keys failed: " + e.output) + out = pdnsutil("list-keys", self.zone_nodot) + self.assertIn(self.zone_nodot, out) # Test POST to add a key with default algorithm def test_post(self): @@ -198,17 +194,14 @@ class Cryptokeys(ApiTestCase): data=json.dumps(payload), headers={'content-type': 'application/json'}) self.assertEquals(r.status_code, 204) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # check if key is activated - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone]) - self.assertIn("Active", out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil show-zone failed: " + e.output) + out = pdnsutil("show-zone", self.zone_nodot) + self.assertIn("Active", out) def test_put_deactivate_key(self): - self.keyid= self.add_zone_key(status='active') + self.keyid = self.add_zone_key(status='active') # deactivate key payload2 = { 'active': False @@ -219,14 +212,11 @@ class Cryptokeys(ApiTestCase): data=json.dumps(payload2), headers={'content-type': 'application/json'}) self.assertEquals(r.status_code, 204) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # check if key is deactivated - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone]) - self.assertIn("Inactive", out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil show-zone failed: " + e.output) + out = pdnsutil("show-zone", self.zone_nodot) + self.assertIn("Inactive", out) def test_put_deactivate_inactive_key(self): self.keyid = self.add_zone_key() @@ -241,14 +231,11 @@ class Cryptokeys(ApiTestCase): data=json.dumps(payload), headers={'content-type': 'application/json'}) self.assertEquals(r.status_code, 204) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # check if key is still deactivated - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone]) - self.assertIn("Inactive", out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil show-zone failed: " + e.output) + out = pdnsutil("show-zone", self.zone_nodot) + self.assertIn("Inactive", out) def test_put_activate_active_key(self): self.keyid =self.add_zone_key(status='active') @@ -262,11 +249,8 @@ class Cryptokeys(ApiTestCase): data=json.dumps(payload2), headers={'content-type': 'application/json'}) self.assertEquals(r.status_code, 204) - self.assertEquals(r.content, "") + self.assertEquals(r.content, b"") # check if key is activated - try: - out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone]) - self.assertIn("Active", out) - except subprocess.CalledProcessError as e: - self.fail("pdnsutil show-zone failed: " + e.output) + out = pdnsutil("show-zone", self.zone_nodot) + self.assertIn("Active", out) diff --git a/regression-tests.api/test_helper.py b/regression-tests.api/test_helper.py index 14c2ea4c2..8c1dfd375 100644 --- a/regression-tests.api/test_helper.py +++ b/regression-tests.api/test_helper.py @@ -1,10 +1,16 @@ from datetime import datetime import os import requests -import urlparse import unittest import sqlite3 import subprocess +import sys + +if sys.version_info[0] == 2: + from urlparse import urljoin +else: + from urllib.parse import urljoin + DAEMON = os.environ.get('DAEMON', 'authoritative') PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ') @@ -22,13 +28,13 @@ class ApiTestCase(unittest.TestCase): self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)} def url(self, relative_url): - return urlparse.urljoin(self.server_url, relative_url) + return urljoin(self.server_url, relative_url) def assert_success_json(self, result): try: result.raise_for_status() except: - print result.content + print(result.content) raise self.assertEquals(result.headers['Content-Type'], 'application/json') @@ -40,7 +46,7 @@ class ApiTestCase(unittest.TestCase): try: result.raise_for_status() except: - print result.content + print(result.content) raise @@ -70,10 +76,17 @@ def get_db_records(zonename, qtype): SELECT id FROM domains WHERE name = ? )""", (qtype, zonename.rstrip('.'))).fetchall() recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3]} for row in rows] - print "DB Records:", recs + print("DB Records:", recs) return recs +def pdnsutil(subcommand, *args): + try: + return subprocess.check_output(PDNSUTIL_CMD + [subcommand] + list(args), close_fds=True).decode('ascii') + except subprocess.CalledProcessError as except_inst: + raise RuntimeError("pdnsutil %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace'))) + + def pdnsutil_rectify(zonename): """Run pdnsutil rectify-zone on the given zone.""" - subprocess.check_call(PDNSUTIL_CMD + ['rectify-zone', zonename]) + pdnsutil('rectify-zone', zonename)