]> granicus.if.org Git - icinga2/commitdiff
Quality: Rewrite OpenTSDB to use Boost ASIO and I/O engine
authorMichael Friedrich <michael.friedrich@icinga.com>
Mon, 27 May 2019 13:09:26 +0000 (15:09 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Mon, 27 May 2019 13:09:26 +0000 (15:09 +0200)
The connection handling and code isn't really good, but not
really actively maintained either.

Besides that, the "telnet" method doesn't allow for TLS,
this needs a general rewrite against their HTTP API.

I've also added function documentation where applicable.

lib/perfdata/opentsdbwriter.cpp
lib/perfdata/opentsdbwriter.hpp
lib/perfdata/opentsdbwriter.ti

index 013d511486d6b93432563ff8d42812e6464f11ff..0e54b28467c2653e2d0e9c4067c7edf26283e8f5 100644 (file)
@@ -28,6 +28,9 @@ REGISTER_TYPE(OpenTsdbWriter);
 
 REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
 
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
 void OpenTsdbWriter::OnConfigLoaded()
 {
        ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
@@ -42,17 +45,27 @@ void OpenTsdbWriter::OnConfigLoaded()
        }
 }
 
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ */
 void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
 {
        DictionaryData nodes;
 
        for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
-               nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
+               nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
+                       { "connected", opentsdbwriter->GetConnected() }
+               }));
        }
 
        status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
 }
 
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
 void OpenTsdbWriter::Resume()
 {
        ObjectImpl<OpenTsdbWriter>::Resume();
@@ -69,7 +82,9 @@ void OpenTsdbWriter::Resume()
        Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
 }
 
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
 void OpenTsdbWriter::Pause()
 {
        m_ReconnectTimer.reset();
@@ -77,33 +92,54 @@ void OpenTsdbWriter::Pause()
        Log(LogInformation, "OpentsdbWriter")
                << "'" << GetName() << "' paused.";
 
+       m_Stream->close();
+
+       SetConnected(false);
+
        ObjectImpl<OpenTsdbWriter>::Pause();
 }
 
+/**
+ * Reconnect handler called by the timer.
+ * Handles TLS
+ */
 void OpenTsdbWriter::ReconnectTimerHandler()
 {
        if (IsPaused())
                return;
 
-       if (m_Stream)
-               return;
+       SetShouldConnect(true);
 
-       TcpSocket::Ptr socket = new TcpSocket();
+       if (GetConnected())
+               return;
 
        Log(LogNotice, "OpenTsdbWriter")
                << "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
+       /*
+        * We're using telnet as input method. Future PRs may change this into using the HTTP API.
+        * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
+        */
+
+       m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+
        try {
-               socket->Connect(GetHost(), GetPort());
-       } catch (std::exception&) {
-               Log(LogCritical, "OpenTsdbWriter")
-                       << "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               return;
+               icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "OpenTsdbWriter")
+                       << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << ".'";
        }
 
-       m_Stream = new NetworkStream(socket);
+       SetConnected(true);
 }
 
+/**
+ * Registered check result handler processing data.
+ * Calculates tags from the config.
+ *
+ * @param checkable Host/service object
+ * @param cr Check result
+ */
 void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 {
        if (IsPaused())
@@ -165,6 +201,15 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
        SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
 }
 
+/**
+ * Parse and send performance data metrics to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param cr Check result containing performance data
+ * @param ts Timestamp when the check result was received
+ */
 void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
        const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
 {
@@ -209,6 +254,15 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
        }
 }
 
+/**
+ * Send given metric to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param value Floating point metric value
+ * @param ts Timestamp where the metric was received from the check result
+ */
 void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
        const std::map<String, String>& tags, double value, double ts)
 {
@@ -220,7 +274,7 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
 
        std::ostringstream msgbuf;
        /*
-        * must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
+        * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
         * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
         * "tags" must include at least one tag, we use "host=HOSTNAME"
         */
@@ -235,21 +289,27 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
 
        ObjectLock olock(this);
 
-       if (!m_Stream)
+       if (!GetConnected())
                return;
 
        try {
-               m_Stream->Write(put.CStr(), put.GetLength());
+               Log(LogDebug, "OpenTsdbWriter")
+                       << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
+
+               boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
+               m_Stream->flush();
        } catch (const std::exception& ex) {
                Log(LogCritical, "OpenTsdbWriter")
-                       << "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
-
-               m_Stream.reset();
+                       << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
        }
 }
 
-/* for metric and tag name rules, see
- * http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
+/**
+ * Escape tags for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Tag name
+ * @return Escaped tag
  */
 String OpenTsdbWriter::EscapeTag(const String& str)
 {
@@ -261,6 +321,13 @@ String OpenTsdbWriter::EscapeTag(const String& str)
        return result;
 }
 
+/**
+ * Escape metric name for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Metric name
+ * @return Escaped metric
+ */
 String OpenTsdbWriter::EscapeMetric(const String& str)
 {
        String result = str;
@@ -271,4 +338,4 @@ String OpenTsdbWriter::EscapeMetric(const String& str)
        boost::replace_all(result, ":", "_");
 
        return result;
-}
+}
\ No newline at end of file
index 792377556645cfae5ce4996c1ed267d6348f98d6..bf0fc8eb197acafa2158142fbd81c1e054fbc222 100644 (file)
@@ -32,7 +32,7 @@ protected:
        void Pause() override;
 
 private:
-       Stream::Ptr m_Stream;
+       std::shared_ptr<AsioTcpStream> m_Stream;
 
        Timer::Ptr m_ReconnectTimer;
 
index bce6a11b5d70f8f0858aae201089b9f243f6bd32..de19a1eace772f53cc75995294637b58eb56e3cb 100644 (file)
@@ -20,6 +20,11 @@ class OpenTsdbWriter : ConfigObject
        [config] bool enable_ha {
                default {{{ return false; }}}
        };
+
+       [no_user_modify] bool connected;
+       [no_user_modify] bool should_connect {
+               default {{{ return true; }}}
+       };
 };
 
 }