1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/graphitewriter.hpp"
4 #include "perfdata/graphitewriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/checkcommand.hpp"
7 #include "icinga/macroprocessor.hpp"
8 #include "icinga/icingaapplication.hpp"
9 #include "base/tcpsocket.hpp"
10 #include "base/configtype.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/logger.hpp"
13 #include "base/convert.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/algorithm/string/replace.hpp>
25 using namespace icinga;
// Registers the GraphiteWriter type via REGISTER_TYPE.
27 REGISTER_TYPE(GraphiteWriter);
// Hooks GraphiteWriter::StatsFunc (defined below) in as this feature's stats callback.
29 REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
// OnConfigLoaded(): runs the generated base-class hook, names this writer's
// work queue "GraphiteWriter, <object name>", and selects the HA mode.
// NOTE(review): the condition guarding the "HA functionality disabled" branch
// below (and any alternative HA mode for the other case) is not visible in
// this excerpt - confirm against the full source.
32 * Enable HA capabilities once the config object is loaded.
34 void GraphiteWriter::OnConfigLoaded()
36 ObjectImpl<GraphiteWriter>::OnConfigLoaded();
// Give the work queue a name that identifies this writer instance.
38 m_WorkQueue.SetName("GraphiteWriter, " + GetName());
41 Log(LogDebug, "GraphiteWriter")
42 << "HA functionality disabled. Won't pause connection: " << GetName();
// HARunEverywhere presumably keeps this writer active on every node instead of
// letting HA pause it - consistent with the log text above; confirm.
44 SetHAMode(HARunEverywhere);
// StatsFunc(): for every GraphiteWriter object, records the work queue length,
// the work queue item rate and the connection state in a per-object
// dictionary keyed by the object name, and appends the two work queue figures
// as perfdata values "graphitewriter_<name>_work_queue_items" and
// "graphitewriter_<name>_work_queue_item_rate". The per-object dictionaries
// are published under status["graphitewriter"].
// NOTE(review): the declaration of `nodes` is not visible in this excerpt;
// presumably a container of (name, Dictionary) pairs from which a Dictionary
// can be constructed - confirm.
51 * Feature stats interface
53 * @param status Key value pairs for feature stats
54 * @param perfdata Array of PerfdataValue objects
56 void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
60 for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
61 size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
// GetTaskCount(60) divided by 60.0: presumably the tasks handled in the last
// 60 seconds expressed as a per-second rate - confirm GetTaskCount semantics.
62 double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
64 nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
65 { "work_queue_items", workQueueItems },
66 { "work_queue_item_rate", workQueueItemRate },
67 { "connected", graphitewriter->GetConnected() }
// Only the two queue figures are exported as perfdata; "connected" is status-only.
70 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
71 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
74 status->Set("graphitewriter", new Dictionary(std::move(nodes)));
// Resume(): chains to the generated base Resume(), logs, and then wires up the
// runtime machinery. This covers the work queue exception callback, a
// periodic reconnect timer, and the check result subscription.
// NOTE(review): the connection object returned by
// Checkable::OnNewCheckResult.connect() below is discarded, and nothing in the
// visible Pause() disconnects it. If this object is paused and resumed more
// than once (e.g. HA failover), each Resume() may add another subscription and
// thus duplicate metric submissions - verify. Fixing it needs a header member
// that holds the connection so Pause() can disconnect it.
78 * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
80 void GraphiteWriter::Resume()
82 ObjectImpl<GraphiteWriter>::Resume();
84 Log(LogInformation, "GraphiteWriter")
85 << "'" << GetName() << "' resumed.";
87 /* Register exception handler for WQ tasks. */
88 m_WorkQueue.SetExceptionCallback(std::bind(&GraphiteWriter::ExceptionHandler, this, _1));
90 /* Timer for reconnecting */
// Interval 10 (presumably seconds). Reschedule(0) after Start() presumably
// requests an immediate first expiry, so a connection attempt happens right
// away instead of after a full interval - confirm Timer semantics.
91 m_ReconnectTimer = new Timer();
92 m_ReconnectTimer->SetInterval(10);
93 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GraphiteWriter::ReconnectTimerHandler, this));
94 m_ReconnectTimer->Start();
95 m_ReconnectTimer->Reschedule(0);
97 /* Register event handlers. */
98 Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
// Pause(): releases the reconnect timer (presumably stopping further periodic
// reconnect attempts) and then shuts the writer down.
// If the try block (not visible in this excerpt) throws a std::exception, an
// informational message warns that buffers are not flushed and the base-class
// Pause() is called. Otherwise DisconnectInternal() runs, a "paused" message
// is logged and the base-class Pause() is called.
// NOTE(review): the try body and whether the catch path returns early are not
// visible here. The two ObjectImpl<GraphiteWriter>::Pause() calls suggest it
// returns early - confirm.
102 * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
104 void GraphiteWriter::Pause()
106 m_ReconnectTimer.reset();
110 } catch (const std::exception&) {
111 Log(LogInformation, "GraphiteWriter")
112 << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
114 ObjectImpl<GraphiteWriter>::Pause();
// Direct call to the non-WQ disconnect variant.
119 DisconnectInternal();
121 Log(LogInformation, "GraphiteWriter")
122 << "'" << GetName() << "' paused.";
124 ObjectImpl<GraphiteWriter>::Pause();
// AssertOnWorkQueue(): asserts that the calling thread is this writer's work
// queue worker thread. Whether the check is active in a given build depends on
// the ASSERT macro definition (not visible here).
128 * Check if method is called inside the WQ thread.
130 void GraphiteWriter::AssertOnWorkQueue()
132 ASSERT(m_WorkQueue.IsWorkerThread());
// ExceptionHandler(): installed as the work queue exception callback in
// Resume(). Logs a critical, user-facing hint and the full diagnostic
// information at debug level. `exp` is moved into DiagnosticInformation()
// because the visible code does not use it afterwards.
// NOTE(review): the body of the `if (GetConnected())` branch is not visible
// in this excerpt; per the header comment it closes the connection.
136 * Exception handler for the WQ.
138 * Closes the connection if connected.
140 * @param exp Exception pointer
142 void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
144 Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
146 Log(LogDebug, "GraphiteWriter")
147 << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
149 if (GetConnected()) {
// Reconnect(): the work queue task enqueued by ReconnectTimerHandler().
// NOTE(review): the body is not visible in this excerpt. Per the header
// comment it stops early when the feature is paused in HA zones.
157 * Reconnect method, stops when the feature is paused in HA zones.
159 * Called inside the WQ.
161 void GraphiteWriter::Reconnect()
// ReconnectInternal(): opens a fresh TCP connection to the configured Graphite
// host/port and logs how long the attempt took. A std::exception from the
// connect call is logged as a warning naming the host and port.
// NOTE(review): this excerpt does not show the statements between the visible
// lines. Not shown: any early return when already connected, the start of the
// try block, what the catch handler does after logging, and where the
// connected state is updated. The comments below describe only what is shown.
174 * Reconnect method, connects to a TCP Stream
176 void GraphiteWriter::ReconnectInternal()
// Taken up front so the final log line can report the elapsed time.
178 double startTime = Utility::GetTime();
180 CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
// Presumably records that a connection is desired - confirm.
182 SetShouldConnect(true);
187 Log(LogNotice, "GraphiteWriter")
188 << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
// Each attempt starts from a newly constructed stream on the I/O context
// obtained from IoEngine::Get().
190 m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
193 icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
194 } catch (const std::exception& ex) {
195 Log(LogWarning, "GraphiteWriter")
196 << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
// NOTE(review): std::setw(2) only sets a minimum field width for the next
// value. It does not limit the number of decimals; std::setprecision would be
// needed for that. Left unchanged because the intent is unclear.
201 Log(LogInformation, "GraphiteWriter")
202 << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
// ReconnectTimerHandler(): connected to m_ReconnectTimer->OnTimerExpired in
// Resume(). It only enqueues Reconnect(); the connection work itself runs as a
// work queue task. PriorityHigh presumably lets reconnects run ahead of check
// result tasks, which CheckResultHandler() enqueues without an explicit
// priority.
// NOTE(review): lines between the signature and the Enqueue call are not
// visible in this excerpt.
206 * Reconnect handler called by the timer.
208 * Enqueues a reconnect task into the WQ.
210 void GraphiteWriter::ReconnectTimerHandler()
215 m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
// Disconnect(): the work queue variant; delegates to DisconnectInternal().
// NOTE(review): lines between the signature and the delegating call are not
// visible in this excerpt.
219 * Disconnect the stream.
221 * Called inside the WQ.
223 void GraphiteWriter::Disconnect()
227 DisconnectInternal();
// DisconnectInternal(): disconnects the stream (per its header comment).
// NOTE(review): the header says "Called outside the WQ", but Disconnect(),
// documented as running inside the WQ, also calls this function. It is
// therefore reached from both contexts, directly from Pause() and via
// Disconnect(). The body is not visible in this excerpt.
231 * Disconnect the stream.
233 * Called outside the WQ.
235 void GraphiteWriter::DisconnectInternal()
// CheckResultHandler(): subscribed to Checkable::OnNewCheckResult in Resume().
// It hands the actual processing to the work queue. std::bind stores copies of
// the `checkable` and `cr` handles, presumably ref-counted Ptr types that keep
// both objects alive until the queued task has run. No priority is passed,
// unlike the reconnect task.
// NOTE(review): the pause check mentioned in the header comment is not
// visible in this excerpt.
246 * Check result event handler, checks whether feature is not paused in HA setups.
248 * @param checkable Host/Service object
249 * @param cr Check result including performance data
251 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
256 m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
// CheckResultHandlerInternal(): builds the metric prefix for one check result
// and sends its metadata and perfdata metrics. Visible steps:
//  1. Skip processing when performance data is disabled globally
//     (IcingaApplication) or for this checkable. The statement following
//     that condition is not visible here.
//  2. Split the checkable into host/service and register macro resolvers in
//     the order service, host, icinga.
//  3. Resolve the service or host name template into `prefix`. Resolved macro
//     values are escaped with EscapeMacroMetric.
//  4. If GetEnableSendMetadata() is set, send "<prefix>.metadata.<name>"
//     metrics. Then hand the perfdata to SendPerfdata() under
//     "<prefix>.perfdata".
// Every metric uses cr->GetExecutionEnd() as its timestamp.
// NOTE(review): the declarations of `host` and `prefix`, and the conditions
// choosing between the service and host branches, are not visible in this
// excerpt.
260 * Check result event handler, prepares metadata and perfdata values and calls Send*()
262 * Called inside the WQ.
264 * @param checkable Host/Service object
265 * @param cr Check result including performance data
267 void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
271 CONTEXT("Processing check result for '" + checkable->GetName() + "'");
273 /* TODO: Deal with missing connection here. Needs refactoring
274 * into parsing the actual performance data and then putting it
275 * into a queue for re-inserting. */
277 if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
281 Service::Ptr service;
282 tie(host, service) = GetHostService(checkable);
284 MacroProcessor::ResolverList resolvers;
286 resolvers.emplace_back("service", service);
287 resolvers.emplace_back("host", host);
288 resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
// Presumably the service template applies when `service` is set, and the
// host template otherwise - the selecting condition is not visible.
293 prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
295 prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
298 String prefixPerfdata = prefix + ".perfdata";
299 String prefixMetadata = prefix + ".metadata";
// Timestamp for all metrics of this check result: the end of check execution.
301 double ts = cr->GetExecutionEnd();
303 if (GetEnableSendMetadata()) {
// "state" comes from the service for service checks and from the host
// otherwise (selecting condition not visible in this excerpt).
305 SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
307 SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
310 SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
311 SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
312 SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
313 SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
314 SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
315 SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
316 SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
317 SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
320 SendPerfdata(checkable, prefixPerfdata, cr, ts);
// SendPerfdata(): iterates over the check result's performance data while
// holding an ObjectLock on the array. Entries that are already PerfdataValue
// objects are taken as-is; other entries are parsed with
// PerfdataValue::Parse(). Parse failures are logged as warnings that name the
// checkable and its check command. Each value is sent as
// "<prefix>.<escaped label>.value". When GetEnableSendThresholds() is set,
// ".crit", ".warn", ".min" and ".max" metrics follow.
// NOTE(review): several pieces are not visible in this excerpt: the handling
// of a missing perfdata array, the body of the IsObjectType branch, the `try`,
// the skip after a parse failure, and the per-threshold guards.
// NOTE(review): the visible caller passes cr->GetExecutionEnd() as `ts`. That
// is the end of check execution rather than the creation time described
// below.
324 * Parse performance data from check result and call SendMetric()
326 * @param checkable Host/service object
327 * @param prefix Metric prefix string
328 * @param cr Check result including performance data
329 * @param ts Timestamp when the check result was created
331 void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
333 Array::Ptr perfdata = cr->GetPerformanceData();
// Only needed for the warning message on invalid perfdata.
338 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
340 ObjectLock olock(perfdata);
341 for (const Value& val : perfdata) {
342 PerfdataValue::Ptr pdv;
344 if (val.IsObjectType<PerfdataValue>())
348 pdv = PerfdataValue::Parse(val);
349 } catch (const std::exception&) {
350 Log(LogWarning, "GraphiteWriter")
351 << "Ignoring invalid perfdata for checkable '"
352 << checkable->GetName() << "' and command '"
353 << checkCommand->GetName() << "' with value: " << val;
// Labels keep their dots (EscapeMetricLabel), so users can build sub-trees.
358 String escapedKey = EscapeMetricLabel(pdv->GetLabel());
360 SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
362 if (GetEnableSendThresholds()) {
364 SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
366 SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
368 SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
370 SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
// SendMetric(): formats one line "<prefix>.<name> <value> <ts>", the layout of
// the Graphite plaintext protocol. `ts` is truncated to whole seconds by the
// cast to long. The line is logged at debug level and then written to
// m_Stream while m_StreamMutex is held, which serializes access to the stream.
// If a std::exception is caught around the write, a critical message naming
// host and port is logged.
// NOTE(review): several pieces are not visible in this excerpt: the line
// terminator appended after the debug log (see the comment below), any
// connection check, the `try`, any flush, and what happens after the
// critical log.
// NOTE(review): `long` is 32 bits on LLP64 platforms such as 64-bit Windows,
// which limits the timestamp to early 2038. A 64-bit integer type would avoid
// that.
376 * Computes metric data and sends to Graphite
378 * @param checkable Host/service object
379 * @param prefix Computed metric prefix string
380 * @param name Metric name
381 * @param value Metric value
382 * @param ts Timestamp when the check result was created
384 void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
386 namespace asio = boost::asio;
388 std::ostringstream msgbuf;
389 msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
391 Log(LogDebug, "GraphiteWriter")
392 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
394 // do not send \n to debug log
397 boost::mutex::scoped_lock lock(m_StreamMutex);
403 asio::write(*m_Stream, asio::buffer(msgbuf.str()));
405 } catch (const std::exception& ex) {
406 Log(LogCritical, "GraphiteWriter")
407 << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
// EscapeMetric(): replaces ' ', '.', '\\' and '/' with '_' so the input stays
// a single Graphite path element. A dot would otherwise start a new node in
// the metric tree.
// NOTE(review): the initialization of `result` from `str` and the final
// `return` are not visible in this excerpt.
414 * Escape metric tree elements
416 * Dots are not allowed, e.g. in host names
418 * @param str Metric part name
419 * @return Escape string
421 String GraphiteWriter::EscapeMetric(const String& str)
425 //don't allow '.' in metric prefixes
426 boost::replace_all(result, " ", "_");
427 boost::replace_all(result, ".", "_");
428 boost::replace_all(result, "\\", "_");
429 boost::replace_all(result, "/", "_");
// EscapeMetricLabel(): replaces ' ', '\\' and '/' with '_' and turns "::" into
// '.'. Existing dots are kept, so perfdata labels can form metric sub-trees.
// NOTE(review): the initialization of `result` from `str` and the final
// `return` are not visible in this excerpt.
435 * Escape metric label
437 * Dots are allowed - users can create trees from perfdata labels
439 * @param str Metric label name
440 * @return Escaped string
442 String GraphiteWriter::EscapeMetricLabel(const String& str)
446 //allow to pass '.' in perfdata labels
447 boost::replace_all(result, " ", "_");
448 boost::replace_all(result, "\\", "_");
449 boost::replace_all(result, "/", "_");
// Runs last, so the dots produced here are never rewritten by the rules above.
450 boost::replace_all(result, "::", ".");
// EscapeMacroMetric(): the escape callback passed to
// MacroProcessor::ResolveMacros() for the host/service name templates. Array
// values are escaped element by element with EscapeMetric(), under an
// ObjectLock on the array, and joined with '.'. Any other value is escaped as
// a single element.
// NOTE(review): the declaration of `result` and the else-branch structure are
// not visible in this excerpt.
456 * Escape macro metrics found via host/service name templates
458 * @param value Array or string with macro metric names
459 * @return Escaped string. Arrays are joined with dots.
461 Value GraphiteWriter::EscapeMacroMetric(const Value& value)
463 if (value.IsObjectType<Array>()) {
464 Array::Ptr arr = value;
467 ObjectLock olock(arr);
468 for (const Value& arg : arr) {
469 result.push_back(EscapeMetric(arg));
472 return Utility::Join(new Array(std::move(result)), '.');
474 return EscapeMetric(value);
478 * Validate the configuration setting 'host_name_template'
480 * @param lvalue String containing runtime macros.
481 * @param utils Helper, unused
483 void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
485 ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
487 if (!MacroProcessor::ValidateMacroString(lvalue()))
488 BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
492 * Validate the configuration setting 'service_name_template'
494 * @param lvalue String containing runtime macros.
495 * @param utils Helper, unused
497 void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
499 ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
501 if (!MacroProcessor::ValidateMacroString(lvalue()))
502 BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));