]> granicus.if.org Git - icinga2/commitdiff
Add LogstashWriter feature
authorKai Goller <kai.goller@netways.de>
Wed, 15 Jun 2016 13:03:32 +0000 (15:03 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Mon, 20 Mar 2017 13:30:03 +0000 (14:30 +0100)
This adds the UdpSocket class.

refs #4054

14 files changed:
etc/icinga2/features-available/logstash.conf [new file with mode: 0644]
lib/base/CMakeLists.txt
lib/base/dictionary.cpp
lib/base/socket.cpp
lib/base/socket.hpp
lib/base/tcpsocket.cpp
lib/base/tcpsocket.hpp
lib/base/udpsocket.cpp [new file with mode: 0644]
lib/base/udpsocket.hpp [new file with mode: 0644]
lib/perfdata/CMakeLists.txt
lib/perfdata/gelfwriter.hpp
lib/perfdata/logstashwriter.cpp [new file with mode: 0644]
lib/perfdata/logstashwriter.hpp [new file with mode: 0644]
lib/perfdata/logstashwriter.ti [new file with mode: 0644]

diff --git a/etc/icinga2/features-available/logstash.conf b/etc/icinga2/features-available/logstash.conf
new file mode 100644 (file)
index 0000000..6a08f16
--- /dev/null
@@ -0,0 +1,13 @@
+/**
+ * The LogstashWriter type writes check result metrics and
+ * performance data to a TCP or UDP socket.
+ */
+
+library "perfdata"
+
+object LogstashWriter "logstash" {
+  //host = "127.0.0.1"
+  //port = 9201
+  /* default is tcp */
+  //defaultProtocol = true
+}
index 7f362f6cbbdce3f5a1299d728f30c935ea4e27f3..420e3204ca9c481030c4fce95a4e3b32e9d767cd 100644 (file)
@@ -36,7 +36,7 @@ set(base_SOURCES
   function.cpp function.thpp function-script.cpp functionwrapper.cpp scriptglobal.cpp
   scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp stacktrace.cpp
   statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp
-  sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp threadpool.cpp timer.cpp
+  sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp udpsocket.cpp threadpool.cpp timer.cpp
   tlsstream.cpp tlsutility.cpp type.cpp typetype-script.cpp unixsocket.cpp utility.cpp value.cpp
   value-operators.cpp workqueue.cpp
 )
index 2594b850dfe2a0dde4ee109b19706d5d59af59f1..1d55f557ef805a3291f29cc5722bbf42d7e5f850 100644 (file)
@@ -45,6 +45,7 @@ Value Dictionary::Get(const String& key) const
        return it->second;
 }
 
+
 /**
  * Retrieves a value from a dictionary.
  *
index 026da03c37e613138fde6cdced98dddbc8aac258..2016cb76d6736b542b4a5a158f7d9b9a88d41e69 100644 (file)
@@ -29,7 +29,7 @@
 #include <socketpair.h>
 
 #ifndef _WIN32
-#      include <poll.h>
+#include <poll.h>
 #endif /* _WIN32 */
 
 using namespace icinga;
@@ -423,3 +423,86 @@ void Socket::SocketPair(SOCKET s[2])
                    << boost::errinfo_errno(errno));
 }
 
+/**
+ * Creates a socket and connects to the specified node and service.
+ *
+ * @param node The node.
+ * @param service The service.
+ * @param protocol The protocol
+ */
+void Socket::Connect(const String& node, const String& service)
+{
+       addrinfo hints;
+       addrinfo *result;
+       int error;
+       const char *func;
+       
+       SocketType();
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = socktype;
+       hints.ai_protocol = protocol;   
+       int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result);
+
+       if (rc != 0) {
+               Log(LogCritical, protocol+"Socket")
+                   << "getaddrinfo() failed with error code " << rc << ", \"" << gai_strerror(rc) << "\"";
+
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("getaddrinfo")
+                   << errinfo_getaddrinfo_error(rc));
+       }
+
+       int fd = INVALID_SOCKET;
+
+       for (addrinfo *info = result; info != NULL; info = info->ai_next) {
+               fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
+
+               if (fd == INVALID_SOCKET) {
+#ifdef _WIN32
+                       error = WSAGetLastError();
+#else /* _WIN32 */
+                       error = errno;
+#endif /* _WIN32 */
+                       func = "socket";
+
+                       continue;
+               }
+
+               rc = connect(fd, info->ai_addr, info->ai_addrlen);
+
+               if (rc < 0) {
+#ifdef _WIN32
+                       error = WSAGetLastError();
+#else /* _WIN32 */
+                       error = errno;
+#endif /* _WIN32 */
+                       func = "connect";
+
+                       closesocket(fd);
+
+                       continue;
+               }
+
+               SetFD(fd);
+
+               break;
+       }
+
+       freeaddrinfo(result);
+
+       if (GetFD() == INVALID_SOCKET) {
+               Log(LogCritical, "UdpSocket")
+                   << "Invalid socket: " << Utility::FormatErrorNumber(error);
+
+#ifndef _WIN32
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function(func)
+                   << boost::errinfo_errno(error));
+#else /* _WIN32 */
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function(func)
+                   << errinfo_win32_error(error));
+#endif /* _WIN32 */
+       }
+}
index 12834233bd308b9e2aaa83b285b39386e14d9170..df5bff12296294e3040723ccd0ad48a1ec3f6b7c 100644 (file)
@@ -51,6 +51,7 @@ public:
        size_t Write(const void *buffer, size_t size);
 
        void Listen(void);
+       void Connect(const String& node, const String& service);
        Socket::Ptr Accept(void);
 
        bool Poll(bool read, bool write, struct timeval *timeout = NULL);
@@ -58,11 +59,15 @@ public:
        void MakeNonBlocking(void);
 
        static void SocketPair(SOCKET s[2]);
-
+       
 protected:
        void SetFD(SOCKET fd);
 
        int GetError(void) const;
+       int socktype; 
+        int protocol;
+
+       virtual void SocketType(){};
 
        mutable boost::mutex m_SocketMutex;
 
index 734f0878d90c4b96241f8fbd2e807a11aa513389..22ff6076cdc1271798a9dd76da96b6c0fd633943 100644 (file)
 
 using namespace icinga;
 
+void TcpSocket::SocketType(){
+        socktype = SOCK_STREAM;
+        protocol = IPPROTO_TCP;
+}
+
 /**
  * Creates a socket and binds it to the specified service.
  *
index 53769c92f7bc7e1020d46727485ef60626481a18..3f4a74a7bde35d11f51b9f82d0518cd03a762974 100644 (file)
@@ -39,7 +39,8 @@ public:
        void Bind(const String& service, int family);
        void Bind(const String& node, const String& service, int family);
 
-       void Connect(const String& node, const String& service);
+private:
+       void SocketType();
 };
 
 }
diff --git a/lib/base/udpsocket.cpp b/lib/base/udpsocket.cpp
new file mode 100644 (file)
index 0000000..2b219c9
--- /dev/null
@@ -0,0 +1,33 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "base/udpsocket.hpp"
+#include "base/logger.hpp"
+#include "base/utility.hpp"
+#include "base/exception.hpp"
+#include <boost/exception/errinfo_api_function.hpp>
+#include <boost/exception/errinfo_errno.hpp>
+#include <iostream>
+
+using namespace icinga;
+
+void UdpSocket::SocketType(){
+       socktype = SOCK_DGRAM;
+       protocol = IPPROTO_UDP;
+}
diff --git a/lib/base/udpsocket.hpp b/lib/base/udpsocket.hpp
new file mode 100644 (file)
index 0000000..d3b8fc2
--- /dev/null
@@ -0,0 +1,45 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef UDPSOCKET_H
+#define UDPSOCKET_H
+
+#include "base/i2-base.hpp"
+#include "base/socket.hpp"
+
+namespace icinga
+{
+
+/**
+ * A UDP socket.
+ *
+ * @ingroup base
+ */
+class I2_BASE_API UdpSocket : public Socket
+{
+public:
+               DECLARE_PTR_TYPEDEFS(UdpSocket);
+
+private: 
+       void SocketType();      
+};
+}
+
+#endif /* UDPSOCKET_H */
+
index 08920dd3ddb6256669bf35f0bb0719db9cfce81e..f4d86f965898e22f1c83513fcad19d9e15d28a8a 100644 (file)
 
 mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
 mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
+mkclass_target(logstashwriter.ti logstashwriter.tcpp logstashwriter.thpp)
 mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
 mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
 mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
 
 set(perfdata_SOURCES
-  gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
+  gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp logstashwriter.cpp logstashwriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
 )
 
 if(ICINGA2_UNITY_BUILD)
@@ -51,6 +52,11 @@ install_if_not_exists(
   ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
 )
 
+install_if_not_exists(
+  ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/logstash.conf
+  ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
+)
+
 install_if_not_exists(
   ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/influxdb.conf
   ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
index 26bb1c42692c600b0eeab6f57769255c6be160d8..0b613fb224b0d6e1468068cd912abac5c2acb71f 100644 (file)
@@ -64,3 +64,4 @@ private:
 }
 
 #endif /* GELFWRITER_H */
+
diff --git a/lib/perfdata/logstashwriter.cpp b/lib/perfdata/logstashwriter.cpp
new file mode 100644 (file)
index 0000000..def51d0
--- /dev/null
@@ -0,0 +1,281 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "perfdata/logstashwriter.hpp"
+#include "perfdata/logstashwriter.tcpp"
+#include "icinga/service.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/compatutility.hpp"
+#include "icinga/perfdatavalue.hpp"
+
+#include "icinga/notification.hpp"
+#include "base/configtype.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/utility.hpp"
+#include "base/stream.hpp"
+#include "base/networkstream.hpp"
+
+#include "base/json.hpp"
+#include "base/context.hpp"
+#include <boost/foreach.hpp>
+#include <boost/algorithm/string/replace.hpp>
+#include <string>
+
+using namespace icinga;
+
+REGISTER_TYPE(LogstashWriter);
+
+
+void LogstashWriter::Start(bool runtimeCreated)
+{
+        ObjectImpl<LogstashWriter>::Start(runtimeCreated);
+
+        m_ReconnectTimer = new Timer();
+        m_ReconnectTimer->SetInterval(10);
+        m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&LogstashWriter::ReconnectTimerHandler, this));
+        m_ReconnectTimer->Start();
+        m_ReconnectTimer->Reschedule(0);
+
+        // Send check results
+        Service::OnNewCheckResult.connect(boost::bind(&LogstashWriter::CheckResultHandler, this, _1, _2));
+        // Send notifications
+        Service::OnNotificationSentToUser.connect(boost::bind(&LogstashWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
+        // Send state change
+        Service::OnStateChange.connect(boost::bind(&LogstashWriter::StateChangeHandler, this, _1, _2, _3));
+}
+
+
+void LogstashWriter::ReconnectTimerHandler(void)
+{
+       if (m_Stream)
+               return;
+       Socket::Ptr socket;
+       if(GetDefaultProtocol() == true)
+               socket = new TcpSocket();
+       else
+               socket = new UdpSocket();
+
+        Log(LogNotice, "LogstashWriter")
+            << "Reconnecting to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
+
+        try {
+                socket->Connect(GetHost(), GetPort());
+        } catch (const std::exception&) {
+                Log(LogCritical, "LogstashWriter")
+                    << "Can't connect to Logstash endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
+                return;
+        }
+
+        m_Stream = new NetworkStream(socket);
+}
+
+void LogstashWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+        CONTEXT("LOGSTASGH Processing check result for '" + checkable->GetName() + "'");
+
+        Log(LogDebug, "LogstashWriter")<< "Logstash Processing check result for '" << checkable->GetName() << "'";
+
+        Host::Ptr host;
+        Service::Ptr service;
+        tie(host, service) = GetHostService(checkable);
+        double ts = cr->GetExecutionEnd();
+
+        Dictionary::Ptr fields = new Dictionary();
+
+        if (service) {
+                fields->Set("service_name", service->GetShortName());
+                fields->Set("service_state", Service::StateToString(service->GetState()));
+                fields->Set("last_state", service->GetLastState());
+                fields->Set("last_hard_state", service->GetLastHardState());
+        } else {
+                fields->Set("last_state", host->GetLastState());
+                fields->Set("last_hard_state", host->GetLastHardState());
+        }
+
+        fields->Set("hostname", host->GetName());
+        fields->Set("type", "CHECK RESULT");
+        fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+        fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+        fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+
+        fields->Set("latency", cr->CalculateLatency());
+        fields->Set("execution_time", cr->CalculateExecutionTime());
+        fields->Set("reachable",  checkable->IsReachable());
+
+        if (cr) {
+               fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+                fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr));
+                fields->Set("check_source", cr->GetCheckSource());
+        }
+
+        if (GetEnableSendPerfdata()) {
+                Array::Ptr perfdata = cr->GetPerformanceData();
+
+                if (perfdata) {
+                        ObjectLock olock(perfdata);
+                        BOOST_FOREACH(const Value& val, perfdata) {
+                                PerfdataValue::Ptr pdv;
+
+                                if (val.IsObjectType<PerfdataValue>())
+                                        pdv = val;
+                                else {
+                                        try {
+                                                pdv = PerfdataValue::Parse(val);
+                                               String escaped_key = pdv->GetLabel();
+                                               boost::replace_all(escaped_key, " ", "_");
+                                                boost::replace_all(escaped_key, ".", "_");
+                                                boost::replace_all(escaped_key, "\\", "_");
+                                                boost::algorithm::replace_all(escaped_key, "::", ".");
+                                               fields->Set(escaped_key, pdv->GetValue());
+
+                                                if (pdv->GetMin())
+                                                        fields->Set(escaped_key + "_min", pdv->GetMin());
+                                                if (pdv->GetMax())
+                                                        fields->Set(escaped_key + "_max", pdv->GetMax());
+                                                if (pdv->GetWarn())
+                                                        fields->Set(escaped_key + "_warn", pdv->GetWarn());
+                                                if (pdv->GetCrit())
+                                                        fields->Set(escaped_key + "_crit", pdv->GetCrit());
+                                        } catch (const std::exception&) {
+                                                Log(LogWarning, "LogstashWriter")
+                                                    << "Ignoring invalid perfdata value: '" << val << "' for object '"
+                                                    << checkable-GetName() << "'.";
+                                        }
+                                }
+                        }
+                }
+        }
+
+        SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts));
+}
+
+
+void LogstashWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+    const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
+    const String& author, const String& comment_text, const String& command_name)
+{
+        CONTEXT("Logstash Processing notification to all users '" + checkable->GetName() + "'");
+
+        Log(LogDebug, "LogstashWriter")
+            << "Logstash Processing notification for '" << checkable->GetName() << "'";
+
+        Host::Ptr host;
+        Service::Ptr service;
+        tie(host, service) = GetHostService(checkable);
+        double ts = cr->GetExecutionEnd();
+
+        String notification_type_str = Notification::NotificationTypeToString(notification_type);
+
+        String author_comment = "";
+
+        if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
+                author_comment = author + ";" + comment_text;
+        }
+
+               String output;
+        if (cr)
+                       output = CompatUtility::GetCheckResultOutput(cr);
+
+        Dictionary::Ptr fields = new Dictionary();
+
+        if (service) {
+                fields->Set("type", "SERVICE NOTIFICATION");
+                fields->Set("service", service->GetShortName());
+                fields->Set("short_message", output);
+        } else {
+                fields->Set("type", "HOST NOTIFICATION");
+                fields->Set("short_message", CompatUtility::GetHostStateString(host)+ ")");
+        }
+
+        fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+
+        fields->Set("hostname", host->GetName());
+        fields->Set("command", command_name);
+        fields->Set("notification_type", notification_type_str);
+        fields->Set("comment", author_comment);
+
+        SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts));
+}
+
+void LogstashWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+        CONTEXT("Logstash Processing state change '" + checkable->GetName() + "'");
+
+        Log(LogDebug, "LogstashWriter")
+            << "Logstash Processing state change for '" << checkable->GetName() << "'";
+
+        Host::Ptr host;
+        Service::Ptr service;
+        tie(host, service) = GetHostService(checkable);
+        double ts = cr->GetExecutionEnd();
+
+        Dictionary::Ptr fields = new Dictionary();
+
+        fields->Set("state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
+        fields->Set("type", "STATE CHANGE");
+        fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+        fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+        fields->Set("hostname", host->GetName());
+
+        if (service) {
+                fields->Set("service_name", service->GetShortName());
+                fields->Set("service_state", Service::StateToString(service->GetState()));
+                fields->Set("last_state", service->GetLastState());
+                fields->Set("last_hard_state", service->GetLastHardState());
+        } else {
+                fields->Set("last_state", host->GetLastState());
+                fields->Set("last_hard_state", host->GetLastHardState());
+        }
+
+        if (cr) {
+                fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
+                fields->Set("full_message", CompatUtility::GetCheckResultLongOutput(cr));
+                fields->Set("check_source", cr->GetCheckSource());
+        }
+        SendLogMessage(ComposeLogstashMessage(fields, GetSource(), ts));
+}
+
+String LogstashWriter::ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts)
+{
+        fields->Set("version", "1.1");
+        fields->Set("host", source);
+        fields->Set("timestamp", ts);
+        String logstashObj= JsonEncode(fields);
+       return logstashObj+ "\n";
+}
+
+void LogstashWriter::SendLogMessage(const String& message)
+{
+        ObjectLock olock(this);
+
+        if (!m_Stream)
+                return;
+
+        try {
+             m_Stream->Write(&message[0], message.GetLength());
+        } catch (const std::exception& ex) {
+                Log(LogCritical, "LogstashWriter") << "Cannot write to " << 
+               ((GetDefaultProtocol()==true) ? "tcp" : "udp") << " socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+                m_Stream.reset();
+        }
+}
+
diff --git a/lib/perfdata/logstashwriter.hpp b/lib/perfdata/logstashwriter.hpp
new file mode 100644 (file)
index 0000000..65c46f5
--- /dev/null
@@ -0,0 +1,67 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef LOGSTASHWRITER_H
+#define LOGSTASHWRITER_H
+
+#include "perfdata/logstashwriter.thpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/udpsocket.hpp"
+#include "base/timer.hpp"
+#include <fstream>
+#include <string>
+
+namespace icinga
+{
+
+/**
+ * An Icinga logstash writer.
+ *
+ * @ingroup perfdata
+ */
+class LogstashWriter : public ObjectImpl<LogstashWriter>
+{
+public:
+        DECLARE_OBJECT(LogstashWriter);
+        DECLARE_OBJECTNAME(LogstashWriter);
+
+protected:
+        virtual void Start(bool runtimeCreated) override;
+
+private:
+        Stream::Ptr m_Stream;
+
+        Timer::Ptr m_ReconnectTimer;
+
+        void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+        void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+        const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
+        const String& author, const String& comment_text, const String& command_name);
+        void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+        void SendLogMessage(const String& message);
+       String ComposeLogstashMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+
+        void ReconnectTimerHandler(void);
+};
+
+}
+
+#endif /* LOGSTASHWRITER_H */
diff --git a/lib/perfdata/logstashwriter.ti b/lib/perfdata/logstashwriter.ti
new file mode 100644 (file)
index 0000000..6d822c9
--- /dev/null
@@ -0,0 +1,50 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class LogstashWriter : ConfigObject
+{
+       [config] String host {
+               default {{{ return "127.0.0.1"; }}}
+       };
+
+       [config] String port {
+               default {{{ return "9201"; }}}
+       };
+       
+       [config] bool defaultProtocol {
+               default {{{ return "true"; }}}
+       };
+
+       [config] String source {
+               default {{{ return "icinga2"; }}}
+       };
+
+       [config] bool enable_send_perfdata {
+               default {{{ return false; }}}
+       };
+};
+
+}