import sys
import time
-from ido_mysql import run_query
+import utils
STATE_OK = 0
def main():
+ run_query = lambda q: utils.run_mysql_query(q, b'/usr/bin/mysql')
+
# We need to wait a bit first as Icinga processes a
# checkresult only if its newer than the last check
query = 'select unix_timestamp(s.last_check) as last_check ' \
from datetime import datetime, timedelta
-
CHECK_INTERVAL = 10 # minutes; The actual interval are 5 minutes but as other
# tests might restart Icinga we need to take any
# rescheduling into account
--- /dev/null
+from __future__ import unicode_literals
+
+import os
+import subprocess
+
+__all__ = ['parse_statusdata', 'run_mysql_query', 'run_pgsql_query']
+
+
+MYSQL_PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
+MYSQL_SEPARATOR = '|'
+
+PGSQL_PARAMS = b"-nq -U icinga -d icinga -c".split()
+PGSQL_SEPARATOR = '|'
+PGSQL_ENVIRONMENT = {
+ b'PGPASSWORD': b'icinga'
+ }
+
+
+def parse_statusdata(data, intelligent_cast=True):
+ parsed_data, data_type, type_data = {}, '', {}
+ for line in (l for l in data.split(os.linesep)
+ if l and not l.startswith('#')):
+ if '{' in line:
+ data_type = line.partition('{')[0].strip()
+ elif '}' in line:
+ parsed_data.setdefault(data_type, []).append(type_data)
+ else:
+ key, _, value = line.partition('=')
+
+ if intelligent_cast:
+ value = _cast_status_value(value)
+
+ type_data[key.strip()] = value
+
+ return parsed_data
+
+
+def _cast_status_value(value):
+ try:
+ return int(value)
+ except ValueError:
+ try:
+ return float(value)
+ except ValueError:
+ return value
+
+
+def run_mysql_query(query, path):
+ p = subprocess.Popen([path] + MYSQL_PARAMS + [query.encode('utf-8')],
+ stdout=subprocess.PIPE)
+ return _parse_mysql_result([l.decode('utf-8') for l in p.stdout.readlines()])
+
+
+def _parse_mysql_result(resultset):
+ result, header = [], None
+ for line in (l for l in resultset if MYSQL_SEPARATOR in l):
+ columns = [c.strip() for c in line[1:-3].split(MYSQL_SEPARATOR)]
+ if header is None:
+ header = columns
+ else:
+ result.append(dict((header[i], v) for i, v in enumerate(columns)))
+ return result
+
+
+def run_pgsql_query(query, path):
+ p = subprocess.Popen([path] + PGSQL_PARAMS + [query.encode('utf-8')],
+ stdout=subprocess.PIPE, env=PGSQL_ENVIRONMENT)
+ return _parse_pgsql_result([l.decode('utf-8') for l in p.stdout.readlines()])
+
+
+def _parse_pgsql_result(resultset):
+ result, header = [], None
+ for line in (l for l in resultset if PGSQL_SEPARATOR in l):
+ columns = [c.strip() for c in line.split(PGSQL_SEPARATOR)]
+ if header is None:
+ header = columns
+ else:
+ result.append(dict((header[i], v) for i, v in enumerate(columns)))
+ return result
+
from __future__ import unicode_literals
import sys
-import subprocess
-try:
- import ido_tests
- IDO_TESTS_FOUND = True
-except ImportError:
- IDO_TESTS_FOUND = False
-
-
-MYSQL = b"/usr/bin/mysql"
-PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
-SEPARATOR = '|'
-
-
-def run_query(query):
- p = subprocess.Popen([MYSQL] + PARAMS + [query.encode('utf-8')],
- stdout=subprocess.PIPE)
- return parse_result([l.decode('utf-8') for l in p.stdout.readlines()])
-
-
-def parse_result(resultset):
- result, header = [], None
- for line in (l for l in resultset if SEPARATOR in l):
- columns = [c.strip() for c in line[1:-3].split(SEPARATOR)]
- if header is None:
- header = columns
- else:
- result.append(dict((header[i], v) for i, v in enumerate(columns)))
- return result
+import utils
+import ido_tests
def main():
+ run_query = lambda q: utils.run_mysql_query(q, b'/usr/bin/mysql')
+
if not ido_tests.validate_tables([d['Tables_in_icinga']
for d in run_query('show tables')]):
return 1
if __name__ == '__main__':
- if not IDO_TESTS_FOUND:
- raise
-
sys.exit(main())
from __future__ import unicode_literals
import sys
-import subprocess
-try:
- import ido_tests
- IDO_TESTS_FOUND = True
-except ImportError:
- IDO_TESTS_FOUND = False
-
-
-PSQL = b"/usr/bin/psql"
-PARAMS = b"-nq -U icinga -d icinga -c".split()
-SEPARATOR = '|'
-ENVIRONMENT = {
- b'PGPASSWORD': b'icinga'
- }
-
-
-def run_query(query):
- p = subprocess.Popen([PSQL] + PARAMS + [query.encode('utf-8')],
- stdout=subprocess.PIPE, env=ENVIRONMENT)
- return parse_result([l.decode('utf-8') for l in p.stdout.readlines()])
-
-
-def parse_result(resultset):
- result, header = [], None
- for line in (l for l in resultset if SEPARATOR in l):
- columns = [c.strip() for c in line.split(SEPARATOR)]
- if header is None:
- header = columns
- else:
- result.append(dict((header[i], v) for i, v in enumerate(columns)))
- return result
+import utils
+import ido_tests
def main():
+ run_query = lambda q: utils.run_pgsql_query(q, b'/usr/bin/psql')
+
if not ido_tests.validate_tables([d['Name'] for d in run_query('\\dt')
if d['Type'] == 'table']):
return 1
if __name__ == '__main__':
- if not IDO_TESTS_FOUND:
- raise
-
sys.exit(main())
"setups": {
"^ido_[a-z]{2}sql.test$": {
"setup": {
- "copy": ["files/ido_tests.py >> /tmp/ido_tests.py"]
+ "copy": [
+ "files/ido_tests.py >> /tmp/ido_tests.py",
+ "files/utils.py >> /tmp/utils.py"
+ ]
},
"teardown": {
- "clean": ["/tmp/ido_tests.py", "/tmp/ido_tests.pyc"]
+ "clean": [
+ "/tmp/ido_tests.py*",
+ "/tmp/utils.py*"
+ ]
}
},
"checkresult.test": {
"copy": [
"files/configs/checkresult.conf >> /tmp/checkresult.conf",
"files/wait_for_ido.sh >> /tmp/wait_for_ido.sh",
- "ido_mysql.test >> /tmp/ido_mysql.py"
+ "files/utils.py >> /tmp/utils.py"
],
"exec": [
"sudo mv /tmp/checkresult.conf /etc/icinga2/conf.d/",
]
},
"teardown": {
- "clean": ["/tmp/ido_mysql.py*"],
+ "clean": ["/tmp/utils.py*"],
"exec": [
"sudo rm /etc/icinga2/conf.d/checkresult.conf",
"sudo service icinga2 restart",