granicus.if.org Git - icinga2/commitdiff
GraphiteWriter: Use a workqueue for event processing
author Michael Friedrich <michael.friedrich@icinga.com>
Mon, 15 May 2017 15:35:36 +0000 (17:35 +0200)
committer Michael Friedrich <michael.friedrich@icinga.com>
Fri, 26 May 2017 13:18:14 +0000 (15:18 +0200)
This also adds reconnect handling and exception handling.

refs #5132
refs #5133
refs #5280

lib/perfdata/graphitewriter.cpp
lib/perfdata/graphitewriter.hpp
lib/perfdata/graphitewriter.ti

index ee59a7363c9f7c852027676b8dc0baddf3822199..df493d7d2ceaa2f14462852a25e29e85b9418c99 100644 (file)
@@ -22,7 +22,6 @@
 #include "icinga/service.hpp"
 #include "icinga/macroprocessor.hpp"
 #include "icinga/icingaapplication.hpp"
-#include "icinga/compatutility.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/configtype.hpp"
 #include "base/objectlock.hpp"
@@ -46,12 +45,33 @@ REGISTER_TYPE(GraphiteWriter);
 
 REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
 
-void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+GraphiteWriter::GraphiteWriter(void) /* Constructs the writer together with its work queue member. */
+    : m_WorkQueue(10000000, 1) /* NOTE(review): presumably (max queued items, worker thread count) - confirm against base/workqueue.hpp. */
+{ }
+
+void GraphiteWriter::OnConfigLoaded(void) /* Names the work queue after this object. */
+{
+       ObjectImpl<GraphiteWriter>::OnConfigLoaded(); /* Run the base class hook first. */

+       m_WorkQueue.SetName("GraphiteWriter, " + GetName()); /* Queue name becomes "GraphiteWriter, <object name>". */
+}
+
+void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 {
        Dictionary::Ptr nodes = new Dictionary();
 
        for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
-               nodes->Set(graphitewriter->GetName(), 1); //add more stats
+               size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
+               double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+               Dictionary::Ptr stats = new Dictionary();
+               stats->Set("work_queue_items", workQueueItems);
+               stats->Set("work_queue_item_rate", workQueueItemRate);
+
+               nodes->Set(graphitewriter->GetName(), stats);
+
+               perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
+               perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
        }
 
        status->Set("graphitewriter", nodes);
@@ -64,6 +84,10 @@ void GraphiteWriter::Start(bool runtimeCreated)
        Log(LogInformation, "GraphiteWriter")
            << "'" << GetName() << "' started.";
 
+       /* Register exception handler for WQ tasks. */
+       m_WorkQueue.SetExceptionCallback(boost::bind(&GraphiteWriter::ExceptionHandler, this, _1));
+
+       /* Timer for reconnecting */
        m_ReconnectTimer = new Timer();
        m_ReconnectTimer->SetInterval(10);
        m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GraphiteWriter::ReconnectTimerHandler, this));
@@ -78,12 +102,43 @@ void GraphiteWriter::Stop(bool runtimeRemoved)
        Log(LogInformation, "GraphiteWriter")
            << "'" << GetName() << "' stopped.";
 
+       m_WorkQueue.Join();
+
        ObjectImpl<GraphiteWriter>::Stop(runtimeRemoved);
 }
 
-void GraphiteWriter::ReconnectTimerHandler(void)
+void GraphiteWriter::AssertOnWorkQueue(void) /* Guard called first by Reconnect(), Disconnect() and InternalCheckResultHandler(). */
+{
+       ASSERT(m_WorkQueue.IsWorkerThread()); /* NOTE(review): presumably fails when the caller is not one of m_WorkQueue's worker threads - confirm ASSERT behaviour in release builds. */
+}
+
+void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) /* Exception callback registered on m_WorkQueue in Start(). */
+{
+       Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!"); /* Generic critical message; the exception details go to the debug log below. */

+       Log(LogDebug, "GraphiteWriter")
+           << "Exception during Graphite operation: " << DiagnosticInformation(exp);

+       if (GetConnected()) { /* Only tear down a stream that is marked as connected. */
+               m_Stream->Close();

+               SetConnected(false); /* Reconnect() returns early while connected is true, so clear the flag to allow a new attempt. */
+       }
+}
+
+void GraphiteWriter::Reconnect(void)
 {
-       if (m_Stream)
+       AssertOnWorkQueue();
+
+       double startTime = Utility::GetTime();
+
+       CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
+
+       SetShouldConnect(true);
+
+       bool reconnect = false;
+
+       if (GetConnected())
                return;
 
        TcpSocket::Ptr socket = new TcpSocket();
@@ -93,17 +148,46 @@ void GraphiteWriter::ReconnectTimerHandler(void)
 
        try {
                socket->Connect(GetHost(), GetPort());
-       } catch (std::exception&) {
+       } catch (const std::exception& ex) {
                Log(LogCritical, "GraphiteWriter")
                    << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               return;
+               throw ex;
        }
 
        m_Stream = new NetworkStream(socket);
+
+       SetConnected(true);
+
+       Log(LogInformation, "GraphiteWriter")
+           << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+void GraphiteWriter::ReconnectTimerHandler(void) /* Connected to m_ReconnectTimer's OnTimerExpired signal in Start(). */
+{
+       m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::Reconnect, this), PriorityNormal); /* Defer to the work queue: Reconnect() asserts that it runs there. */
+}
+
+void GraphiteWriter::Disconnect(void) /* Closes the stream and clears the connected flag; does nothing when not connected. */
+{
+       AssertOnWorkQueue(); /* NOTE(review): no caller of Disconnect() is visible in this diff - confirm where it is enqueued. */

+       if (!GetConnected())
+               return;

+       m_Stream->Close();

+       SetConnected(false);
 }
 
 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) /* Hands the check result to the work queue instead of processing it inline. */
 {
+       m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::InternalCheckResultHandler, this, checkable, cr)); /* No explicit priority is passed here, unlike the PriorityNormal used in ReconnectTimerHandler(). */
+}
+
+void GraphiteWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+       AssertOnWorkQueue();
+
        CONTEXT("Processing check result for '" + checkable->GetName() + "'");
 
        if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
index 8eca129b4d1b0d31ba3b58550e604b360c1b5c0b..1cd3f090c377a308fda9b011e02cba18f5c50137 100644 (file)
@@ -25,6 +25,7 @@
 #include "base/configobject.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/timer.hpp"
+#include "base/workqueue.hpp"
 #include <fstream>
 
 namespace icinga
@@ -41,21 +42,26 @@ public:
        DECLARE_OBJECT(GraphiteWriter);
        DECLARE_OBJECTNAME(GraphiteWriter);
 
+       GraphiteWriter(void);
+
        static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
 
        virtual void ValidateHostNameTemplate(const String& value, const ValidationUtils& utils) override;
        virtual void ValidateServiceNameTemplate(const String& value, const ValidationUtils& utils) override;
 
 protected:
+       virtual void OnConfigLoaded(void) override;
        virtual void Start(bool runtimeCreated) override;
        virtual void Stop(bool runtimeRemoved) override;
 
 private:
        Stream::Ptr m_Stream;
+       WorkQueue m_WorkQueue;
 
        Timer::Ptr m_ReconnectTimer;
 
        void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+       void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
        void SendMetric(const String& prefix, const String& name, double value, double ts);
        void SendPerfdata(const String& prefix, const CheckResult::Ptr& cr, double ts);
        static String EscapeMetric(const String& str, bool legacyMode = false);
@@ -63,6 +69,13 @@ private:
        static Value EscapeMacroMetric(const Value& value, bool legacyMode = false);
 
        void ReconnectTimerHandler(void);
+
+       void Disconnect(void);
+       void Reconnect(void);
+
+       void AssertOnWorkQueue(void);
+
+       void ExceptionHandler(boost::exception_ptr exp);
 };
 
 }
index 3e5ff29f83e3cb6e419419bf972803921d64275f..270635886faefd8be47aa6c2284a97771ffc3e62 100644 (file)
@@ -42,6 +42,10 @@ class GraphiteWriter : ConfigObject
         [config] bool enable_send_metadata;
         [config] bool enable_legacy_mode;
 
+       [no_user_modify] bool connected;
+       [no_user_modify] bool should_connect {
+               default {{{ return true; }}}
+       };
 };
 
 }