--- /dev/null
+#!/usr/bin/env python
+from __future__ import unicode_literals
+
+import sys
+import time
+import random
+
+import utils
+
+
+USERNAME = 'Icinga 2 Admin'
+
+
+def send_command(command):
+ return LIVESTATUS.query('COMMAND [{0}] {1}'.format(int(time.time()), command))
+
+
+def test_host_comments(hostname):
+ print repr(send_command('ADD_HOST_COMMENT;'+hostname+';0;'+USERNAME+';test'))
+ return True
+
+
+def test_service_comments(hostname, servicename):
+ pass
+
+
+def main():
+ failure = False
+
+ # Check whether host comments are properly processed
+ if not test_host_comments('localhost') or not test_host_comments('nsca-ng'):
+ failure = True
+
+ return 1 if failure else 0
+
+
+if __name__ == '__main__':
+ with utils.LiveStatusSocket('/var/run/icinga2/cmd/livestatus') as LIVESTATUS:
+ sys.exit(main())
+
from __future__ import unicode_literals
import os
+import json
+import socket
import subprocess
-__all__ = ['parse_statusdata', 'run_mysql_query', 'run_pgsql_query']
+__all__ = ['parse_statusdata', 'run_mysql_query', 'run_pgsql_query',
+ 'LiveStatusSocket']
MYSQL_PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
result.append(dict((header[i], v) for i, v in enumerate(columns)))
return result
+
+class LiveStatusSocket(object):
+ options = [
+ 'OutputFormat: json',
+ 'KeepAlive: on',
+ 'ResponseHeader: fixed16'
+ ]
+
+ def __init__(self, path):
+ self.path = path
+
+ def __enter__(self):
+ self.connect()
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.close()
+
+ def connect(self):
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.sock.connect(self.path)
+
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+
+ def query(self, command):
+ full_command = '\n'.join([command] + self.options)
+ self.send(full_command + '\n')
+ return self.recv()
+
+ def send(self, query):
+ print repr(query)
+ self.sock.sendall(query.encode('utf-8'))
+
+ def recv(self):
+ response_header = self.sock.recv(16)
+ response_code = int(response_header[:3])
+ response_length = int(response_header[3:].strip())
+ return json.loads(self.sock.recv(response_length).decode('utf-8'))
+
"/tmp/wait_for_ido.sh pgsql && rm /tmp/wait_for_ido.sh"
]
}
+ },
+ "external_commands.test": {
+ "setup": {
+ "copy": ["files/utils.py >> /tmp/utils.py"]
+ },
+ "teardown": {
+ "clean": ["/tmp/utils.py*"],
+ "exec": ["sudo service icinga2 restart"]
+ }
}
}
}