]> granicus.if.org Git - icinga2/blob - lib/perfdata/graphitewriter.cpp
Merge pull request #7124 from Icinga/bugfix/namespace-thread-safe
[icinga2] / lib / perfdata / graphitewriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/graphitewriter.hpp"
4 #include "perfdata/graphitewriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/checkcommand.hpp"
7 #include "icinga/macroprocessor.hpp"
8 #include "icinga/icingaapplication.hpp"
9 #include "base/tcpsocket.hpp"
10 #include "base/configtype.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/logger.hpp"
13 #include "base/convert.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/algorithm/string/replace.hpp>
23 #include <utility>
24
/* Everything below refers to icinga:: names without qualification. */
using namespace icinga;

/* Registration macro (defined outside this file); presumably makes the
 * GraphiteWriter type known to the config/type system -- confirm against the macro. */
REGISTER_TYPE(GraphiteWriter);

/* Registers GraphiteWriter::StatsFunc (defined below) under the GraphiteWriter name;
 * presumably so the stats framework can invoke it -- confirm against the macro. */
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
30
31 void GraphiteWriter::OnConfigLoaded()
32 {
33         ObjectImpl<GraphiteWriter>::OnConfigLoaded();
34
35         m_WorkQueue.SetName("GraphiteWriter, " + GetName());
36
37         if (!GetEnableHa()) {
38                 Log(LogDebug, "GraphiteWriter")
39                         << "HA functionality disabled. Won't pause connection: " << GetName();
40
41                 SetHAMode(HARunEverywhere);
42         } else {
43                 SetHAMode(HARunOnce);
44         }
45 }
46
47 void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
48 {
49         DictionaryData nodes;
50
51         for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
52                 size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
53                 double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
54
55                 nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
56                         { "work_queue_items", workQueueItems },
57                         { "work_queue_item_rate", workQueueItemRate },
58                         { "connected", graphitewriter->GetConnected() }
59                 }));
60
61                 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
62                 perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
63         }
64
65         status->Set("graphitewriter", new Dictionary(std::move(nodes)));
66 }
67
/**
 * Resumes the writer: forwards to the base implementation, logs the event,
 * installs the work queue exception callback, starts the reconnect timer and
 * subscribes to new check results.
 */
void GraphiteWriter::Resume()
{
        ObjectImpl<GraphiteWriter>::Resume();

        Log(LogInformation, "GraphiteWriter")
                << "'" << GetName() << "' resumed.";

        /* Exceptions thrown by work queue tasks are routed to ExceptionHandler(). */
        m_WorkQueue.SetExceptionCallback(std::bind(&GraphiteWriter::ExceptionHandler, this, _1));

        /* Timer that periodically calls ReconnectTimerHandler(). The interval is 10
         * (presumably seconds -- confirm against Timer). Reschedule(0) presumably makes
         * the first run happen right away instead of after a full interval. */
        m_ReconnectTimer = new Timer();
        m_ReconnectTimer->SetInterval(10);
        m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GraphiteWriter::ReconnectTimerHandler, this));
        m_ReconnectTimer->Start();
        m_ReconnectTimer->Reschedule(0);

        /* Subscribe to new check results.
         * NOTE(review): nothing in this file disconnects this handler again (Pause() only
         * resets the timer) -- confirm whether repeated Resume() calls can register it
         * more than once. */
        Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
}
88
89 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
90 void GraphiteWriter::Pause()
91 {
92         m_ReconnectTimer.reset();
93
94         try {
95                 ReconnectInternal();
96         } catch (const std::exception&) {
97                 Log(LogInformation, "GraphiteWriter")
98                         << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
99
100                 ObjectImpl<GraphiteWriter>::Pause();
101                 return;
102         }
103
104         m_WorkQueue.Join();
105         DisconnectInternal();
106
107         Log(LogInformation, "GraphiteWriter")
108                 << "'" << GetName() << "' paused.";
109
110         ObjectImpl<GraphiteWriter>::Pause();
111 }
112
/**
 * Asserts that the calling thread is one of m_WorkQueue's worker threads.
 * Used as a guard at the top of functions that are meant to run as queue tasks.
 */
void GraphiteWriter::AssertOnWorkQueue()
{
        ASSERT(m_WorkQueue.IsWorkerThread());
}
117
118 void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
119 {
120         Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
121
122         Log(LogDebug, "GraphiteWriter")
123                 << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
124
125         if (GetConnected()) {
126                 m_Stream->Close();
127
128                 SetConnected(false);
129         }
130 }
131
132 void GraphiteWriter::Reconnect()
133 {
134         AssertOnWorkQueue();
135
136         if (IsPaused()) {
137                 SetConnected(false);
138                 return;
139         }
140
141         ReconnectInternal();
142 }
143
144 void GraphiteWriter::ReconnectInternal()
145 {
146         double startTime = Utility::GetTime();
147
148         CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
149
150         SetShouldConnect(true);
151
152         if (GetConnected())
153                 return;
154
155         TcpSocket::Ptr socket = new TcpSocket();
156
157         Log(LogNotice, "GraphiteWriter")
158                 << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
159
160         try {
161                 socket->Connect(GetHost(), GetPort());
162         } catch (const std::exception& ex) {
163                 Log(LogCritical, "GraphiteWriter")
164                         << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
165                 throw ex;
166         }
167
168         m_Stream = new NetworkStream(socket);
169
170         SetConnected(true);
171
172         Log(LogInformation, "GraphiteWriter")
173                 << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
174 }
175
176 void GraphiteWriter::ReconnectTimerHandler()
177 {
178         if (IsPaused())
179                 return;
180
181         m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
182 }
183
/**
 * Work queue variant of DisconnectInternal(): asserts that it runs on a work
 * queue thread and then closes the connection.
 */
void GraphiteWriter::Disconnect()
{
        AssertOnWorkQueue();

        DisconnectInternal();
}
190
191 void GraphiteWriter::DisconnectInternal()
192 {
193         if (!GetConnected())
194                 return;
195
196         m_Stream->Close();
197
198         SetConnected(false);
199 }
200
201 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
202 {
203         if (IsPaused())
204                 return;
205
206         m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
207 }
208
209 void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
210 {
211         AssertOnWorkQueue();
212
213         CONTEXT("Processing check result for '" + checkable->GetName() + "'");
214
215         /* TODO: Deal with missing connection here. Needs refactoring
216          * into parsing the actual performance data and then putting it
217          * into a queue for re-inserting. */
218
219         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
220                 return;
221
222         Host::Ptr host;
223         Service::Ptr service;
224         tie(host, service) = GetHostService(checkable);
225
226         MacroProcessor::ResolverList resolvers;
227         if (service)
228                 resolvers.emplace_back("service", service);
229         resolvers.emplace_back("host", host);
230         resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
231
232         String prefix;
233
234         if (service) {
235                 prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
236         } else {
237                 prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, std::bind(&GraphiteWriter::EscapeMacroMetric, _1));
238         }
239
240         String prefixPerfdata = prefix + ".perfdata";
241         String prefixMetadata = prefix + ".metadata";
242
243         double ts = cr->GetExecutionEnd();
244
245         if (GetEnableSendMetadata()) {
246                 if (service) {
247                         SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
248                 } else {
249                         SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
250                 }
251
252                 SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
253                 SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
254                 SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
255                 SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
256                 SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
257                 SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
258                 SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
259                 SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
260         }
261
262         SendPerfdata(checkable, prefixPerfdata, cr, ts);
263 }
264
265 void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
266 {
267         Array::Ptr perfdata = cr->GetPerformanceData();
268
269         if (!perfdata)
270                 return;
271
272         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
273
274         ObjectLock olock(perfdata);
275         for (const Value& val : perfdata) {
276                 PerfdataValue::Ptr pdv;
277
278                 if (val.IsObjectType<PerfdataValue>())
279                         pdv = val;
280                 else {
281                         try {
282                                 pdv = PerfdataValue::Parse(val);
283                         } catch (const std::exception&) {
284                                 Log(LogWarning, "GraphiteWriter")
285                                         << "Ignoring invalid perfdata for checkable '"
286                                         << checkable->GetName() << "' and command '"
287                                         << checkCommand->GetName() << "' with value: " << val;
288                                 continue;
289                         }
290                 }
291
292                 String escapedKey = EscapeMetricLabel(pdv->GetLabel());
293
294                 SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
295
296                 if (GetEnableSendThresholds()) {
297                         if (pdv->GetCrit())
298                                 SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
299                         if (pdv->GetWarn())
300                                 SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
301                         if (pdv->GetMin())
302                                 SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
303                         if (pdv->GetMax())
304                                 SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
305                 }
306         }
307 }
308
309 void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
310 {
311         std::ostringstream msgbuf;
312         msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
313
314         Log(LogDebug, "GraphiteWriter")
315                 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
316
317         // do not send \n to debug log
318         msgbuf << "\n";
319         String metric = msgbuf.str();
320
321         boost::mutex::scoped_lock lock(m_StreamMutex);
322
323         if (!GetConnected())
324                 return;
325
326         try {
327                 m_Stream->Write(metric.CStr(), metric.GetLength());
328         } catch (const std::exception& ex) {
329                 Log(LogCritical, "GraphiteWriter")
330                         << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
331
332                 throw ex;
333         }
334 }
335
336 String GraphiteWriter::EscapeMetric(const String& str)
337 {
338         String result = str;
339
340         //don't allow '.' in metric prefixes
341         boost::replace_all(result, " ", "_");
342         boost::replace_all(result, ".", "_");
343         boost::replace_all(result, "\\", "_");
344         boost::replace_all(result, "/", "_");
345
346         return result;
347 }
348
349 String GraphiteWriter::EscapeMetricLabel(const String& str)
350 {
351         String result = str;
352
353         //allow to pass '.' in perfdata labels
354         boost::replace_all(result, " ", "_");
355         boost::replace_all(result, "\\", "_");
356         boost::replace_all(result, "/", "_");
357         boost::replace_all(result, "::", ".");
358
359         return result;
360 }
361
362 Value GraphiteWriter::EscapeMacroMetric(const Value& value)
363 {
364         if (value.IsObjectType<Array>()) {
365                 Array::Ptr arr = value;
366                 ArrayData result;
367
368                 ObjectLock olock(arr);
369                 for (const Value& arg : arr) {
370                         result.push_back(EscapeMetric(arg));
371                 }
372
373                 return Utility::Join(new Array(std::move(result)), '.');
374         } else
375                 return EscapeMetric(value);
376 }
377
378 void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
379 {
380         ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
381
382         if (!MacroProcessor::ValidateMacroString(lvalue()))
383                 BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
384 }
385
386 void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
387 {
388         ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
389
390         if (!MacroProcessor::ValidateMacroString(lvalue()))
391                 BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
392 }