--- /dev/null
+#!/usr/bin/env python
+from __future__ import unicode_literals
+
+import os
+import sys
+import time
+
+import utils
+
+
+LS_HOST_COLUMNS = [
+ 'name',
+ 'name',
+ 'display_name',
+ 'display_name',
+ None,
+ 'state',
+ 'state_type',
+ 'current_attempt',
+ 'max_check_attempts',
+ None,
+ 'last_state',
+ None,
+ 'last_state_change',
+ None,
+ 'latency',
+ 'execution_time',
+ 'plugin_output',
+ None,
+ 'last_check',
+ 'address',
+ 'address6'
+ ]
+
+LS_SVC_COLUMNS = [
+ 'description',
+ 'display_name',
+ 'display_name',
+ None,
+ 'state',
+ 'state_type',
+ 'current_attempt',
+ 'max_check_attempts',
+ None,
+ 'last_state',
+ None,
+ 'last_state_change',
+ None,
+ 'latency',
+ 'execution_time',
+ 'plugin_output',
+ 'perf_data',
+ 'last_check',
+ 'host_num_services',
+ 'host_num_services_ok',
+ 'host_num_services_warn',
+ 'host_num_services_unknown',
+ 'host_num_services_crit'
+ ]
+
+STATE_MAP = {
+ 'SOFT': 0,
+ 'HARD': 1
+ }
+
+
+def send_command(command):
+ try:
+ return send_query('COMMAND [{0}] {1}'.format(int(time.time()), command))
+ except utils.LiveStatusError, error:
+ sys.stderr.write('Failed to execute command: {0}\n\n{1}\n'
+ ''.format(command, error))
+
+
+def send_query(query):
+ response = LIVESTATUS.query(query + '\nColumnHeaders: on')
+ if response:
+ header, result = response.pop(0), {}
+ return [dict((header[i], v) for i, v in enumerate(r)) for r in response]
+ return []
+
+
+def get_one(query):
+ return next(iter(send_query(query)), {})
+
+
+def get_event_output():
+ try:
+ with open('/tmp/test_event.out') as f:
+ remove = True
+ return f.read().rstrip().split('|')
+ except (IOError, OSError):
+ remove = False
+ finally:
+ if remove:
+ os.system('sudo rm -f /tmp/test_event.out')
+
+
+def convert_output(value):
+ try:
+ return int(value)
+ except ValueError:
+ try:
+ return float(value)
+ except ValueError:
+ return STATE_MAP.get(value, value)
+
+
+def success(msg):
+ print '[OK] {0}'.format(msg).encode('utf-8')
+
+
+def fail(msg):
+ print '[FAIL] {0}'.format(msg).encode('utf-8')
+
+
+def info(msg):
+ print '[INFO] {0}'.format(msg).encode('utf-8')
+
+
+def main():
+ send_command('CHANGE_HOST_EVENT_HANDLER;localhost;test_event')
+ host_info = get_one('GET hosts\nFilter: name = localhost'
+ '\nColumns: event_handler')
+ if host_info.get('event_handler') != 'test_event':
+ fail('Could not assign eventcommand "test_event" to host "localhost"')
+ return 1
+ success('Successfully assigned an eventcommand to host "localhost"')
+
+ send_command('PROCESS_HOST_CHECK_RESULT;localhost;1;A negative result to'
+ ' trigger an eventhandler|some interesting perfdata!')
+ event_output = get_event_output()
+ if not event_output:
+ send_command('CHANGE_HOST_EVENT_HANDLER;localhost;')
+ fail('Could not trigger the eventcommand')
+ return 1
+ success('Successfully triggered the eventcommand')
+ failure = False
+
+ info('Checking host macros...')
+ host_info = get_one('GET hosts\nFilter: name = localhost\nColumns: {0}'
+ ''.format(' '.join(c for c in LS_HOST_COLUMNS if c)))
+ if event_output[0] != host_info['name']*2:
+ failure = True
+ fail('Escaping environment variables seems not to properly working')
+ fail(' Expected: {0!r} Got: {1!r}'.format(host_info['name']*2,
+ event_output[0]))
+ for i, column in enumerate(LS_HOST_COLUMNS[1:], 1):
+ if column is not None:
+ output_value = convert_output(event_output[i])
+ if output_value != host_info[column]:
+ failure = True
+ fail('{0!r} is unequal to {1!r} ({2})'.format(output_value,
+ host_info[column],
+ column))
+
+ info('Checking service macros...')
+ svc_info = get_one('GET services\nFilter: description = ping4\nColumns: {0}'
+ ''.format(' '.join(c for c in LS_SVC_COLUMNS if c)))
+ for i, column in enumerate(LS_SVC_COLUMNS, len(LS_HOST_COLUMNS)):
+ if column is not None:
+ output_value = convert_output(event_output[i])
+ if output_value != svc_info[column]:
+ failure = True
+ fail('{0!r} is unequal to {1!r} ({2})'.format(output_value,
+ svc_info[column],
+ column))
+
+ info('Checking global macros...')
+
+ info('Checking command macros...')
+ if convert_output(event_output[-1]) != 1337:
+ failure = True
+ fail('The command macro "custom_macro" is not being substituted')
+
+ send_command('DISABLE_HOST_EVENT_HANDLER;localhost')
+ send_command('PROCESS_HOST_CHECK_RESULT;localhost;0;A positive result that'
+ ' should not trigger an eventhandler')
+ if get_event_output():
+ failure = True
+ fail('Could not disable the eventcommand')
+ else:
+ success('Successfully disabled the eventcommand')
+
+ send_command('ENABLE_HOST_EVENT_HANDLER;localhost')
+ host_info = get_one('GET hosts\nFilter: name = localhost'
+ '\nColumns: event_handler_enabled')
+ if host_info['event_handler_enabled'] != 1:
+ failure = True
+ fail('Could not re-enable the eventcommand')
+ else:
+ success('Successfully re-enabled the eventcommand')
+
+ send_command('CHANGE_HOST_EVENT_HANDLER;localhost;')
+ host_info = get_one('GET hosts\nFilter: name = localhost'
+ '\nColumns: event_handler')
+ if host_info['event_handler']:
+ failure = True
+ fail('Could not remove eventcommand "test_event"'
+ ' assigned to host "localhost"')
+ else:
+ success('Successfully removed the eventcommand'
+ ' assigned to host "localhost"')
+
+ return 1 if failure else 0
+
+
+if __name__ == '__main__':
+ with utils.LiveStatusSocket('/var/run/icinga2/cmd/livestatus') as LIVESTATUS:
+ sys.exit(main())