REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
+/*
+ * Enable HA capabilities once the config object is loaded.
+ */
void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
}
}
+/**
+ * Feature stats interface
+ *
+ * @param status Key value pairs for feature stats
+ */
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
DictionaryData nodes;
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
- nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
+ nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
+ { "connected", opentsdbwriter->GetConnected() }
+ }));
}
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
}
+/**
+ * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
+ */
void OpenTsdbWriter::Resume()
{
ObjectImpl<OpenTsdbWriter>::Resume();
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
}
-/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+/**
+ * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
+ */
void OpenTsdbWriter::Pause()
{
m_ReconnectTimer.reset();
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
+ m_Stream->close();
+
+ SetConnected(false);
+
ObjectImpl<OpenTsdbWriter>::Pause();
}
+/**
+ * Reconnect handler called by the timer.
+ * Handles TLS
+ */
void OpenTsdbWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
- if (m_Stream)
- return;
+ SetShouldConnect(true);
- TcpSocket::Ptr socket = new TcpSocket();
+ if (GetConnected())
+ return;
Log(LogNotice, "OpenTsdbWriter")
<< "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ /*
+ * We're using telnet as input method. Future PRs may change this into using the HTTP API.
+ * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
+ */
+
+ m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+
try {
- socket->Connect(GetHost(), GetPort());
- } catch (std::exception&) {
- Log(LogCritical, "OpenTsdbWriter")
- << "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
- return;
+ icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "OpenTsdbWriter")
+ << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
- m_Stream = new NetworkStream(socket);
+ SetConnected(true);
}
+/**
+ * Registered check result handler processing data.
+ * Calculates tags from the config.
+ *
+ * @param checkable Host/service object
+ * @param cr Check result
+ */
void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
}
+/**
+ * Parse and send performance data metrics to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param cr Check result containing performance data
+ * @param ts Timestamp when the check result was received
+ */
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
}
}
+/**
+ * Send given metric to OpenTSDB
+ *
+ * @param checkable Host/service object
+ * @param metric Full metric name
+ * @param tags Tag key pairs
+ * @param value Floating point metric value
+ * @param ts Timestamp where the metric was received from the check result
+ */
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
std::ostringstream msgbuf;
/*
- * must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
+ * must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
* put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
* "tags" must include at least one tag, we use "host=HOSTNAME"
*/
ObjectLock olock(this);
- if (!m_Stream)
+ if (!GetConnected())
return;
try {
- m_Stream->Write(put.CStr(), put.GetLength());
+ Log(LogDebug, "OpenTsdbWriter")
+ << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
+
+ boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
+ m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "OpenTsdbWriter")
- << "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
-
- m_Stream.reset();
+ << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
}
}
-/* for metric and tag name rules, see
- * http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
+/**
+ * Escape tags for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Tag name
+ * @return Escaped tag
*/
String OpenTsdbWriter::EscapeTag(const String& str)
{
return result;
}
+/**
+ * Escape metric name for OpenTSDB
+ * http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
+ *
+ * @param str Metric name
+ * @return Escaped metric
+ */
String OpenTsdbWriter::EscapeMetric(const String& str)
{
String result = str;
boost::replace_all(result, ":", "_");
return result;
-}
+}
\ No newline at end of file