REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
void GraphiteWriter::OnConfigLoaded()
{
ObjectImpl<GraphiteWriter>::OnConfigLoaded();
}
}
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ * @param perfdata Array of PerfdataValue objects
+ */
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
status->Set("graphitewriter", new Dictionary(std::move(nodes)));
}
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
void GraphiteWriter::Resume()
{
ObjectImpl<GraphiteWriter>::Resume();
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
}
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
void GraphiteWriter::Pause()
{
m_ReconnectTimer.reset();
ObjectImpl<GraphiteWriter>::Pause();
}
+/**
+ * Check if method is called inside the WQ thread.
+ */
void GraphiteWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
+/**
+ * Exception handler for the WQ.
+ *
+ * Closes the connection if connected.
+ *
+ * @param exp Exception pointer
+ */
void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) {
- m_Stream->Close();
+ m_Stream->close();
SetConnected(false);
}
}
+/**
+ * Reconnect method, stops when the feature is paused in HA zones.
+ *
+ * Called inside the WQ.
+ */
void GraphiteWriter::Reconnect()
{
AssertOnWorkQueue();
ReconnectInternal();
}
+/**
+ * Reconnect method, connects to a TCP Stream
+ */
void GraphiteWriter::ReconnectInternal()
{
double startTime = Utility::GetTime();
if (GetConnected())
return;
- TcpSocket::Ptr socket = new TcpSocket();
-
Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+
try {
- socket->Connect(GetHost(), GetPort());
+ icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
- Log(LogCritical, "GraphiteWriter")
- << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
- throw ex;
+ Log(LogWarning, "GraphiteWriter")
+ << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
- m_Stream = new NetworkStream(socket);
-
SetConnected(true);
Log(LogInformation, "GraphiteWriter")
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
+/**
+ * Reconnect handler called by the timer.
+ *
+ * Enqueues a reconnect task into the WQ.
+ */
void GraphiteWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
- m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
+ m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
}
+/**
+ * Disconnect the stream.
+ *
+ * Called inside the WQ.
+ */
void GraphiteWriter::Disconnect()
{
AssertOnWorkQueue();
DisconnectInternal();
}
+/**
+ * Disconnect the stream.
+ *
+ * Called outside the WQ.
+ */
void GraphiteWriter::DisconnectInternal()
{
if (!GetConnected())
return;
- m_Stream->Close();
+ m_Stream->close();
SetConnected(false);
}
+/**
+ * Check result event handler, checks whether feature is not paused in HA setups.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
}
+/**
+ * Check result event handler, prepares metadata and perfdata values and calls Send*()
+ *
+ * Called inside the WQ.
+ *
+ * @param checkable Host/Service object
+ * @param cr Check result including performance data
+ */
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
SendPerfdata(checkable, prefixPerfdata, cr, ts);
}
+/**
+ * Parse performance data from check result and call SendMetric()
+ *
+ * @param checkable Host/service object
+ * @param prefix Metric prefix string
+ * @param cr Check result including performance data
+ * @param ts Timestamp when the check result was created
+ */
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
}
}
+/**
+ * Computes metric data and sends to Graphite
+ *
+ * @param checkable Host/service object
+ * @param prefix Computed metric prefix string
+ * @param name Metric name
+ * @param value Metric value
+ * @param ts Timestamp when the check result was created
+ */
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
+ namespace asio = boost::asio;
+
std::ostringstream msgbuf;
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
// do not send \n to debug log
msgbuf << "\n";
- String metric = msgbuf.str();
boost::mutex::scoped_lock lock(m_StreamMutex);
return;
try {
- m_Stream->Write(metric.CStr(), metric.GetLength());
+ asio::write(*m_Stream, asio::buffer(msgbuf.str()));
+ m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
}
}
+/**
+ * Escape metric tree elements
+ *
+ * Dots are not allowed, e.g. in host names
+ *
+ * @param str Metric part name
+ * @return Escape string
+ */
String GraphiteWriter::EscapeMetric(const String& str)
{
String result = str;
return result;
}
+/**
+ * Escape metric label
+ *
+ * Dots are allowed - users can create trees from perfdata labels
+ *
+ * @param str Metric label name
+ * @return Escaped string
+ */
String GraphiteWriter::EscapeMetricLabel(const String& str)
{
String result = str;
return result;
}
+/**
+ * Escape macro metrics found via host/service name templates
+ *
+ * @param value Array or string with macro metric names
+ * @return Escaped string. Arrays are joined with dots.
+ */
Value GraphiteWriter::EscapeMacroMetric(const Value& value)
{
if (value.IsObjectType<Array>()) {
return EscapeMetric(value);
}
+/**
+ * Validate the configuration setting 'host_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
}
+/**
+ * Validate the configuration setting 'service_name_template'
+ *
+ * @param lvalue String containing runtime macros.
+ * @param utils Helper, unused
+ */
void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);