+++ /dev/null
-#!/usr/bin/env python
-from __future__ import unicode_literals
-
-import sys
-import time
-import random
-
-import utils
-
-
-USERNAME = 'Icinga 2 Admin'
-
-
-def send_command(command):
- return LIVESTATUS.query('COMMAND [{0}] {1}'.format(int(time.time()), command))
-
-
-def test_host_comments(hostname):
- print repr(send_command('ADD_HOST_COMMENT;'+hostname+';0;'+USERNAME+';test'))
- return True
-
-
-def test_service_comments(hostname, servicename):
- pass
-
-
-def main():
- failure = False
-
- # Check whether host comments are properly processed
- if not test_host_comments('localhost') or not test_host_comments('nsca-ng'):
- failure = True
-
- return 1 if failure else 0
-
-
-if __name__ == '__main__':
- with utils.LiveStatusSocket('/var/run/icinga2/cmd/livestatus') as LIVESTATUS:
- sys.exit(main())
-
return result
+class LiveStatusError(Exception):
+ pass
+
+
class LiveStatusSocket(object):
options = [
- 'OutputFormat: json',
'KeepAlive: on',
+ 'OutputFormat: json',
'ResponseHeader: fixed16'
]
self.sock.close()
def query(self, command):
- full_command = '\n'.join([command] + self.options)
- self.send(full_command + '\n')
- return self.recv()
+ self.send(command)
+ statuscode, response = self.recv()
+
+ if statuscode != 200:
+ raise LiveStatusError(statuscode, response)
+
+ return response
def send(self, query):
- print repr(query)
- self.sock.sendall(query.encode('utf-8'))
+ full_query = '\n'.join([query] + self.options)
+ self.sock.sendall((full_query + '\n\n').encode('utf-8'))
def recv(self):
+ response = b''
response_header = self.sock.recv(16)
response_code = int(response_header[:3])
response_length = int(response_header[3:].strip())
- return json.loads(self.sock.recv(response_length).decode('utf-8'))
+
+ if response_length > 0:
+ while len(response) < response_length:
+ response += self.sock.recv(response_length - len(response))
+
+ response = response.decode('utf-8')
+
+ try:
+ response = json.loads(response)
+ except ValueError:
+ pass
+
+ return response_code, response