#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
-#include "icinga/compatutility.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
-void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+GraphiteWriter::GraphiteWriter(void)
+ : m_WorkQueue(10000000, 1)
+{ }
+
+void GraphiteWriter::OnConfigLoaded(void)
+{
+ ObjectImpl<GraphiteWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("GraphiteWriter, " + GetName());
+}
+
+void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
- nodes->Set(graphitewriter->GetName(), 1); //add more stats
+ size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+ Dictionary::Ptr stats = new Dictionary();
+ stats->Set("work_queue_items", workQueueItems);
+ stats->Set("work_queue_item_rate", workQueueItemRate);
+
+ nodes->Set(graphitewriter->GetName(), stats);
+
+ perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("graphitewriter", nodes);
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' started.";
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback(boost::bind(&GraphiteWriter::ExceptionHandler, this, _1));
+
+ /* Timer for reconnecting */
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GraphiteWriter::ReconnectTimerHandler, this));
Log(LogInformation, "GraphiteWriter")
<< "'" << GetName() << "' stopped.";
+ m_WorkQueue.Join();
+
ObjectImpl<GraphiteWriter>::Stop(runtimeRemoved);
}
-void GraphiteWriter::ReconnectTimerHandler(void)
+void GraphiteWriter::AssertOnWorkQueue(void)
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "GraphiteWriter")
+ << "Exception during Graphite operation: " << DiagnosticInformation(exp);
+
+ if (GetConnected()) {
+ m_Stream->Close();
+
+ SetConnected(false);
+ }
+}
+
+void GraphiteWriter::Reconnect(void)
{
- if (m_Stream)
+ AssertOnWorkQueue();
+
+ double startTime = Utility::GetTime();
+
+ CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
+
+ SetShouldConnect(true);
+
+ bool reconnect = false;
+
+ if (GetConnected())
return;
TcpSocket::Ptr socket = new TcpSocket();
try {
socket->Connect(GetHost(), GetPort());
- } catch (std::exception&) {
+ } catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
- return;
+ throw ex;
}
m_Stream = new NetworkStream(socket);
+
+ SetConnected(true);
+
+ Log(LogInformation, "GraphiteWriter")
+ << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+void GraphiteWriter::ReconnectTimerHandler(void)
+{
+ m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
+}
+
+void GraphiteWriter::Disconnect(void)
+{
+ AssertOnWorkQueue();
+
+ if (!GetConnected())
+ return;
+
+ m_Stream->Close();
+
+ SetConnected(false);
}
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
+ m_WorkQueue.Enqueue(boost::bind(&GraphiteWriter::InternalCheckResultHandler, this, checkable, cr));
+}
+
+void GraphiteWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
+#include "base/workqueue.hpp"
#include <fstream>
namespace icinga
DECLARE_OBJECT(GraphiteWriter);
DECLARE_OBJECTNAME(GraphiteWriter);
+ GraphiteWriter(void);
+
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
virtual void ValidateHostNameTemplate(const String& value, const ValidationUtils& utils) override;
virtual void ValidateServiceNameTemplate(const String& value, const ValidationUtils& utils) override;
protected:
+ virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
Stream::Ptr m_Stream;
+ WorkQueue m_WorkQueue;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const String& prefix, const String& name, double value, double ts);
void SendPerfdata(const String& prefix, const CheckResult::Ptr& cr, double ts);
static String EscapeMetric(const String& str, bool legacyMode = false);
static Value EscapeMacroMetric(const Value& value, bool legacyMode = false);
void ReconnectTimerHandler(void);
+
+ void Disconnect(void);
+ void Reconnect(void);
+
+ void AssertOnWorkQueue(void);
+
+ void ExceptionHandler(boost::exception_ptr exp);
};
}