From: Michael Friedrich Date: Mon, 27 May 2019 14:49:51 +0000 (+0200) Subject: Quality: Use Boost ASIO/IO engine in Graphite feature X-Git-Tag: v2.11.0-rc1~85^2~3 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=efd4e8ad408254b1b5b51a1656e9dcf8a5e3dd7e;p=icinga2 Quality: Use Boost ASIO/IO engine in Graphite feature This commit changes the reconnect priority to high. Also add function docs. --- diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index aee142e5a..9eb2e84a6 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -28,6 +28,9 @@ REGISTER_TYPE(GraphiteWriter); REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc); +/* + * Enable HA capabilities once the config object is loaded. + */ void GraphiteWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); @@ -44,6 +47,12 @@ void GraphiteWriter::OnConfigLoaded() } } +/** + * Feature stats interface + * + * @param status Key value pairs for feature stats + * @param perfdata Array of PerfdataValue objects + */ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData nodes; @@ -65,6 +74,9 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& status->Set("graphitewriter", new Dictionary(std::move(nodes))); } +/** + * Resume is equivalent to Start, but with HA capabilities to resume at runtime. + */ void GraphiteWriter::Resume() { ObjectImpl::Resume(); @@ -86,7 +98,9 @@ void GraphiteWriter::Resume() Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2)); } -/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ +/** + * Pause is equivalent to Stop, but with HA capabilities to resume at runtime. + */ void GraphiteWriter::Pause() { m_ReconnectTimer.reset(); @@ -110,11 +124,21 @@ void GraphiteWriter::Pause() ObjectImpl::Pause(); } +/** + * Check if method is called inside the WQ thread. + */ void GraphiteWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); } +/** + * Exception handler for the WQ. + * + * Closes the connection if connected. + * + * @param exp Exception pointer + */ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!"); @@ -123,12 +147,17 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); if (GetConnected()) { - m_Stream->Close(); + m_Stream->close(); SetConnected(false); } } +/** + * Reconnect method, stops when the feature is paused in HA zones. + * + * Called inside the WQ. + */ void GraphiteWriter::Reconnect() { AssertOnWorkQueue(); @@ -141,6 +170,9 @@ void GraphiteWriter::Reconnect() ReconnectInternal(); } +/** + * Reconnect method, connects to a TCP Stream + */ void GraphiteWriter::ReconnectInternal() { double startTime = Utility::GetTime(); @@ -152,35 +184,42 @@ void GraphiteWriter::ReconnectInternal() if (GetConnected()) return; - TcpSocket::Ptr socket = new TcpSocket(); - Log(LogNotice, "GraphiteWriter") << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; + m_Stream = std::make_shared(IoEngine::Get().GetIoService()); + try { - socket->Connect(GetHost(), GetPort()); + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); } catch (const std::exception& ex) { - Log(LogCritical, "GraphiteWriter") - << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; + Log(LogWarning, "GraphiteWriter") + << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'"; } - m_Stream = new NetworkStream(socket); - SetConnected(true); Log(LogInformation, "GraphiteWriter") << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; } +/** + * Reconnect handler called by the timer. + * + * Enqueues a reconnect task into the WQ. + */ void GraphiteWriter::ReconnectTimerHandler() { if (IsPaused()) return; - m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal); + m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh); } +/** + * Disconnect the stream. + * + * Called inside the WQ. + */ void GraphiteWriter::Disconnect() { AssertOnWorkQueue(); @@ -188,16 +227,27 @@ void GraphiteWriter::Disconnect() DisconnectInternal(); } +/** + * Disconnect the stream. + * + * Called outside the WQ. + */ void GraphiteWriter::DisconnectInternal() { if (!GetConnected()) return; - m_Stream->Close(); + m_Stream->close(); SetConnected(false); } +/** + * Check result event handler, checks whether feature is not paused in HA setups. + * + * @param checkable Host/Service object + * @param cr Check result including performance data + */ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { if (IsPaused()) @@ -206,6 +256,14 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr)); } +/** + * Check result event handler, prepares metadata and perfdata values and calls Send*() + * + * Called inside the WQ. + * + * @param checkable Host/Service object + * @param cr Check result including performance data + */ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { AssertOnWorkQueue(); @@ -262,6 +320,14 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, SendPerfdata(checkable, prefixPerfdata, cr, ts); } +/** + * Parse performance data from check result and call SendMetric() + * + * @param checkable Host/service object + * @param prefix Metric prefix string + * @param cr Check result including performance data + * @param ts Timestamp when the check result was created + */ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts) { Array::Ptr perfdata = cr->GetPerformanceData(); @@ -306,8 +372,19 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& } } +/** + * Computes metric data and sends to Graphite + * + * @param checkable Host/service object + * @param prefix Computed metric prefix string + * @param name Metric name + * @param value Metric value + * @param ts Timestamp when the check result was created + */ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts) { + namespace asio = boost::asio; + std::ostringstream msgbuf; msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast(ts); @@ -316,7 +393,6 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; - String metric = msgbuf.str(); boost::mutex::scoped_lock lock(m_StreamMutex); @@ -324,7 +400,8 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p return; try { - m_Stream->Write(metric.CStr(), metric.GetLength()); + asio::write(*m_Stream, asio::buffer(msgbuf.str())); + m_Stream->flush(); } catch (const std::exception& ex) { Log(LogCritical, "GraphiteWriter") << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; @@ -333,6 +410,14 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p } } +/** + * Escape metric tree elements + * + * Dots are not allowed, e.g. in host names + * + * @param str Metric part name + * @return Escape string + */ String GraphiteWriter::EscapeMetric(const String& str) { String result = str; @@ -346,6 +431,14 @@ String GraphiteWriter::EscapeMetric(const String& str) return result; } +/** + * Escape metric label + * + * Dots are allowed - users can create trees from perfdata labels + * + * @param str Metric label name + * @return Escaped string + */ String GraphiteWriter::EscapeMetricLabel(const String& str) { String result = str; @@ -359,6 +452,12 @@ String GraphiteWriter::EscapeMetricLabel(const String& str) return result; } +/** + * Escape macro metrics found via host/service name templates + * + * @param value Array or string with macro metric names + * @return Escaped string. Arrays are joined with dots. + */ Value GraphiteWriter::EscapeMacroMetric(const Value& value) { if (value.IsObjectType()) { @@ -375,6 +474,12 @@ Value GraphiteWriter::EscapeMacroMetric(const Value& value) return EscapeMetric(value); } +/** + * Validate the configuration setting 'host_name_template' + * + * @param lvalue String containing runtime macros. + * @param utils Helper, unused + */ void GraphiteWriter::ValidateHostNameTemplate(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateHostNameTemplate(lvalue, utils); @@ -383,6 +488,12 @@ void GraphiteWriter::ValidateHostNameTemplate(const Lazy& lvalue, const BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); } +/** + * Validate the configuration setting 'service_name_template' + * + * @param lvalue String containing runtime macros. + * @param utils Helper, unused + */ void GraphiteWriter::ValidateServiceNameTemplate(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateServiceNameTemplate(lvalue, utils); diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 2756c0fd4..42f741cee 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -37,7 +37,7 @@ protected: void Pause() override; private: - Stream::Ptr m_Stream; + std::shared_ptr m_Stream; boost::mutex m_StreamMutex; WorkQueue m_WorkQueue{10000000, 1};