zone "example.com" { type master; file "example.com"; };
"""
-# Prepare sqlite DB with a single zone.
-subprocess.check_call(["rm", "-f", SQLITE_DB])
-subprocess.check_call(["make", "-C", "../pdns", "zone2sql"])
-
-with open('../modules/gsqlite3backend/no-dnssec.schema.sqlite3.sql', 'r') as schema_file:
- subprocess.check_call(["sqlite3", SQLITE_DB], stdin=schema_file)
-with open('../modules/gsqlite3backend/dnssec.schema.sqlite3.sql', 'r') as schema_file:
- subprocess.check_call(["sqlite3", SQLITE_DB], stdin=schema_file)
-
-with open('named.conf', 'w') as named_conf:
- named_conf.write(NAMED_CONF_TPL)
-with tempfile.TemporaryFile() as tf:
- p = subprocess.Popen(["../pdns/zone2sql", "--transactions", "--gsqlite", "--named-conf=named.conf"], stdout=tf)
- p.communicate()
- if p.returncode != 0:
- raise Exception("zone2sql failed")
- tf.seek(0, os.SEEK_SET) # rewind
- subprocess.check_call(["sqlite3", SQLITE_DB], stdin=tf)
+daemon = (len(sys.argv) == 2) and sys.argv[1] or None
+if daemon not in ('authoritative', 'recursor'):
+ print "Usage: ./runtests (authoritative|recursor)"
+ sys.exit(2)
+
+daemon = sys.argv[1]
+
+if daemon == 'authoritative':
+
+ # Prepare sqlite DB with a single zone.
+ subprocess.check_call(["rm", "-f", SQLITE_DB])
+ subprocess.check_call(["make", "-C", "../pdns", "zone2sql"])
+
+ with open('../modules/gsqlite3backend/no-dnssec.schema.sqlite3.sql', 'r') as schema_file:
+ subprocess.check_call(["sqlite3", SQLITE_DB], stdin=schema_file)
+ with open('../modules/gsqlite3backend/dnssec.schema.sqlite3.sql', 'r') as schema_file:
+ subprocess.check_call(["sqlite3", SQLITE_DB], stdin=schema_file)
+
+ with open('named.conf', 'w') as named_conf:
+ named_conf.write(NAMED_CONF_TPL)
+ with tempfile.TemporaryFile() as tf:
+ p = subprocess.Popen(["../pdns/zone2sql", "--transactions", "--gsqlite", "--named-conf=named.conf"], stdout=tf)
+ p.communicate()
+ if p.returncode != 0:
+ raise Exception("zone2sql failed")
+ tf.seek(0, os.SEEK_SET) # rewind
+ subprocess.check_call(["sqlite3", SQLITE_DB], stdin=tf)
+
+ pdnscmd = ("../pdns/pdns_server --daemon=no --local-port=5300 --socket-dir=./ --no-shuffle --launch=gsqlite3 --gsqlite3-dnssec --send-root-referral --allow-2136-from=127.0.0.0/8 --experimental-rfc2136=yes --cache-ttl=0 --no-config --gsqlite3-database="+SQLITE_DB+" --experimental-json-interface=yes --webserver=yes --webserver-port="+WEBPORT+" --webserver-address=127.0.0.1 --query-logging --webserver-password="+WEBPASSWORD).split()
+
+else:
+
+ # No preparations for recursor
+ pdnscmd = ("../pdns/pdns_recursor --daemon=no --socket-dir=. --local-port=5555 --experimental-json-interface=yes --webserver=yes --webserver-port="+WEBPORT+" --webserver-address=127.0.0.1 --webserver-password="+WEBPASSWORD).split()
# Now run pdns and the tests.
-print "Launching pdns_server and running tests..."
-pdnsargs = ("--daemon=no --local-port=5300 --socket-dir=./ --no-shuffle --launch=gsqlite3 --gsqlite3-dnssec --send-root-referral --allow-2136-from=127.0.0.0/8 --experimental-rfc2136=yes --cache-ttl=0 --no-config --gsqlite3-database="+SQLITE_DB+" --experimental-json-interface=yes --webserver=yes --webserver-port="+WEBPORT+" --webserver-address=127.0.0.1 --query-logging --webserver-password="+WEBPASSWORD).split()
-pdns = subprocess.Popen(["../pdns/pdns_server"] + pdnsargs, close_fds=True)
+print "Launching pdns..."
+print ' '.join(pdnscmd)
+pdns = subprocess.Popen(pdnscmd, close_fds=True)
+print "Running tests..."
rc = 0
test_env = {}
test_env.update(os.environ)
-test_env.update({'WEBPORT': WEBPORT, 'WEBPASSWORD': WEBPASSWORD})
+test_env.update({'WEBPORT': WEBPORT, 'WEBPASSWORD': WEBPASSWORD, 'DAEMON': daemon})
try:
print ""
import unittest
import requests
-from test_helper import ApiTestCase
+from test_helper import ApiTestCase, isAuth, isRecursor
class Servers(ApiTestCase):
self.assertEquals(data['id'], 'localhost')
self.assertEquals(data['type'], 'Server')
# or 'recursor' for recursors
- self.assertEquals(data['daemon_type'], 'authoritative')
+ if isAuth():
+ daemon_type = 'authoritative'
+ elif isRecursor():
+ daemon_type = 'recursor'
+ self.assertEquals(data['daemon_type'], daemon_type)
def test_ReadConfig(self):
r = self.session.get(self.url("/servers/localhost/config"))
import urlparse
import unittest
+DAEMON = os.environ.get('DAEMON', 'authoritative')
+
class ApiTestCase(unittest.TestCase):
def setUp(self):
# TODO: config
- self.server_url = 'http://127.0.0.1:%s/' % (os.environ.get('WEBPORT','5580'))
+ self.server_url = 'http://127.0.0.1:%s/' % (os.environ.get('WEBPORT', '5580'))
self.session = requests.Session()
- self.session.auth = ('admin', os.environ.get('WEBPASSWORD','changeme'))
+ self.session.auth = ('admin', os.environ.get('WEBPASSWORD', 'changeme'))
def url(self, relative_url):
return urlparse.urljoin(self.server_url, relative_url)
def unique_zone_name():
return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '.org'
+
+
+def isAuth():
+ return (DAEMON == 'authoritative')
+
+
+def isRecursor():
+ return (DAEMON == 'recursor')