From: Michael Friedrich Date: Mon, 15 May 2017 15:35:36 +0000 (+0200) Subject: GraphiteWriter: Use a workqueue for event processing X-Git-Tag: v2.7.0~56^2~2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=28395b32f06948f04abf977ac735ad867fd7230f;p=icinga2 GraphiteWriter: Use a workqueue for event processing This also adds reconnect handling and exceptions. refs #5132 refs #5133 refs #5280 --- diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index ee59a7363..df493d7d2 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -22,7 +22,6 @@ #include "icinga/service.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "icinga/compatutility.hpp" #include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -46,12 +45,33 @@ REGISTER_TYPE(GraphiteWriter); REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc); -void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +GraphiteWriter::GraphiteWriter(void) + : m_WorkQueue(10000000, 1) +{ } + +void GraphiteWriter::OnConfigLoaded(void) +{ + ObjectImpl::OnConfigLoaded(); + + m_WorkQueue.SetName("GraphiteWriter, " + GetName()); +} + +void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { Dictionary::Ptr nodes = new Dictionary(); for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType()) { - nodes->Set(graphitewriter->GetName(), 1); //add more stats + size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength(); + double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + Dictionary::Ptr stats = new Dictionary(); + stats->Set("work_queue_items", workQueueItems); + stats->Set("work_queue_item_rate", workQueueItemRate); + + nodes->Set(graphitewriter->GetName(), stats); + + perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); } status->Set("graphitewriter", nodes); @@ -64,6 +84,10 @@ void GraphiteWriter::Start(bool runtimeCreated) Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' started."; + /* Register exception handler for WQ tasks. */ + m_WorkQueue.SetExceptionCallback(boost::bind(&GraphiteWriter::ExceptionHandler, this, _1)); + + /* Timer for reconnecting */ m_ReconnectTimer = new Timer(); m_ReconnectTimer->SetInterval(10); m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GraphiteWriter::ReconnectTimerHandler, this)); @@ -78,12 +102,43 @@ void GraphiteWriter::Stop(bool runtimeRemoved) Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' stopped."; + m_WorkQueue.Join(); + ObjectImpl::Stop(runtimeRemoved); } -void GraphiteWriter::ReconnectTimerHandler(void) +void GraphiteWriter::AssertOnWorkQueue(void) +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!"); + + Log(LogDebug, "GraphiteWriter") + << "Exception during Graphite operation: " << DiagnosticInformation(exp); + + if (GetConnected()) { + m_Stream->Close(); + + SetConnected(false); + } +} + +void GraphiteWriter::Reconnect(void) { - if (m_Stream) + AssertOnWorkQueue(); + + double startTime = Utility::GetTime(); + + CONTEXT("Reconnecting to Graphite '" + GetName() + "'"); + + SetShouldConnect(true); + + bool reconnect = false; + + if (GetConnected()) return; TcpSocket::Ptr socket = new TcpSocket(); @@ -93,17 +148,46 @@ void GraphiteWriter::ReconnectTimerHandler(void) try { socket->Connect(GetHost(), GetPort()); - } catch (std::exception&) { + } catch (const std::exception& ex) { Log(LogCritical, "GraphiteWriter") << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; - return; + throw ex; } m_Stream = new NetworkStream(socket); + + SetConnected(true); + + Log(LogInformation, "GraphiteWriter") + << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; +} + +void GraphiteWriter::ReconnectTimerHandler(void) +{ + m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::Reconnect, this), PriorityNormal); +} + +void GraphiteWriter::Disconnect(void) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + m_Stream->Close(); + + SetConnected(false); } void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { + m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::InternalCheckResultHandler, this, checkable, cr)); +} + +void GraphiteWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + CONTEXT("Processing check result for '" + checkable->GetName() + "'"); if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 8eca129b4..1cd3f090c 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -25,6 +25,7 @@ #include "base/configobject.hpp" #include "base/tcpsocket.hpp" #include "base/timer.hpp" +#include "base/workqueue.hpp" #include namespace icinga @@ -41,21 +42,26 @@ public: DECLARE_OBJECT(GraphiteWriter); DECLARE_OBJECTNAME(GraphiteWriter); + GraphiteWriter(void); + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); virtual void ValidateHostNameTemplate(const String& value, const ValidationUtils& utils) override; virtual void ValidateServiceNameTemplate(const String& value, const ValidationUtils& utils) override; protected: + virtual void OnConfigLoaded(void) override; virtual void Start(bool runtimeCreated) override; virtual void Stop(bool runtimeRemoved) override; private: Stream::Ptr m_Stream; + WorkQueue m_WorkQueue; Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const String& prefix, const String& name, double value, double ts); void SendPerfdata(const String& prefix, const CheckResult::Ptr& cr, double ts); static String EscapeMetric(const String& str, bool legacyMode = false); @@ -63,6 +69,13 @@ private: static Value EscapeMacroMetric(const Value& value, bool legacyMode = false); void ReconnectTimerHandler(void); + + void Disconnect(void); + void Reconnect(void); + + void AssertOnWorkQueue(void); + + void ExceptionHandler(boost::exception_ptr exp); }; } diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index 3e5ff29f8..270635886 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -42,6 +42,10 @@ class GraphiteWriter : ConfigObject [config] bool enable_send_metadata; [config] bool enable_legacy_mode; + [no_user_modify] bool connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; }; }