]> granicus.if.org Git - icinga2/commitdiff
GelfWriter: Use async work queue and add feature metric stats 5329/head
authorMichael Friedrich <michael.friedrich@icinga.com>
Tue, 6 Jun 2017 17:48:23 +0000 (19:48 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Tue, 6 Jun 2017 17:48:23 +0000 (19:48 +0200)
fixes #4532

lib/perfdata/gelfwriter.cpp
lib/perfdata/gelfwriter.hpp
lib/perfdata/gelfwriter.ti

index afc563c12b34bb934e04a49f22e8d48248173a4f..b9d43bdd9cc0be07a534b71556dfbd0c0120f7b9 100644 (file)
 #include "base/logger.hpp"
 #include "base/utility.hpp"
 #include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
 #include "base/stream.hpp"
 #include "base/networkstream.hpp"
-#include "base/json.hpp"
 #include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/json.hpp"
+#include "base/statsfunction.hpp"
 #include <boost/algorithm/string/replace.hpp>
 
 using namespace icinga;
 
 REGISTER_TYPE(GelfWriter);
 
+REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
+
+GelfWriter::GelfWriter(void)
+    : m_WorkQueue(10000000, 1)
+{ }
+
+void GelfWriter::OnConfigLoaded(void)
+{
+       ObjectImpl<GelfWriter>::OnConfigLoaded();
+
+       m_WorkQueue.SetName("GelfWriter, " + GetName());
+}
+
+void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+       Dictionary::Ptr nodes = new Dictionary();
+
+       for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
+               size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
+               double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+               Dictionary::Ptr stats = new Dictionary();
+               stats->Set("work_queue_items", workQueueItems);
+               stats->Set("work_queue_item_rate", workQueueItemRate);
+               stats->Set("connected", gelfwriter->GetConnected());
+               stats->Set("source", gelfwriter->GetSource());
+
+               nodes->Set(gelfwriter->GetName(), stats);
+
+               perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
+               perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+       }
+
+       status->Set("gelfwriter", nodes);
+}
+
 void GelfWriter::Start(bool runtimeCreated)
 {
        ObjectImpl<GelfWriter>::Start(runtimeCreated);
@@ -46,18 +85,20 @@ void GelfWriter::Start(bool runtimeCreated)
        Log(LogInformation, "GelfWriter")
            << "'" << GetName() << "' started.";
 
+       /* Register exception handler for WQ tasks. */
+       m_WorkQueue.SetExceptionCallback(boost::bind(&GelfWriter::ExceptionHandler, this, _1));
+
+       /* Timer for reconnecting */
        m_ReconnectTimer = new Timer();
        m_ReconnectTimer->SetInterval(10);
        m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GelfWriter::ReconnectTimerHandler, this));
        m_ReconnectTimer->Start();
        m_ReconnectTimer->Reschedule(0);
 
-       // Send check results
-       Service::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
-       // Send notifications
-       Service::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
-       // Send state change
-       Service::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
+       /* Register event handlers. */
+       Checkable::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
+       Checkable::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
+       Checkable::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
 }
 
 void GelfWriter::Stop(bool runtimeRemoved)
@@ -65,41 +106,98 @@ void GelfWriter::Stop(bool runtimeRemoved)
        Log(LogInformation, "GelfWriter")
            << "'" << GetName() << "' stopped.";
 
+       m_WorkQueue.Join();
+
        ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
 }
 
-void GelfWriter::ReconnectTimerHandler(void)
+void GelfWriter::AssertOnWorkQueue(void)
+{
+       ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+       Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
+
+       Log(LogDebug, "GelfWriter")
+           << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp);
+
+       if (GetConnected()) {
+               m_Stream->Close();
+
+               SetConnected(false);
+       }
+}
+
+void GelfWriter::Reconnect(void)
 {
-       if (m_Stream)
+       AssertOnWorkQueue();
+
+       double startTime = Utility::GetTime();
+
+       CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
+
+       SetShouldConnect(true);
+
+       if (GetConnected())
                return;
 
        TcpSocket::Ptr socket = new TcpSocket();
 
        Log(LogNotice, "GelfWriter")
-           << "Reconnecting to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
+           << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
        try {
                socket->Connect(GetHost(), GetPort());
-       } catch (std::exception&) {
+       } catch (const std::exception& ex) {
                Log(LogCritical, "GelfWriter")
-                   << "Can't connect to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
-               return;
+                   << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw ex;
        }
 
        m_Stream = new NetworkStream(socket);
+
+       SetConnected(true);
+
+       Log(LogInformation, "GelfWriter")
+           << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+void GelfWriter::ReconnectTimerHandler(void)
+{
+       m_WorkQueue.Enqueue(boost::bind(&GelfWriter::Reconnect, this), PriorityNormal);
+}
+
+void GelfWriter::Disconnect(void)
+{
+       AssertOnWorkQueue();
+
+       if (!GetConnected())
+               return;
+
+       m_Stream->Close();
+
+       SetConnected(false);
 }
 
 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
+       m_WorkQueue.Enqueue(boost::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
+}
+
+void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+       AssertOnWorkQueue();
+
        CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
 
        Log(LogDebug, "GelfWriter")
-           << "GELF Processing check result for '" << checkable->GetName() << "'";
+           << "Processing check result for '" << checkable->GetName() << "'";
 
        Host::Ptr host;
        Service::Ptr service;
        tie(host, service) = GetHostService(checkable);
-       double ts = Utility::GetTime();
 
        Dictionary::Ptr fields = new Dictionary();
 
@@ -122,6 +220,8 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
 
        fields->Set("_reachable", checkable->IsReachable());
 
+       double ts = Utility::GetTime();
+
        if (cr) {
                fields->Set("_latency", cr->CalculateLatency());
                fields->Set("_execution_time", cr->CalculateExecutionTime());
@@ -175,28 +275,38 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
 }
 
 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
-    const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
-    const String& author, const String& comment_text, const String& command_name)
+    const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+    const String& author, const String& commentText, const String& commandName)
 {
+       m_WorkQueue.Enqueue(boost::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
+           notification, checkable, user, notificationType, cr, author, commentText, commandName));
+}
+
+void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+    const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+    const String& author, const String& commentText, const String& commandName)
+{
+       AssertOnWorkQueue();
+
        CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
 
        Log(LogDebug, "GelfWriter")
-           << "GELF Processing notification for '" << checkable->GetName() << "'";
+           << "Processing notification for '" << checkable->GetName() << "'";
 
        Host::Ptr host;
        Service::Ptr service;
        tie(host, service) = GetHostService(checkable);
-       double ts = Utility::GetTime();
 
-       String notification_type_str = Notification::NotificationTypeToString(notification_type);
+       String notificationTypeString = Notification::NotificationTypeToString(notificationType);
 
-       String author_comment = "";
+       String authorComment = "";
 
-       if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
-               author_comment = author + ";" + comment_text;
+       if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
+               authorComment = author + ";" + commentText;
        }
 
        String output;
+       double ts = Utility::GetTime();
 
        if (cr) {
                output = CompatUtility::GetCheckResultOutput(cr);
@@ -211,30 +321,37 @@ void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification
                fields->Set("short_message", output);
        } else {
                fields->Set("_type", "HOST NOTIFICATION");
+               //TODO: why?
                fields->Set("short_message", "(" + CompatUtility::GetHostStateString(host) + ")");
        }
 
        fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
 
        fields->Set("_hostname", host->GetName());
-       fields->Set("_command", command_name);
-       fields->Set("_notification_type", notification_type_str);
-       fields->Set("_comment", author_comment);
+       fields->Set("_command", commandName);
+       fields->Set("_notification_type", notificationTypeString);
+       fields->Set("_comment", authorComment);
 
        SendLogMessage(ComposeGelfMessage(fields, GetSource(), ts));
 }
 
 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
 {
+       m_WorkQueue.Enqueue(boost::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
+}
+
+void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+       AssertOnWorkQueue();
+
        CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
 
        Log(LogDebug, "GelfWriter")
-           << "GELF Processing state change for '" << checkable->GetName() << "'";
+           << "Processing state change for '" << checkable->GetName() << "'";
 
        Host::Ptr host;
        Service::Ptr service;
        tie(host, service) = GetHostService(checkable);
-       double ts = Utility::GetTime();
 
        Dictionary::Ptr fields = new Dictionary();
 
@@ -254,6 +371,8 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
                fields->Set("_last_hard_state", host->GetLastHardState());
        }
 
+       double ts = Utility::GetTime();
+
        if (cr) {
                fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
                fields->Set("full_message", cr->GetOutput());
@@ -273,28 +392,28 @@ String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const Strin
        return JsonEncode(fields);
 }
 
-void GelfWriter::SendLogMessage(const String& gelf)
+void GelfWriter::SendLogMessage(const String& gelfMessage)
 {
        std::ostringstream msgbuf;
-       msgbuf << gelf;
+       msgbuf << gelfMessage;
        msgbuf << '\0';
 
        String log = msgbuf.str();
 
        ObjectLock olock(this);
 
-       if (!m_Stream)
+       if (!GetConnected())
                return;
 
        try {
-               //TODO remove
                Log(LogDebug, "GelfWriter")
                    << "Sending '" << log << "'.";
+
                m_Stream->Write(log.CStr(), log.GetLength());
        } catch (const std::exception& ex) {
                Log(LogCritical, "GelfWriter")
                    << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
-               m_Stream.reset();
+               throw ex;
        }
 }
index 26bb1c42692c600b0eeab6f57769255c6be160d8..9cd7d281171ead533060055d7fb8b7f97fd92363 100644 (file)
 #include "base/configobject.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/timer.hpp"
+#include "base/workqueue.hpp"
 #include <fstream>
 
 namespace icinga
 {
 
 /**
- * An Icinga gelf writer.
+ * An Icinga Gelf writer for Graylog.
  *
  * @ingroup perfdata
  */
@@ -41,24 +42,43 @@ public:
        DECLARE_OBJECT(GelfWriter);
        DECLARE_OBJECTNAME(GelfWriter);
 
+       GelfWriter(void);
+
+       static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
 protected:
+       virtual void OnConfigLoaded(void) override;
        virtual void Start(bool runtimeCreated) override;
        virtual void Stop(bool runtimeRemoved) override;
 
 private:
        Stream::Ptr m_Stream;
+       WorkQueue m_WorkQueue;
 
        Timer::Ptr m_ReconnectTimer;
 
        void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+       void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
        void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
-       const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
-       const String& author, const String& comment_text, const String& command_name);
-       String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+           const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+           const String& author, const String& commentText, const String& commandName);
+       void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+           const User::Ptr& user, NotificationType notification_type, const CheckResult::Ptr& cr,
+           const String& author, const String& comment_text, const String& command_name);
        void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
-       void SendLogMessage(const String& gelf);
+       void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+
+       String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+       void SendLogMessage(const String& gelfMessage);
 
        void ReconnectTimerHandler(void);
+
+       void Disconnect(void);
+       void Reconnect(void);
+
+       void AssertOnWorkQueue(void);
+
+       void ExceptionHandler(boost::exception_ptr exp);
 };
 
 }
index 63540daec38d032eb9c1a13e5154649cbe1aaa55..f6f06d2ec188821f4a34e8536f46d5f5d67971a7 100644 (file)
@@ -38,6 +38,11 @@ class GelfWriter : ConfigObject
        [config] bool enable_send_perfdata {
                default {{{ return false; }}}
        };
+
+       [no_user_modify] bool connected;
+       [no_user_modify] bool should_connect {
+               default {{{ return true; }}}
+       };
 };
 
 }