granicus.if.org Git - icinga2/blob - lib/perfdata/gelfwriter.cpp
Merge pull request #7124 from Icinga/bugfix/namespace-thread-safe
[icinga2] / lib / perfdata / gelfwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
24 #include <utility>
25
26 using namespace icinga;
27
28 REGISTER_TYPE(GelfWriter);
29
30 REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
31
32 void GelfWriter::OnConfigLoaded()
33 {
34         ObjectImpl<GelfWriter>::OnConfigLoaded();
35
36         m_WorkQueue.SetName("GelfWriter, " + GetName());
37
38         if (!GetEnableHa()) {
39                 Log(LogDebug, "GelfWriter")
40                         << "HA functionality disabled. Won't pause connection: " << GetName();
41
42                 SetHAMode(HARunEverywhere);
43         } else {
44                 SetHAMode(HARunOnce);
45         }
46 }
47
48 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
49 {
50         DictionaryData nodes;
51
52         for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
53                 size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
54                 double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
55
56                 nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
57                         { "work_queue_items", workQueueItems },
58                         { "work_queue_item_rate", workQueueItemRate },
59                         { "connected", gelfwriter->GetConnected() },
60                         { "source", gelfwriter->GetSource() }
61                 }));
62
63                 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
64                 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
65         }
66
67         status->Set("gelfwriter", new Dictionary(std::move(nodes)));
68 }
69
70 void GelfWriter::Resume()
71 {
72         ObjectImpl<GelfWriter>::Resume();
73
74         Log(LogInformation, "GelfWriter")
75                 << "'" << GetName() << "' resumed.";
76
77         /* Register exception handler for WQ tasks. */
78         m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
79
80         /* Timer for reconnecting */
81         m_ReconnectTimer = new Timer();
82         m_ReconnectTimer->SetInterval(10);
83         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GelfWriter::ReconnectTimerHandler, this));
84         m_ReconnectTimer->Start();
85         m_ReconnectTimer->Reschedule(0);
86
87         /* Register event handlers. */
88         Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
89         Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8, _9));
90         Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
91 }
92
93 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
94 void GelfWriter::Pause()
95 {
96         m_ReconnectTimer.reset();
97
98         try {
99                 ReconnectInternal();
100         } catch (const std::exception&) {
101                 Log(LogInformation, "GelfWriter")
102                         << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
103
104                 ObjectImpl<GelfWriter>::Pause();
105                 return;
106         }
107
108         m_WorkQueue.Join();
109         DisconnectInternal();
110
111         Log(LogInformation, "GraphiteWriter")
112                 << "'" << GetName() << "' paused.";
113
114         ObjectImpl<GelfWriter>::Pause();
115 }
116
117 void GelfWriter::AssertOnWorkQueue()
118 {
119         ASSERT(m_WorkQueue.IsWorkerThread());
120 }
121
122 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
123 {
124         Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
125
126         Log(LogDebug, "GelfWriter")
127                 << "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp));
128
129         if (GetConnected()) {
130                 m_Stream->Close();
131
132                 SetConnected(false);
133         }
134 }
135
136 void GelfWriter::Reconnect()
137 {
138         AssertOnWorkQueue();
139
140         if (IsPaused()) {
141                 SetConnected(false);
142                 return;
143         }
144
145         ReconnectInternal();
146 }
147
148 void GelfWriter::ReconnectInternal()
149 {
150         double startTime = Utility::GetTime();
151
152         CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
153
154         SetShouldConnect(true);
155
156         if (GetConnected())
157                 return;
158
159         TcpSocket::Ptr socket = new TcpSocket();
160
161         Log(LogNotice, "GelfWriter")
162                 << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
163
164         try {
165                 socket->Connect(GetHost(), GetPort());
166         } catch (const std::exception& ex) {
167                 Log(LogCritical, "GelfWriter")
168                         << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
169                 throw ex;
170         }
171
172         m_Stream = new NetworkStream(socket);
173
174         SetConnected(true);
175
176         Log(LogInformation, "GelfWriter")
177                 << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
178 }
179
180 void GelfWriter::ReconnectTimerHandler()
181 {
182         m_WorkQueue.Enqueue(std::bind(&GelfWriter::Reconnect, this), PriorityNormal);
183 }
184
185 void GelfWriter::Disconnect()
186 {
187         AssertOnWorkQueue();
188
189         DisconnectInternal();
190 }
191
192 void GelfWriter::DisconnectInternal()
193 {
194         if (!GetConnected())
195                 return;
196
197         m_Stream->Close();
198
199         SetConnected(false);
200 }
201
202 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
203 {
204         if (IsPaused())
205                 return;
206
207         m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
208 }
209
210 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
211 {
212         AssertOnWorkQueue();
213
214         CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
215
216         Log(LogDebug, "GelfWriter")
217                 << "Processing check result for '" << checkable->GetName() << "'";
218
219         Host::Ptr host;
220         Service::Ptr service;
221         tie(host, service) = GetHostService(checkable);
222
223         Dictionary::Ptr fields = new Dictionary();
224
225         if (service) {
226                 fields->Set("_service_name", service->GetShortName());
227                 fields->Set("_service_state", Service::StateToString(service->GetState()));
228                 fields->Set("_last_state", service->GetLastState());
229                 fields->Set("_last_hard_state", service->GetLastHardState());
230         } else {
231                 fields->Set("_last_state", host->GetLastState());
232                 fields->Set("_last_hard_state", host->GetLastHardState());
233         }
234
235         fields->Set("_hostname", host->GetName());
236         fields->Set("_type", "CHECK RESULT");
237         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
238
239         fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
240         fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
241
242         fields->Set("_reachable", checkable->IsReachable());
243
244         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
245
246         if (checkCommand)
247                 fields->Set("_check_command", checkCommand->GetName());
248
249         double ts = Utility::GetTime();
250
251         if (cr) {
252                 fields->Set("_latency", cr->CalculateLatency());
253                 fields->Set("_execution_time", cr->CalculateExecutionTime());
254                 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
255                 fields->Set("full_message", cr->GetOutput());
256                 fields->Set("_check_source", cr->GetCheckSource());
257                 ts = cr->GetExecutionEnd();
258         }
259
260         if (cr && GetEnableSendPerfdata()) {
261                 Array::Ptr perfdata = cr->GetPerformanceData();
262
263                 if (perfdata) {
264                         ObjectLock olock(perfdata);
265                         for (const Value& val : perfdata) {
266                                 PerfdataValue::Ptr pdv;
267
268                                 if (val.IsObjectType<PerfdataValue>())
269                                         pdv = val;
270                                 else {
271                                         try {
272                                                 pdv = PerfdataValue::Parse(val);
273                                         } catch (const std::exception&) {
274                                                 Log(LogWarning, "GelfWriter")
275                                                         << "Ignoring invalid perfdata for checkable '"
276                                                         << checkable->GetName() << "' and command '"
277                                                         << checkCommand->GetName() << "' with value: " << val;
278                                                 continue;
279                                         }
280                                 }
281
282                                 String escaped_key = pdv->GetLabel();
283                                 boost::replace_all(escaped_key, " ", "_");
284                                 boost::replace_all(escaped_key, ".", "_");
285                                 boost::replace_all(escaped_key, "\\", "_");
286                                 boost::algorithm::replace_all(escaped_key, "::", ".");
287
288                                 fields->Set("_" + escaped_key, pdv->GetValue());
289
290                                 if (pdv->GetMin())
291                                         fields->Set("_" + escaped_key + "_min", pdv->GetMin());
292                                 if (pdv->GetMax())
293                                         fields->Set("_" + escaped_key + "_max", pdv->GetMax());
294                                 if (pdv->GetWarn())
295                                         fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
296                                 if (pdv->GetCrit())
297                                         fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
298
299                                 if (!pdv->GetUnit().IsEmpty())
300                                         fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
301                         }
302                 }
303         }
304
305         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
306 }
307
308 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
309         const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
310         const String& author, const String& commentText, const String& commandName)
311 {
312         if (IsPaused())
313                 return;
314
315         m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
316                 notification, checkable, user, notificationType, cr, nr, author, commentText, commandName));
317 }
318
319 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
320         const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
321         const String& author, const String& commentText, const String& commandName)
322 {
323         AssertOnWorkQueue();
324
325         CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
326
327         Log(LogDebug, "GelfWriter")
328                 << "Processing notification for '" << checkable->GetName() << "'";
329
330         Host::Ptr host;
331         Service::Ptr service;
332         tie(host, service) = GetHostService(checkable);
333
334         String notificationTypeString = Notification::NotificationTypeToString(notificationType);
335
336         String authorComment = "";
337
338         if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
339                 authorComment = author + ";" + commentText;
340         }
341
342         String output;
343         double ts = Utility::GetTime();
344
345         if (cr) {
346                 output = CompatUtility::GetCheckResultOutput(cr);
347                 ts = cr->GetExecutionEnd();
348         }
349
350         Dictionary::Ptr fields = new Dictionary();
351
352         if (service) {
353                 fields->Set("_type", "SERVICE NOTIFICATION");
354                 //TODO: fix this to _service_name
355                 fields->Set("_service", service->GetShortName());
356                 fields->Set("short_message", output);
357         } else {
358                 fields->Set("_type", "HOST NOTIFICATION");
359                 fields->Set("short_message", output);
360         }
361
362         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
363
364         fields->Set("_hostname", host->GetName());
365         fields->Set("_command", commandName);
366         fields->Set("_notification_type", notificationTypeString);
367         fields->Set("_comment", authorComment);
368
369         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
370
371         if (commandObj)
372                 fields->Set("_check_command", commandObj->GetName());
373
374         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
375 }
376
377 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
378 {
379         if (IsPaused())
380                 return;
381
382         m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
383 }
384
385 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
386 {
387         AssertOnWorkQueue();
388
389         CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
390
391         Log(LogDebug, "GelfWriter")
392                 << "Processing state change for '" << checkable->GetName() << "'";
393
394         Host::Ptr host;
395         Service::Ptr service;
396         tie(host, service) = GetHostService(checkable);
397
398         Dictionary::Ptr fields = new Dictionary();
399
400         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
401         fields->Set("_type", "STATE CHANGE");
402         fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
403         fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
404         fields->Set("_hostname", host->GetName());
405
406         if (service) {
407                 fields->Set("_service_name", service->GetShortName());
408                 fields->Set("_service_state", Service::StateToString(service->GetState()));
409                 fields->Set("_last_state", service->GetLastState());
410                 fields->Set("_last_hard_state", service->GetLastHardState());
411         } else {
412                 fields->Set("_last_state", host->GetLastState());
413                 fields->Set("_last_hard_state", host->GetLastHardState());
414         }
415
416         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
417
418         if (commandObj)
419                 fields->Set("_check_command", commandObj->GetName());
420
421         double ts = Utility::GetTime();
422
423         if (cr) {
424                 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
425                 fields->Set("full_message", cr->GetOutput());
426                 fields->Set("_check_source", cr->GetCheckSource());
427                 ts = cr->GetExecutionEnd();
428         }
429
430         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
431 }
432
433 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
434 {
435         fields->Set("version", "1.1");
436         fields->Set("host", source);
437         fields->Set("timestamp", ts);
438
439         return JsonEncode(fields);
440 }
441
442 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
443 {
444         std::ostringstream msgbuf;
445         msgbuf << gelfMessage;
446         msgbuf << '\0';
447
448         String log = msgbuf.str();
449
450         ObjectLock olock(this);
451
452         if (!GetConnected())
453                 return;
454
455         try {
456                 Log(LogDebug, "GelfWriter")
457                         << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
458
459                 m_Stream->Write(log.CStr(), log.GetLength());
460         } catch (const std::exception& ex) {
461                 Log(LogCritical, "GelfWriter")
462                         << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
463
464                 throw ex;
465         }
466 }