From 0466316019cf3795be776575928830242c0b8524 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Mon, 27 May 2019 15:09:26 +0200 Subject: [PATCH] Quality: Rewrite OpenTSDB to use Boost ASIO and I/O engine The connection handling and code aren't really good, and they aren't actively maintained either. Besides that, the "telnet" method doesn't allow for TLS; this needs a general rewrite against the OpenTSDB HTTP API. I've also added function documentation where applicable. --- lib/perfdata/opentsdbwriter.cpp | 107 ++++++++++++++++++++++++++------ lib/perfdata/opentsdbwriter.hpp | 2 +- lib/perfdata/opentsdbwriter.ti | 5 ++ 3 files changed, 93 insertions(+), 21 deletions(-) diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 013d51148..0e54b2846 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -28,6 +28,9 @@ REGISTER_TYPE(OpenTsdbWriter); REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc); +/** + * Enable HA capabilities once the config object is loaded. + */ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); @@ -42,17 +45,27 @@ void OpenTsdbWriter::OnConfigLoaded() } } +/** + * Feature stats interface + * + * @param status Key value pairs for feature stats + */ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { - nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats + nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ + { "connected", opentsdbwriter->GetConnected() } + })); } status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); } +/** + * Resume is equivalent to Start, but with HA capabilities to resume at runtime. 
+ */ void OpenTsdbWriter::Resume() { ObjectImpl::Resume(); @@ -69,7 +82,9 @@ void OpenTsdbWriter::Resume() Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2)); } -/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ +/** + * Pause is equivalent to Stop, but with HA capabilities to resume at runtime. + */ void OpenTsdbWriter::Pause() { m_ReconnectTimer.reset(); @@ -77,33 +92,54 @@ void OpenTsdbWriter::Pause() Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; + m_Stream->close(); + + SetConnected(false); + ObjectImpl::Pause(); } +/** + * Reconnect handler called by the timer. + * Connects to the configured host/port without TLS (the telnet input method does not allow for it). + */ void OpenTsdbWriter::ReconnectTimerHandler() { if (IsPaused()) return; - if (m_Stream) - return; + SetShouldConnect(true); - TcpSocket::Ptr socket = new TcpSocket(); + if (GetConnected()) + return; Log(LogNotice, "OpenTsdbWriter") << "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; + /* + * We're using telnet as input method. Future PRs may change this into using the HTTP API. + * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet + */ + + m_Stream = std::make_shared(IoEngine::Get().GetIoService()); + try { - socket->Connect(GetHost(), GetPort()); - } catch (std::exception&) { - Log(LogCritical, "OpenTsdbWriter") - << "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; - return; + icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "OpenTsdbWriter") + << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << ".'"; } - m_Stream = new NetworkStream(socket); + SetConnected(true); } +/** + * Registered check result handler processing data. + * Calculates tags from the config. 
+ * + * @param checkable Host/service object + * @param cr Check result + */ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { if (IsPaused()) @@ -165,6 +201,15 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); } +/** + * Parse and send performance data metrics to OpenTSDB + * + * @param checkable Host/service object + * @param metric Full metric name + * @param tags Tag key pairs + * @param cr Check result containing performance data + * @param ts Timestamp when the check result was received + */ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { @@ -209,6 +254,15 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& } } +/** + * Send given metric to OpenTSDB + * + * @param checkable Host/service object + * @param metric Full metric name + * @param tags Tag key pairs + * @param value Floating point metric value + * @param ts Timestamp where the metric was received from the check result + */ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { @@ -220,7 +274,7 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m std::ostringstream msgbuf; /* - * must be (http://opentsdb.net/docs/build/html/user_guide/writing.html) + * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html) * put * "tags" must include at least one tag, we use "host=HOSTNAME" */ @@ -235,21 +289,27 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m ObjectLock olock(this); - if (!m_Stream) + if (!GetConnected()) return; try { - m_Stream->Write(put.CStr(), put.GetLength()); + Log(LogDebug, "OpenTsdbWriter") + << "Checkable '" << 
checkable->GetName() << "' sending message '" << put << "'."; + + boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); + m_Stream->flush(); } catch (const std::exception& ex) { Log(LogCritical, "OpenTsdbWriter") - << "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'."; - - m_Stream.reset(); + << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; } } -/* for metric and tag name rules, see - * http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags +/** + * Escape tags for OpenTSDB + * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags + * + * @param str Tag name + * @return Escaped tag */ String OpenTsdbWriter::EscapeTag(const String& str) { @@ -261,6 +321,13 @@ String OpenTsdbWriter::EscapeTag(const String& str) return result; } +/** + * Escape metric name for OpenTSDB + * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags + * + * @param str Metric name + * @return Escaped metric + */ String OpenTsdbWriter::EscapeMetric(const String& str) { String result = str; @@ -271,4 +338,4 @@ String OpenTsdbWriter::EscapeMetric(const String& str) boost::replace_all(result, ":", "_"); return result; -} +} \ No newline at end of file diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index 792377556..bf0fc8eb1 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -32,7 +32,7 @@ protected: void Pause() override; private: - Stream::Ptr m_Stream; + std::shared_ptr m_Stream; Timer::Ptr m_ReconnectTimer; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index bce6a11b5..de19a1eac 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -20,6 +20,11 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_ha { default {{{ return false; }}} }; + + [no_user_modify] bool 
connected; + [no_user_modify] bool should_connect { + default {{{ return true; }}} + }; }; } -- 2.40.0