]> granicus.if.org Git - icinga2/blob - lib/perfdata/graphitewriter.cpp
Merge pull request #7527 from Icinga/bugfix/checkable-command-endpoint-zone
[icinga2] / lib / perfdata / graphitewriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/graphitewriter.hpp"
4 #include "perfdata/graphitewriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/checkcommand.hpp"
7 #include "icinga/macroprocessor.hpp"
8 #include "icinga/icingaapplication.hpp"
9 #include "base/tcpsocket.hpp"
10 #include "base/configtype.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/logger.hpp"
13 #include "base/convert.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/algorithm/string/replace.hpp>
23 #include <utility>
24
25 using namespace icinga;
26
27 REGISTER_TYPE(GraphiteWriter);
28
29 REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
30
31 /*
32  * Enable HA capabilities once the config object is loaded.
33  */
34 void GraphiteWriter::OnConfigLoaded()
35 {
36         ObjectImpl<GraphiteWriter>::OnConfigLoaded();
37
38         m_WorkQueue.SetName("GraphiteWriter, " + GetName());
39
40         if (!GetEnableHa()) {
41                 Log(LogDebug, "GraphiteWriter")
42                         << "HA functionality disabled. Won't pause connection: " << GetName();
43
44                 SetHAMode(HARunEverywhere);
45         } else {
46                 SetHAMode(HARunOnce);
47         }
48 }
49
50 /**
51  * Feature stats interface
52  *
53  * @param status Key value pairs for feature stats
54  * @param perfdata Array of PerfdataValue objects
55  */
56 void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
57 {
58         DictionaryData nodes;
59
60         for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
61                 size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
62                 double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
63
64                 nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
65                         { "work_queue_items", workQueueItems },
66                         { "work_queue_item_rate", workQueueItemRate },
67                         { "connected", graphitewriter->GetConnected() }
68                 }));
69
70                 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
71                 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
72         }
73
74         status->Set("graphitewriter", new Dictionary(std::move(nodes)));
75 }
76
77 /**
78  * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
79  */
80 void GraphiteWriter::Resume()
81 {
82         ObjectImpl<GraphiteWriter>::Resume();
83
84         Log(LogInformation, "GraphiteWriter")
85                 << "'" << GetName() << "' resumed.";
86
87         /* Register exception handler for WQ tasks. */
88         m_WorkQueue.SetExceptionCallback(std::bind(&GraphiteWriter::ExceptionHandler, this, _1));
89
90         /* Timer for reconnecting */
91         m_ReconnectTimer = new Timer();
92         m_ReconnectTimer->SetInterval(10);
93         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GraphiteWriter::ReconnectTimerHandler, this));
94         m_ReconnectTimer->Start();
95         m_ReconnectTimer->Reschedule(0);
96
97         /* Register event handlers. */
98         Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
99 }
100
101 /**
102  * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
103  */
104 void GraphiteWriter::Pause()
105 {
106         m_ReconnectTimer.reset();
107
108         try {
109                 ReconnectInternal();
110         } catch (const std::exception&) {
111                 Log(LogInformation, "GraphiteWriter")
112                         << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
113
114                 ObjectImpl<GraphiteWriter>::Pause();
115                 return;
116         }
117
118         m_WorkQueue.Join();
119         DisconnectInternal();
120
121         Log(LogInformation, "GraphiteWriter")
122                 << "'" << GetName() << "' paused.";
123
124         ObjectImpl<GraphiteWriter>::Pause();
125 }
126
127 /**
128  * Check if method is called inside the WQ thread.
129  */
130 void GraphiteWriter::AssertOnWorkQueue()
131 {
132         ASSERT(m_WorkQueue.IsWorkerThread());
133 }
134
135 /**
136  * Exception handler for the WQ.
137  *
138  * Closes the connection if connected.
139  *
140  * @param exp Exception pointer
141  */
142 void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
143 {
144         Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
145
146         Log(LogDebug, "GraphiteWriter")
147                 << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
148
149         if (GetConnected()) {
150                 m_Stream->close();
151
152                 SetConnected(false);
153         }
154 }
155
156 /**
157  * Reconnect method, stops when the feature is paused in HA zones.
158  *
159  * Called inside the WQ.
160  */
161 void GraphiteWriter::Reconnect()
162 {
163         AssertOnWorkQueue();
164
165         if (IsPaused()) {
166                 SetConnected(false);
167                 return;
168         }
169
170         ReconnectInternal();
171 }
172
173 /**
174  * Reconnect method, connects to a TCP Stream
175  */
176 void GraphiteWriter::ReconnectInternal()
177 {
178         double startTime = Utility::GetTime();
179
180         CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
181
182         SetShouldConnect(true);
183
184         if (GetConnected())
185                 return;
186
187         Log(LogNotice, "GraphiteWriter")
188                 << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
189
190         m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
191
192         try {
193                 icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
194         } catch (const std::exception& ex) {
195                 Log(LogWarning, "GraphiteWriter")
196                         << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
197         }
198
199         SetConnected(true);
200
201         Log(LogInformation, "GraphiteWriter")
202                 << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
203 }
204
205 /**
206  * Reconnect handler called by the timer.
207  *
208  * Enqueues a reconnect task into the WQ.
209  */
210 void GraphiteWriter::ReconnectTimerHandler()
211 {
212         if (IsPaused())
213                 return;
214
215         m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
216 }
217
218 /**
219  * Disconnect the stream.
220  *
221  * Called inside the WQ.
222  */
223 void GraphiteWriter::Disconnect()
224 {
225         AssertOnWorkQueue();
226
227         DisconnectInternal();
228 }
229
230 /**
231  * Disconnect the stream.
232  *
233  * Called outside the WQ.
234  */
235 void GraphiteWriter::DisconnectInternal()
236 {
237         if (!GetConnected())
238                 return;
239
240         m_Stream->close();
241
242         SetConnected(false);
243 }
244
245 /**
246  * Check result event handler, checks whether feature is not paused in HA setups.
247  *
248  * @param checkable Host/Service object
249  * @param cr Check result including performance data
250  */
251 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
252 {
253         if (IsPaused())
254                 return;
255
256         m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
257 }
258
259 /**
260  * Check result event handler, prepares metadata and perfdata values and calls Send*()
261  *
262  * Called inside the WQ.
263  *
264  * @param checkable Host/Service object
265  * @param cr Check result including performance data
266  */
267 void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
268 {
269         AssertOnWorkQueue();
270
271         CONTEXT("Processing check result for '" + checkable->GetName() + "'");
272
273         /* TODO: Deal with missing connection here. Needs refactoring
274          * into parsing the actual performance data and then putting it
275          * into a queue for re-inserting. */
276
277         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
278                 return;
279
280         Host::Ptr host;
281         Service::Ptr service;
282         tie(host, service) = GetHostService(checkable);
283
284         MacroProcessor::ResolverList resolvers;
285         if (service)
286                 resolvers.emplace_back("service", service);
287         resolvers.emplace_back("host", host);
288         resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
289
290         String prefix;
291
292         if (service) {
293                 prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
294         } else {
295                 prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
296         }
297
298         String prefixPerfdata = prefix + ".perfdata";
299         String prefixMetadata = prefix + ".metadata";
300
301         double ts = cr->GetExecutionEnd();
302
303         if (GetEnableSendMetadata()) {
304                 if (service) {
305                         SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
306                 } else {
307                         SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
308                 }
309
310                 SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
311                 SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
312                 SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
313                 SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
314                 SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
315                 SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
316                 SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
317                 SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
318         }
319
320         SendPerfdata(checkable, prefixPerfdata, cr, ts);
321 }
322
323 /**
324  * Parse performance data from check result and call SendMetric()
325  *
326  * @param checkable Host/service object
327  * @param prefix Metric prefix string
328  * @param cr Check result including performance data
329  * @param ts Timestamp when the check result was created
330  */
331 void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
332 {
333         Array::Ptr perfdata = cr->GetPerformanceData();
334
335         if (!perfdata)
336                 return;
337
338         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
339
340         ObjectLock olock(perfdata);
341         for (const Value& val : perfdata) {
342                 PerfdataValue::Ptr pdv;
343
344                 if (val.IsObjectType<PerfdataValue>())
345                         pdv = val;
346                 else {
347                         try {
348                                 pdv = PerfdataValue::Parse(val);
349                         } catch (const std::exception&) {
350                                 Log(LogWarning, "GraphiteWriter")
351                                         << "Ignoring invalid perfdata for checkable '"
352                                         << checkable->GetName() << "' and command '"
353                                         << checkCommand->GetName() << "' with value: " << val;
354                                 continue;
355                         }
356                 }
357
358                 String escapedKey = EscapeMetricLabel(pdv->GetLabel());
359
360                 SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
361
362                 if (GetEnableSendThresholds()) {
363                         if (pdv->GetCrit())
364                                 SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
365                         if (pdv->GetWarn())
366                                 SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
367                         if (pdv->GetMin())
368                                 SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
369                         if (pdv->GetMax())
370                                 SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
371                 }
372         }
373 }
374
375 /**
376  * Computes metric data and sends to Graphite
377  *
378  * @param checkable Host/service object
379  * @param prefix Computed metric prefix string
380  * @param name Metric name
381  * @param value Metric value
382  * @param ts Timestamp when the check result was created
383  */
384 void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
385 {
386         namespace asio = boost::asio;
387
388         std::ostringstream msgbuf;
389         msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
390
391         Log(LogDebug, "GraphiteWriter")
392                 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
393
394         // do not send \n to debug log
395         msgbuf << "\n";
396
397         boost::mutex::scoped_lock lock(m_StreamMutex);
398
399         if (!GetConnected())
400                 return;
401
402         try {
403                 asio::write(*m_Stream, asio::buffer(msgbuf.str()));
404                 m_Stream->flush();
405         } catch (const std::exception& ex) {
406                 Log(LogCritical, "GraphiteWriter")
407                         << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
408
409                 throw ex;
410         }
411 }
412
413 /**
414  * Escape metric tree elements
415  *
416  * Dots are not allowed, e.g. in host names
417  *
418  * @param str Metric part name
419  * @return Escape string
420  */
421 String GraphiteWriter::EscapeMetric(const String& str)
422 {
423         String result = str;
424
425         //don't allow '.' in metric prefixes
426         boost::replace_all(result, " ", "_");
427         boost::replace_all(result, ".", "_");
428         boost::replace_all(result, "\\", "_");
429         boost::replace_all(result, "/", "_");
430
431         return result;
432 }
433
434 /**
435  * Escape metric label
436  *
437  * Dots are allowed - users can create trees from perfdata labels
438  *
439  * @param str Metric label name
440  * @return Escaped string
441  */
442 String GraphiteWriter::EscapeMetricLabel(const String& str)
443 {
444         String result = str;
445
446         //allow to pass '.' in perfdata labels
447         boost::replace_all(result, " ", "_");
448         boost::replace_all(result, "\\", "_");
449         boost::replace_all(result, "/", "_");
450         boost::replace_all(result, "::", ".");
451
452         return result;
453 }
454
455 /**
456  * Escape macro metrics found via host/service name templates
457  *
458  * @param value Array or string with macro metric names
459  * @return Escaped string. Arrays are joined with dots.
460  */
461 Value GraphiteWriter::EscapeMacroMetric(const Value& value)
462 {
463         if (value.IsObjectType<Array>()) {
464                 Array::Ptr arr = value;
465                 ArrayData result;
466
467                 ObjectLock olock(arr);
468                 for (const Value& arg : arr) {
469                         result.push_back(EscapeMetric(arg));
470                 }
471
472                 return Utility::Join(new Array(std::move(result)), '.');
473         } else
474                 return EscapeMetric(value);
475 }
476
477 /**
478  * Validate the configuration setting 'host_name_template'
479  *
480  * @param lvalue String containing runtime macros.
481  * @param utils Helper, unused
482  */
483 void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
484 {
485         ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
486
487         if (!MacroProcessor::ValidateMacroString(lvalue()))
488                 BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
489 }
490
491 /**
492  * Validate the configuration setting 'service_name_template'
493  *
494  * @param lvalue String containing runtime macros.
495  * @param utils Helper, unused
496  */
497 void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
498 {
499         ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
500
501         if (!MacroProcessor::ValidateMacroString(lvalue()))
502                 BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
503 }