]> granicus.if.org Git - icinga2/commitdiff
InfluxdbWriter: Use a work queue for async message processing; add stats log/api 5232/head
authorMichael Friedrich <michael.friedrich@icinga.com>
Thu, 4 May 2017 08:29:49 +0000 (10:29 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Fri, 5 May 2017 15:56:51 +0000 (17:56 +0200)
doc/9-object-types.md
etc/icinga2/features-available/influxdb.conf
lib/perfdata/influxdbwriter.cpp
lib/perfdata/influxdbwriter.hpp

index f16d09fabed66aef812c27b528c26490d7e338c9..73ab4fdd45087603e53a72aa7f5a02c9ffafc45d 100644 (file)
@@ -886,6 +886,10 @@ Example:
       host = "127.0.0.1"
       port = 8086
       database = "icinga2"
+
+      flush_threshold = 1024
+      flush_interval = 10s
+
       host_template = {
         measurement = "$host.check_command$"
         tags = {
@@ -935,6 +939,10 @@ Configuration Attributes:
   flush_threshold        | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB.  Defaults to `1024`.
   socket_timeout         | **Optional.** How long to wait for InfluxDB to respond.  Defaults to `5s`.
 
+Note: If `flush_threshold` is set too low, this will always force the feature to flush all data
+to InfluxDB. Experiment with the setting, if you are processing more than 1024 metrics per second
+or similar.
+
 ### <a id="objecttype-influxdbwriter-instance-tags"></a> Instance Tagging
 
 Consider the following service check:
index 058568bf1a777554715d6fcf97004df6b58b3d61..20f9ed253b0e36367b31122b31dd54a215f194ca 100644 (file)
@@ -9,6 +9,8 @@ object InfluxdbWriter "influxdb" {
   //host = "127.0.0.1"
   //port = 8086
   //database = "icinga2"
+  //flush_threshold = 1024
+  //flush_interval = 10s
   //host_template = {
   //  measurement = "$host.check_command$"
   //  tags = {
index 198d0658f3e6611ba862015603ac1bf14cdca1fe..a8e63d23516b6332e33cb84718aef0b6ec07d3d6 100644 (file)
@@ -25,7 +25,6 @@
 #include "icinga/service.hpp"
 #include "icinga/macroprocessor.hpp"
 #include "icinga/icingaapplication.hpp"
-#include "icinga/compatutility.hpp"
 #include "icinga/perfdatavalue.hpp"
 #include "icinga/checkcommand.hpp"
 #include "base/tcpsocket.hpp"
@@ -34,7 +33,6 @@
 #include "base/logger.hpp"
 #include "base/convert.hpp"
 #include "base/utility.hpp"
-#include "base/application.hpp"
 #include "base/stream.hpp"
 #include "base/networkstream.hpp"
 #include "base/exception.hpp"
@@ -52,12 +50,32 @@ REGISTER_TYPE(InfluxdbWriter);
 
 REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
 
+//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
+InfluxdbWriter::InfluxdbWriter(void)
+    : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
+{ }
+
+void InfluxdbWriter::OnConfigLoaded(void)
+{
+       ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
+
+       m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
+}
+
 void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
 {
        Dictionary::Ptr nodes = new Dictionary();
 
        for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
-               nodes->Set(influxdbwriter->GetName(), 1); //add more stats
+               size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
+               size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
+
+               //TODO: Collect more stats
+               Dictionary::Ptr stats = new Dictionary();
+               stats->Set("work_queue_items", workQueueItems);
+               stats->Set("data_buffer_items", dataBufferItems);
+
+               nodes->Set(influxdbwriter->GetName(), stats);
        }
 
        status->Set("influxdbwriter", nodes);
@@ -65,19 +83,28 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
 
 void InfluxdbWriter::Start(bool runtimeCreated)
 {
-       m_DataBuffer = new Array();
-
        ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
 
        Log(LogInformation, "InfluxdbWriter")
            << "'" << GetName() << "' started.";
 
+       /* Register exception handler for WQ tasks. */
+       m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
+
+       /* Setup timer for periodically flushing m_DataBuffer */
        m_FlushTimer = new Timer();
        m_FlushTimer->SetInterval(GetFlushInterval());
        m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
        m_FlushTimer->Start();
        m_FlushTimer->Reschedule(0);
 
+       /* Timer for updating and logging work queue stats */
+       m_StatsLoggerTimer = new Timer();
+       m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
+       m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
+       m_StatsLoggerTimer->Start();
+
+       /* Register for new metrics. */
        Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
 }
 
@@ -86,9 +113,54 @@ void InfluxdbWriter::Stop(bool runtimeRemoved)
        Log(LogInformation, "InfluxdbWriter")
            << "'" << GetName() << "' stopped.";
 
+       m_WorkQueue.Join();
+
        ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
 }
 
+void InfluxdbWriter::AssertOnWorkQueue(void)
+{
+       ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+       Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
+
+       Log(LogDebug, "InfluxdbWriter")
+           << "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
+
+       //TODO: Close the connection, if we keep it open.
+}
+
+void InfluxdbWriter::StatsLoggerTimerHandler(void)
+{
+       int pending = m_WorkQueue.GetLength();
+
+       double now = Utility::GetTime();
+       double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
+       double timeToZero = pending / gradient;
+
+       String timeInfo;
+
+       if (pending > GetTaskCount(5)) {
+               timeInfo = " empty in ";
+               if (timeToZero < 0)
+                       timeInfo += "infinite time, your backend isn't able to keep up";
+               else
+                       timeInfo += Utility::FormatDuration(timeToZero);
+       }
+
+       m_PendingTasks = pending;
+       m_PendingTasksTimestamp = now;
+
+       Log(LogInformation, "InfluxdbWriter")
+           << "Work queue items: " << pending
+           << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
+           << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
+           << timeInfo;
+}
+
 Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
 {
        socket = new TcpSocket();
@@ -98,32 +170,32 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
 
        try {
                socket->Connect(GetHost(), GetPort());
-       } catch (std::exception&) {
+       } catch (const std::exception& ex) {
                Log(LogWarning, "InfluxdbWriter")
                    << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               return Stream::Ptr();
+               throw ex;
        }
 
        if (GetSslEnable()) {
-               boost::shared_ptr<SSL_CTX> ssl_context;
+               boost::shared_ptr<SSL_CTX> sslContext;
                try {
-                       ssl_context = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
-               } catch (std::exception&) {
+                       sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+               } catch (const std::exception& ex) {
                        Log(LogWarning, "InfluxdbWriter")
                            << "Unable to create SSL context.";
-                       return Stream::Ptr();
+                       throw ex;
                }
 
-               TlsStream::Ptr tls_stream = new TlsStream(socket, GetHost(), RoleClient, ssl_context);
+               TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
                try {
-                       tls_stream->Handshake();
-               } catch (std::exception&) {
+                       tlsStream->Handshake();
+               } catch (const std::exception& ex) {
                        Log(LogWarning, "InfluxdbWriter")
                            << "TLS handshake with host '" << GetHost() << "' failed.";
-                       return Stream::Ptr();
+                       throw ex;
                }
 
-               return tls_stream;
+               return tlsStream;
        } else {
                return new NetworkStream(socket);
        }
@@ -176,7 +248,7 @@ String InfluxdbWriter::FormatInteger(const int val)
 
 String InfluxdbWriter::FormatBoolean(const bool val)
 {
-       return val ? "true" : "false";
+       return String(val);
 }
 
 void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
@@ -265,6 +337,8 @@ String InfluxdbWriter::EscapeKey(const String& str)
 
 String InfluxdbWriter::EscapeField(const String& str)
 {
+       //TODO: Evaluate whether boost::regex is really needed here.
+
        // Handle integers
        boost::regex integer("-?\\d+i");
        if (boost::regex_match(str.GetData(), integer)) {
@@ -313,29 +387,33 @@ void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label
 
        msgbuf << " ";
 
-       bool first = true;
-       ObjectLock fieldLock(fields);
-       for (const Dictionary::Pair& pair : fields) {
-               if (first)
-                       first = false;
-               else
-                       msgbuf << ",";
-               msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
+       {
+               bool first = true;
+
+               ObjectLock fieldLock(fields);
+               for (const Dictionary::Pair& pair : fields) {
+                       if (first)
+                               first = false;
+                       else
+                               msgbuf << ",";
+
+                       msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
+               }
        }
 
        msgbuf << " " <<  static_cast<unsigned long>(ts);
 
        Log(LogDebug, "InfluxdbWriter")
-           << "Add to metric list:'" << msgbuf.str() << "'.";
+           << "Add to metric list: '" << msgbuf.str() << "'.";
 
        // Atomically buffer the data point
-       ObjectLock olock(m_DataBuffer);
-       m_DataBuffer->Add(String(msgbuf.str()));
+       boost::mutex::scoped_lock lock(m_DataBufferMutex);
+       m_DataBuffer.push_back(String(msgbuf.str()));
 
        // Flush if we've buffered too much to prevent excessive memory use
-       if (static_cast<int>(m_DataBuffer->GetLength()) >= GetFlushThreshold()) {
+       if (m_DataBuffer.size() >= GetFlushThreshold()) {
                Log(LogDebug, "InfluxdbWriter")
-                   << "Data buffer overflow writing " << m_DataBuffer->GetLength() << " data points";
+                   << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
                Flush();
        }
 }
@@ -344,27 +422,38 @@ void InfluxdbWriter::FlushTimeout(void)
 {
        // Prevent new data points from being added to the array, there is a
        // race condition where they could disappear
-       ObjectLock olock(m_DataBuffer);
+       boost::mutex::scoped_lock lock(m_DataBufferMutex);
 
        // Flush if there are any data available
-       if (m_DataBuffer->GetLength() > 0) {
+       if (m_DataBuffer.size() > 0) {
                Log(LogDebug, "InfluxdbWriter")
-                   << "Timer expired writing " << m_DataBuffer->GetLength() << " data points";
+                   << "Timer expired writing " << m_DataBuffer.size() << " data points";
                Flush();
        }
 }
 
 void InfluxdbWriter::Flush(void)
 {
+       // Ensure you hold a lock against m_DataBuffer so that things
+       // don't go missing after creating the body and clearing the buffer
+       String body = boost::algorithm::join(m_DataBuffer, "\n");
+       m_DataBuffer.clear();
+
+       // Asynchronously flush the metric body to InfluxDB
+       m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
+}
+
+void InfluxdbWriter::FlushHandler(const String& body)
+{
+       AssertOnWorkQueue();
+
        TcpSocket::Ptr socket;
        Stream::Ptr stream = Connect(socket);
 
-       // Unable to connect, play it safe and lose the data points
-       // to avoid a memory leak
-       if (!stream.get()) {
-               m_DataBuffer->Clear();
+       if (!stream)
                return;
-       }
+
+       IncreaseTaskCount();
 
        Url::Ptr url = new Url();
        url->SetScheme(GetSslEnable() ? "https" : "http");
@@ -382,11 +471,6 @@ void InfluxdbWriter::Flush(void)
        if (!GetPassword().IsEmpty())
                url->AddQueryElement("p", GetPassword());
 
-       // Ensure you hold a lock against m_DataBuffer so that things
-       // don't go missing after creating the body and clearing the buffer
-       String body = Utility::Join(m_DataBuffer, '\n', false);
-       m_DataBuffer->Clear();
-
        HttpRequest req(stream);
        req.RequestMethod = "POST";
        req.RequestUrl = url;
@@ -394,16 +478,18 @@ void InfluxdbWriter::Flush(void)
        try {
                req.WriteBody(body.CStr(), body.GetLength());
                req.Finish();
-       } catch (const std::exception&) {
+       } catch (const std::exception& ex) {
                Log(LogWarning, "InfluxdbWriter")
                    << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               return;
+               throw ex;
        }
 
+       //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
        HttpResponse resp(stream, req);
        StreamReadContext context;
 
        struct timeval timeout = { GetSocketTimeout(), 0 };
+
        if (!socket->Poll(true, false, &timeout)) {
                Log(LogWarning, "InfluxdbWriter")
                    << "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
@@ -412,10 +498,10 @@ void InfluxdbWriter::Flush(void)
 
        try {
                resp.Parse(context, true);
-       } catch (const std::exception&) {
+       } catch (const std::exception& ex) {
                Log(LogWarning, "InfluxdbWriter")
                    << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
-               return;
+               throw ex;
        }
 
        if (resp.StatusCode != 204) {
@@ -424,6 +510,20 @@ void InfluxdbWriter::Flush(void)
        }
 }
 
+void InfluxdbWriter::IncreaseTaskCount(void)
+{
+       double now = Utility::GetTime();
+
+       boost::mutex::scoped_lock lock(m_StatsMutex);
+       m_TaskStats.InsertValue(now, 1);
+}
+
+int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
+{
+       boost::mutex::scoped_lock lock(m_StatsMutex);
+       return m_TaskStats.GetValues(span);
+}
+
 void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
 {
        ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
index d7fbe6e18e52941c811348632380c8552c401e22..3b5d183890afc16b3fe7f8dc5843a04619da996a 100644 (file)
@@ -25,6 +25,9 @@
 #include "base/configobject.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/timer.hpp"
+#include "base/ringbuffer.hpp"
+#include "base/workqueue.hpp"
+#include <boost/thread/mutex.hpp>
 #include <fstream>
 
 namespace icinga
@@ -41,18 +44,35 @@ public:
        DECLARE_OBJECT(InfluxdbWriter);
        DECLARE_OBJECTNAME(InfluxdbWriter);
 
+       InfluxdbWriter(void);
+
        static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
 
+       int GetTaskCount(RingBuffer::SizeType span) const;
+
        virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
        virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
 
 protected:
+       virtual void OnConfigLoaded(void) override;
        virtual void Start(bool runtimeCreated) override;
        virtual void Stop(bool runtimeRemoved) override;
 
+       void IncreaseTaskCount(void);
+
 private:
+       WorkQueue m_WorkQueue;
        Timer::Ptr m_FlushTimer;
-       Array::Ptr m_DataBuffer;
+       std::vector<String> m_DataBuffer;
+       boost::mutex m_DataBufferMutex;
+
+       mutable boost::mutex m_StatsMutex;
+       RingBuffer m_TaskStats;
+       int m_PendingTasks;
+       double m_PendingTasksTimestamp;
+
+       Timer::Ptr m_StatsLoggerTimer;
+       void StatsLoggerTimerHandler(void);
 
        void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
        void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
@@ -60,6 +80,8 @@ private:
        void FlushTimeout(void);
        void Flush(void);
 
+       void FlushHandler(const String& body);
+
        static String FormatInteger(const int val);
        static String FormatBoolean(const bool val);
 
@@ -67,6 +89,10 @@ private:
        static String EscapeField(const String& str);
 
        Stream::Ptr Connect(TcpSocket::Ptr& socket);
+
+       void AssertOnWorkQueue(void);
+
+       void ExceptionHandler(boost::exception_ptr exp);
 };
 
 }