From: Michael Friedrich <michael.friedrich@icinga.com>
Date: Mon, 27 May 2019 14:49:51 +0000 (+0200)
Subject: Quality: Use Boost ASIO/IO engine in Graphite feature
X-Git-Tag: v2.11.0-rc1~85^2~3
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=efd4e8ad408254b1b5b51a1656e9dcf8a5e3dd7e;p=icinga2

Quality: Use Boost ASIO/IO engine in Graphite feature

This commit changes the reconnect priority to high.

Also add function docs.
---

diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp
index aee142e5a..9eb2e84a6 100644
--- a/lib/perfdata/graphitewriter.cpp
+++ b/lib/perfdata/graphitewriter.cpp
@@ -28,6 +28,9 @@ REGISTER_TYPE(GraphiteWriter);
 
 REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
 
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
 void GraphiteWriter::OnConfigLoaded()
 {
 	ObjectImpl<GraphiteWriter>::OnConfigLoaded();
@@ -44,6 +47,12 @@ void GraphiteWriter::OnConfigLoaded()
 	}
 }
 
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ * @param perfdata Array of PerfdataValue objects
+ */
 void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 {
 	DictionaryData nodes;
@@ -65,6 +74,9 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
 	status->Set("graphitewriter", new Dictionary(std::move(nodes)));
 }
 
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
 void GraphiteWriter::Resume()
 {
 	ObjectImpl<GraphiteWriter>::Resume();
@@ -86,7 +98,9 @@ void GraphiteWriter::Resume()
 	Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
 }
 
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
 void GraphiteWriter::Pause()
 {
 	m_ReconnectTimer.reset();
@@ -110,11 +124,21 @@ void GraphiteWriter::Pause()
 	ObjectImpl<GraphiteWriter>::Pause();
 }
 
+/**
+ * Check if method is called inside the WQ thread.
+ */
 void GraphiteWriter::AssertOnWorkQueue()
 {
 	ASSERT(m_WorkQueue.IsWorkerThread());
 }
 
+/**
+ * Exception handler for the WQ.
+ *
+ * Closes the connection if connected.
+ *
+ * @param exp Exception pointer
+ */
 void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
 {
 	Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
@@ -123,12 +147,17 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
 		<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
 
 	if (GetConnected()) {
-		m_Stream->Close();
+		m_Stream->close();
 
 		SetConnected(false);
 	}
 }
 
+/**
+ * Reconnect method, stops when the feature is paused in HA zones.
+ *
+ * Called inside the WQ.
+ */
 void GraphiteWriter::Reconnect()
 {
 	AssertOnWorkQueue();
@@ -141,6 +170,9 @@ void GraphiteWriter::Reconnect()
 	ReconnectInternal();
 }
 
+/**
+ * Reconnect method, connects to a TCP Stream
+ */
 void GraphiteWriter::ReconnectInternal()
 {
 	double startTime = Utility::GetTime();
@@ -152,35 +184,42 @@ void GraphiteWriter::ReconnectInternal()
 	if (GetConnected())
 		return;
 
-	TcpSocket::Ptr socket = new TcpSocket();
-
 	Log(LogNotice, "GraphiteWriter")
 		<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
+	m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+
 	try {
-		socket->Connect(GetHost(), GetPort());
+		icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
 	} catch (const std::exception& ex) {
-		Log(LogCritical, "GraphiteWriter")
-			<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
-		throw ex;
+		Log(LogWarning, "GraphiteWriter")
+			<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
 	}
 
-	m_Stream = new NetworkStream(socket);
-
 	SetConnected(true);
 
 	Log(LogInformation, "GraphiteWriter")
 		<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
 }
 
+/**
+ * Reconnect handler called by the timer.
+ *
+ * Enqueues a reconnect task into the WQ.
+ */
 void GraphiteWriter::ReconnectTimerHandler()
 {
 	if (IsPaused())
 		return;
 
-	m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
+	m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
 }
 
+/**
+ * Disconnect the stream.
+ *
+ * Called inside the WQ.
+ */
 void GraphiteWriter::Disconnect()
 {
 	AssertOnWorkQueue();
@@ -188,16 +227,27 @@ void GraphiteWriter::Disconnect()
 	DisconnectInternal();
 }
 
+/**
+ * Disconnect the stream.
+ *
+ * Called outside the WQ.
+ */
 void GraphiteWriter::DisconnectInternal()
 {
 	if (!GetConnected())
 		return;
 
-	m_Stream->Close();
+	m_Stream->close();
 
 	SetConnected(false);
 }
 
+/**
+ * Check result event handler, checks whether feature is not paused in HA setups.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
 	if (IsPaused())
@@ -206,6 +256,14 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
 	m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
 }
 
+/**
+ * Check result event handler, prepares metadata and perfdata values and calls Send*()
+ *
+ * Called inside the WQ.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
 void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
 	AssertOnWorkQueue();
@@ -262,6 +320,14 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
 	SendPerfdata(checkable, prefixPerfdata, cr, ts);
 }
 
+/**
+ * Parse performance data from check result and call SendMetric()
+ *
+ * @param checkable Host/service object
+ * @param prefix Metric prefix string
+ * @param cr Check result including performance data
+ * @param ts Timestamp when the check result was created
+ */
 void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
 {
 	Array::Ptr perfdata = cr->GetPerformanceData();
@@ -306,8 +372,19 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
 	}
 }
 
+/**
+ * Computes metric data and sends to Graphite
+ *
+ * @param checkable Host/service object
+ * @param prefix Computed metric prefix string
+ * @param name Metric name
+ * @param value Metric value
+ * @param ts Timestamp when the check result was created
+ */
 void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
 {
+	namespace asio = boost::asio;
+
 	std::ostringstream msgbuf;
 	msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
 
@@ -316,7 +393,6 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
 
 	// do not send \n to debug log
 	msgbuf << "\n";
-	String metric = msgbuf.str();
 
 	boost::mutex::scoped_lock lock(m_StreamMutex);
 
@@ -324,7 +400,8 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
 		return;
 
 	try {
-		m_Stream->Write(metric.CStr(), metric.GetLength());
+		asio::write(*m_Stream, asio::buffer(msgbuf.str()));
+		m_Stream->flush();
 	} catch (const std::exception& ex) {
 		Log(LogCritical, "GraphiteWriter")
 			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
@@ -333,6 +410,14 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
 	}
 }
 
+/**
+ * Escape metric tree elements
+ *
+ * Dots are not allowed, e.g. in host names
+ *
+ * @param str Metric part name
+ * @return Escape string
+ */
 String GraphiteWriter::EscapeMetric(const String& str)
 {
 	String result = str;
@@ -346,6 +431,14 @@ String GraphiteWriter::EscapeMetric(const String& str)
 	return result;
 }
 
+/**
+ * Escape metric label
+ *
+ * Dots are allowed - users can create trees from perfdata labels
+ *
+ * @param str Metric label name
+ * @return Escaped string
+ */
 String GraphiteWriter::EscapeMetricLabel(const String& str)
 {
 	String result = str;
@@ -359,6 +452,12 @@ String GraphiteWriter::EscapeMetricLabel(const String& str)
 	return result;
 }
 
+/**
+ * Escape macro metrics found via host/service name templates
+ *
+ * @param value Array or string with macro metric names
+ * @return Escaped string. Arrays are joined with dots.
+ */
 Value GraphiteWriter::EscapeMacroMetric(const Value& value)
 {
 	if (value.IsObjectType<Array>()) {
@@ -375,6 +474,12 @@ Value GraphiteWriter::EscapeMacroMetric(const Value& value)
 		return EscapeMetric(value);
 }
 
+/**
+ * Validate the configuration setting 'host_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
 void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 {
 	ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
@@ -383,6 +488,12 @@ void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const
 		BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
 }
 
+/**
+ * Validate the configuration setting 'service_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
 void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 {
 	ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp
index 2756c0fd4..42f741cee 100644
--- a/lib/perfdata/graphitewriter.hpp
+++ b/lib/perfdata/graphitewriter.hpp
@@ -37,7 +37,7 @@ protected:
 	void Pause() override;
 
 private:
-	Stream::Ptr m_Stream;
+	std::shared_ptr<AsioTcpStream> m_Stream;
 	boost::mutex m_StreamMutex;
 	WorkQueue m_WorkQueue{10000000, 1};