]> granicus.if.org Git - icinga2/commitdiff
Quality: Use Boost ASIO/IO engine in Graphite feature
authorMichael Friedrich <michael.friedrich@icinga.com>
Mon, 27 May 2019 14:49:51 +0000 (16:49 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Mon, 27 May 2019 14:49:51 +0000 (16:49 +0200)
This commit changes the reconnect priority to high.

Also add function docs.

lib/perfdata/graphitewriter.cpp
lib/perfdata/graphitewriter.hpp

index aee142e5a04c87f5c89515128951ea4acc728239..9eb2e84a6ee2b93b53405a2e8fc251d6141ab6c6 100644 (file)
@@ -28,6 +28,9 @@ REGISTER_TYPE(GraphiteWriter);
 
 REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
 
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
 void GraphiteWriter::OnConfigLoaded()
 {
        ObjectImpl<GraphiteWriter>::OnConfigLoaded();
@@ -44,6 +47,12 @@ void GraphiteWriter::OnConfigLoaded()
        }
 }
 
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ * @param perfdata Array of PerfdataValue objects
+ */
 void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 {
        DictionaryData nodes;
@@ -65,6 +74,9 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
        status->Set("graphitewriter", new Dictionary(std::move(nodes)));
 }
 
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
 void GraphiteWriter::Resume()
 {
        ObjectImpl<GraphiteWriter>::Resume();
@@ -86,7 +98,9 @@ void GraphiteWriter::Resume()
        Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
 }
 
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
 void GraphiteWriter::Pause()
 {
        m_ReconnectTimer.reset();
@@ -110,11 +124,21 @@ void GraphiteWriter::Pause()
        ObjectImpl<GraphiteWriter>::Pause();
 }
 
+/**
+ * Check if method is called inside the WQ thread.
+ */
 void GraphiteWriter::AssertOnWorkQueue()
 {
        ASSERT(m_WorkQueue.IsWorkerThread());
 }
 
+/**
+ * Exception handler for the WQ.
+ *
+ * Closes the connection if connected.
+ *
+ * @param exp Exception pointer
+ */
 void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
 {
        Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
@@ -123,12 +147,17 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
                << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
 
        if (GetConnected()) {
-               m_Stream->Close();
+               m_Stream->close();
 
                SetConnected(false);
        }
 }
 
+/**
+ * Reconnect method, stops when the feature is paused in HA zones.
+ *
+ * Called inside the WQ.
+ */
 void GraphiteWriter::Reconnect()
 {
        AssertOnWorkQueue();
@@ -141,6 +170,9 @@ void GraphiteWriter::Reconnect()
        ReconnectInternal();
 }
 
+/**
+ * Reconnect method, connects to a TCP Stream
+ */
 void GraphiteWriter::ReconnectInternal()
 {
        double startTime = Utility::GetTime();
@@ -152,35 +184,42 @@ void GraphiteWriter::ReconnectInternal()
        if (GetConnected())
                return;
 
-       TcpSocket::Ptr socket = new TcpSocket();
-
        Log(LogNotice, "GraphiteWriter")
                << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
+       m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+
        try {
-               socket->Connect(GetHost(), GetPort());
+               icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
        } catch (const std::exception& ex) {
-               Log(LogCritical, "GraphiteWriter")
-                       << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               throw ex;
+               Log(LogWarning, "GraphiteWriter")
+                       << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
        }
 
-       m_Stream = new NetworkStream(socket);
-
        SetConnected(true);
 
        Log(LogInformation, "GraphiteWriter")
                << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
 }
 
+/**
+ * Reconnect handler called by the timer.
+ *
+ * Enqueues a reconnect task into the WQ.
+ */
 void GraphiteWriter::ReconnectTimerHandler()
 {
        if (IsPaused())
                return;
 
-       m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
+       m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
 }
 
+/**
+ * Disconnect the stream.
+ *
+ * Called inside the WQ.
+ */
 void GraphiteWriter::Disconnect()
 {
        AssertOnWorkQueue();
@@ -188,16 +227,27 @@ void GraphiteWriter::Disconnect()
        DisconnectInternal();
 }
 
+/**
+ * Disconnect the stream.
+ *
+ * Called outside the WQ.
+ */
 void GraphiteWriter::DisconnectInternal()
 {
        if (!GetConnected())
                return;
 
-       m_Stream->Close();
+       m_Stream->close();
 
        SetConnected(false);
 }
 
+/**
+ * Check result event handler, checks whether feature is not paused in HA setups.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
        if (IsPaused())
@@ -206,6 +256,14 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
        m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
 }
 
+/**
+ * Check result event handler, prepares metadata and perfdata values and calls Send*()
+ *
+ * Called inside the WQ.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
 void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
        AssertOnWorkQueue();
@@ -262,6 +320,14 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
        SendPerfdata(checkable, prefixPerfdata, cr, ts);
 }
 
+/**
+ * Parse performance data from check result and call SendMetric()
+ *
+ * @param checkable Host/service object
+ * @param prefix Metric prefix string
+ * @param cr Check result including performance data
+ * @param ts Timestamp when the check result was created
+ */
 void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
 {
        Array::Ptr perfdata = cr->GetPerformanceData();
@@ -306,8 +372,19 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
        }
 }
 
+/**
+ * Computes metric data and sends to Graphite
+ *
+ * @param checkable Host/service object
+ * @param prefix Computed metric prefix string
+ * @param name Metric name
+ * @param value Metric value
+ * @param ts Timestamp when the check result was created
+ */
 void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
 {
+       namespace asio = boost::asio;
+
        std::ostringstream msgbuf;
        msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
 
@@ -316,7 +393,6 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
 
        // do not send \n to debug log
        msgbuf << "\n";
-       String metric = msgbuf.str();
 
        boost::mutex::scoped_lock lock(m_StreamMutex);
 
@@ -324,7 +400,8 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
                return;
 
        try {
-               m_Stream->Write(metric.CStr(), metric.GetLength());
+               asio::write(*m_Stream, asio::buffer(msgbuf.str()));
+               m_Stream->flush();
        } catch (const std::exception& ex) {
                Log(LogCritical, "GraphiteWriter")
                        << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
@@ -333,6 +410,14 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
        }
 }
 
+/**
+ * Escape metric tree elements
+ *
+ * Dots are not allowed, e.g. in host names
+ *
+ * @param str Metric part name
+ * @return Escape string
+ */
 String GraphiteWriter::EscapeMetric(const String& str)
 {
        String result = str;
@@ -346,6 +431,14 @@ String GraphiteWriter::EscapeMetric(const String& str)
        return result;
 }
 
+/**
+ * Escape metric label
+ *
+ * Dots are allowed - users can create trees from perfdata labels
+ *
+ * @param str Metric label name
+ * @return Escaped string
+ */
 String GraphiteWriter::EscapeMetricLabel(const String& str)
 {
        String result = str;
@@ -359,6 +452,12 @@ String GraphiteWriter::EscapeMetricLabel(const String& str)
        return result;
 }
 
+/**
+ * Escape macro metrics found via host/service name templates
+ *
+ * @param value Array or string with macro metric names
+ * @return Escaped string. Arrays are joined with dots.
+ */
 Value GraphiteWriter::EscapeMacroMetric(const Value& value)
 {
        if (value.IsObjectType<Array>()) {
@@ -375,6 +474,12 @@ Value GraphiteWriter::EscapeMacroMetric(const Value& value)
                return EscapeMetric(value);
 }
 
+/**
+ * Validate the configuration setting 'host_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
 void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 {
        ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
@@ -383,6 +488,12 @@ void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const
                BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
 }
 
+/**
+ * Validate the configuration setting 'service_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
 void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 {
        ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
index 2756c0fd451fd5f76431021e1eaca7ecb629eb3d..42f741cee3ba905ef8eb7c72cc079b2a5430749e 100644 (file)
@@ -37,7 +37,7 @@ protected:
        void Pause() override;
 
 private:
-       Stream::Ptr m_Stream;
+       std::shared_ptr<AsioTcpStream> m_Stream;
        boost::mutex m_StreamMutex;
        WorkQueue m_WorkQueue{10000000, 1};