//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
InfluxdbWriter::InfluxdbWriter(void)
- : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
+ : m_WorkQueue(10000000, 1)
{ }
void InfluxdbWriter::OnConfigLoaded(void)
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
- /* Timer for updating and logging work queue stats */
- m_StatsLoggerTimer = new Timer();
- m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
- m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
- m_StatsLoggerTimer->Start();
-
/* Register for new metrics. */
Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
}
//TODO: Close the connection, if we keep it open.
}
-void InfluxdbWriter::StatsLoggerTimerHandler(void)
-{
- int pending = m_WorkQueue.GetLength();
-
- double now = Utility::GetTime();
- double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
- double timeToZero = pending / gradient;
-
- String timeInfo;
-
- if (pending > GetTaskCount(5)) {
- timeInfo = " empty in ";
- if (timeToZero < 0)
- timeInfo += "infinite time, your backend isn't able to keep up";
- else
- timeInfo += Utility::FormatDuration(timeToZero);
- }
-
- m_PendingTasks = pending;
- m_PendingTasksTimestamp = now;
-
- Log(LogInformation, "InfluxdbWriter")
- << "Work queue items: " << pending
- << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
- << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
- << timeInfo;
-}
-
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
{
socket = new TcpSocket();
if (!stream)
return;
- IncreaseTaskCount();
-
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
url->SetHost(GetHost());
}
}
-void InfluxdbWriter::IncreaseTaskCount(void)
-{
- double now = Utility::GetTime();
-
- boost::mutex::scoped_lock lock(m_StatsMutex);
- m_TaskStats.InsertValue(now, 1);
-}
-
-int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
-{
- boost::mutex::scoped_lock lock(m_StatsMutex);
- return m_TaskStats.GetValues(span);
-}
-
void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
{
ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
-#include "base/ringbuffer.hpp"
#include "base/workqueue.hpp"
#include <boost/thread/mutex.hpp>
#include <fstream>
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
- int GetTaskCount(RingBuffer::SizeType span) const;
-
virtual void ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
virtual void ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
- void IncreaseTaskCount(void);
-
private:
WorkQueue m_WorkQueue;
Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer;
boost::mutex m_DataBufferMutex;
- mutable boost::mutex m_StatsMutex;
- RingBuffer m_TaskStats;
- int m_PendingTasks;
- double m_PendingTasksTimestamp;
-
- Timer::Ptr m_StatsLoggerTimer;
- void StatsLoggerTimerHandler(void);
-
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts);
void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts);