granicus.if.org Git - icinga2/blob - lib/perfdata/gelfwriter.cpp
Merge pull request #7527 from Icinga/bugfix/checkable-command-endpoint-zone
[icinga2] / lib / perfdata / gelfwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
24 #include <utility>
25 #include "base/io-engine.hpp"
26 #include <boost/asio/write.hpp>
27 #include <boost/asio/buffer.hpp>
28 #include <boost/system/error_code.hpp>
29 #include <boost/asio/error.hpp>
30
31 using namespace icinga;
32
33 REGISTER_TYPE(GelfWriter);
34
35 REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
36
37 void GelfWriter::OnConfigLoaded()
38 {
39         ObjectImpl<GelfWriter>::OnConfigLoaded();
40
41         m_WorkQueue.SetName("GelfWriter, " + GetName());
42
43         if (!GetEnableHa()) {
44                 Log(LogDebug, "GelfWriter")
45                         << "HA functionality disabled. Won't pause connection: " << GetName();
46
47                 SetHAMode(HARunEverywhere);
48         } else {
49                 SetHAMode(HARunOnce);
50         }
51 }
52
53 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
54 {
55         DictionaryData nodes;
56
57         for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
58                 size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
59                 double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
60
61                 nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
62                         { "work_queue_items", workQueueItems },
63                         { "work_queue_item_rate", workQueueItemRate },
64                         { "connected", gelfwriter->GetConnected() },
65                         { "source", gelfwriter->GetSource() }
66                 }));
67
68                 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
69                 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
70         }
71
72         status->Set("gelfwriter", new Dictionary(std::move(nodes)));
73 }
74
75 void GelfWriter::Resume()
76 {
77         ObjectImpl<GelfWriter>::Resume();
78
79         Log(LogInformation, "GelfWriter")
80                 << "'" << GetName() << "' resumed.";
81
82         /* Register exception handler for WQ tasks. */
83         m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
84
85         /* Timer for reconnecting */
86         m_ReconnectTimer = new Timer();
87         m_ReconnectTimer->SetInterval(10);
88         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GelfWriter::ReconnectTimerHandler, this));
89         m_ReconnectTimer->Start();
90         m_ReconnectTimer->Reschedule(0);
91
92         /* Register event handlers. */
93         Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
94         Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8, _9));
95         Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
96 }
97
98 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
99 void GelfWriter::Pause()
100 {
101         m_ReconnectTimer.reset();
102
103         try {
104                 ReconnectInternal();
105         } catch (const std::exception&) {
106                 Log(LogInformation, "GelfWriter")
107                         << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
108
109                 ObjectImpl<GelfWriter>::Pause();
110                 return;
111         }
112
113         m_WorkQueue.Join();
114         DisconnectInternal();
115
116         Log(LogInformation, "GelfWriter")
117                 << "'" << GetName() << "' paused.";
118
119         ObjectImpl<GelfWriter>::Pause();
120 }
121
122 void GelfWriter::AssertOnWorkQueue()
123 {
124         ASSERT(m_WorkQueue.IsWorkerThread());
125 }
126
127 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
128 {
129         Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
130
131         Log(LogDebug, "GelfWriter")
132                 << "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp));
133
134         DisconnectInternal();
135 }
136
137 void GelfWriter::Reconnect()
138 {
139         AssertOnWorkQueue();
140
141         if (IsPaused()) {
142                 SetConnected(false);
143                 return;
144         }
145
146         ReconnectInternal();
147 }
148
149 void GelfWriter::ReconnectInternal()
150 {
151         double startTime = Utility::GetTime();
152
153         CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
154
155         SetShouldConnect(true);
156
157         if (GetConnected())
158                 return;
159
160         Log(LogNotice, "GelfWriter")
161                 << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
162
163         bool ssl = GetEnableTls();
164
165         if (ssl) {
166                 std::shared_ptr<boost::asio::ssl::context> sslContext;
167
168                 try {
169                         sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
170                 } catch (const std::exception& ex) {
171                         Log(LogWarning, "GelfWriter")
172                                 << "Unable to create SSL context.";
173                         throw;
174                 }
175
176                 m_Stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
177         } else {
178                 m_Stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
179         }
180
181         try {
182                 icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
183         } catch (const std::exception& ex) {
184                 Log(LogWarning, "GelfWriter")
185                         << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
186                 throw;
187         }
188
189         if (ssl) {
190                 auto& tlsStream (m_Stream.first->next_layer());
191
192                 try {
193                         tlsStream.handshake(tlsStream.client);
194                 } catch (const std::exception& ex) {
195                         Log(LogWarning, "GelfWriter")
196                                 << "TLS handshake with host '" << GetHost() << " failed.'";
197                         throw;
198                 }
199         }
200
201         SetConnected(true);
202
203         Log(LogInformation, "GelfWriter")
204                 << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
205 }
206
207 void GelfWriter::ReconnectTimerHandler()
208 {
209         m_WorkQueue.Enqueue(std::bind(&GelfWriter::Reconnect, this), PriorityNormal);
210 }
211
212 void GelfWriter::Disconnect()
213 {
214         AssertOnWorkQueue();
215
216         DisconnectInternal();
217 }
218
219 void GelfWriter::DisconnectInternal()
220 {
221         if (!GetConnected())
222                 return;
223
224         if (m_Stream.first) {
225                 boost::system::error_code ec;
226                 m_Stream.first->next_layer().shutdown(ec);
227
228                 // https://stackoverflow.com/a/25703699
229                 // As long as the error code's category is not an SSL category, then the protocol was securely shutdown
230                 if (ec.category() == boost::asio::error::get_ssl_category()) {
231                         Log(LogCritical, "GelfWriter")
232                                 << "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
233                 }
234         } else if (m_Stream.second) {
235                 m_Stream.second->close();
236         }
237
238         SetConnected(false);
239
240 }
241
242 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
243 {
244         if (IsPaused())
245                 return;
246
247         m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
248 }
249
250 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
251 {
252         AssertOnWorkQueue();
253
254         CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
255
256         Log(LogDebug, "GelfWriter")
257                 << "Processing check result for '" << checkable->GetName() << "'";
258
259         Host::Ptr host;
260         Service::Ptr service;
261         tie(host, service) = GetHostService(checkable);
262
263         Dictionary::Ptr fields = new Dictionary();
264
265         if (service) {
266                 fields->Set("_service_name", service->GetShortName());
267                 fields->Set("_service_state", Service::StateToString(service->GetState()));
268                 fields->Set("_last_state", service->GetLastState());
269                 fields->Set("_last_hard_state", service->GetLastHardState());
270         } else {
271                 fields->Set("_last_state", host->GetLastState());
272                 fields->Set("_last_hard_state", host->GetLastHardState());
273         }
274
275         fields->Set("_hostname", host->GetName());
276         fields->Set("_type", "CHECK RESULT");
277         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
278
279         fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
280         fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
281
282         fields->Set("_reachable", checkable->IsReachable());
283
284         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
285
286         if (checkCommand)
287                 fields->Set("_check_command", checkCommand->GetName());
288
289         double ts = Utility::GetTime();
290
291         if (cr) {
292                 fields->Set("_latency", cr->CalculateLatency());
293                 fields->Set("_execution_time", cr->CalculateExecutionTime());
294                 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
295                 fields->Set("full_message", cr->GetOutput());
296                 fields->Set("_check_source", cr->GetCheckSource());
297                 ts = cr->GetExecutionEnd();
298         }
299
300         if (cr && GetEnableSendPerfdata()) {
301                 Array::Ptr perfdata = cr->GetPerformanceData();
302
303                 if (perfdata) {
304                         ObjectLock olock(perfdata);
305                         for (const Value& val : perfdata) {
306                                 PerfdataValue::Ptr pdv;
307
308                                 if (val.IsObjectType<PerfdataValue>())
309                                         pdv = val;
310                                 else {
311                                         try {
312                                                 pdv = PerfdataValue::Parse(val);
313                                         } catch (const std::exception&) {
314                                                 Log(LogWarning, "GelfWriter")
315                                                         << "Ignoring invalid perfdata for checkable '"
316                                                         << checkable->GetName() << "' and command '"
317                                                         << checkCommand->GetName() << "' with value: " << val;
318                                                 continue;
319                                         }
320                                 }
321
322                                 String escaped_key = pdv->GetLabel();
323                                 boost::replace_all(escaped_key, " ", "_");
324                                 boost::replace_all(escaped_key, ".", "_");
325                                 boost::replace_all(escaped_key, "\\", "_");
326                                 boost::algorithm::replace_all(escaped_key, "::", ".");
327
328                                 fields->Set("_" + escaped_key, pdv->GetValue());
329
330                                 if (pdv->GetMin())
331                                         fields->Set("_" + escaped_key + "_min", pdv->GetMin());
332                                 if (pdv->GetMax())
333                                         fields->Set("_" + escaped_key + "_max", pdv->GetMax());
334                                 if (pdv->GetWarn())
335                                         fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
336                                 if (pdv->GetCrit())
337                                         fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
338
339                                 if (!pdv->GetUnit().IsEmpty())
340                                         fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
341                         }
342                 }
343         }
344
345         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
346 }
347
348 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
349         const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
350         const String& author, const String& commentText, const String& commandName)
351 {
352         if (IsPaused())
353                 return;
354
355         m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
356                 notification, checkable, user, notificationType, cr, nr, author, commentText, commandName));
357 }
358
359 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
360         const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
361         const String& author, const String& commentText, const String& commandName)
362 {
363         AssertOnWorkQueue();
364
365         CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
366
367         Log(LogDebug, "GelfWriter")
368                 << "Processing notification for '" << checkable->GetName() << "'";
369
370         Host::Ptr host;
371         Service::Ptr service;
372         tie(host, service) = GetHostService(checkable);
373
374         String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
375
376         String authorComment = "";
377
378         if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
379                 authorComment = author + ";" + commentText;
380         }
381
382         String output;
383         double ts = Utility::GetTime();
384
385         if (cr) {
386                 output = CompatUtility::GetCheckResultOutput(cr);
387                 ts = cr->GetExecutionEnd();
388         }
389
390         Dictionary::Ptr fields = new Dictionary();
391
392         if (service) {
393                 fields->Set("_type", "SERVICE NOTIFICATION");
394                 //TODO: fix this to _service_name
395                 fields->Set("_service", service->GetShortName());
396                 fields->Set("short_message", output);
397         } else {
398                 fields->Set("_type", "HOST NOTIFICATION");
399                 fields->Set("short_message", output);
400         }
401
402         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
403
404         fields->Set("_hostname", host->GetName());
405         fields->Set("_command", commandName);
406         fields->Set("_notification_type", notificationTypeString);
407         fields->Set("_comment", authorComment);
408
409         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
410
411         if (commandObj)
412                 fields->Set("_check_command", commandObj->GetName());
413
414         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
415 }
416
417 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
418 {
419         if (IsPaused())
420                 return;
421
422         m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
423 }
424
425 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
426 {
427         AssertOnWorkQueue();
428
429         CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
430
431         Log(LogDebug, "GelfWriter")
432                 << "Processing state change for '" << checkable->GetName() << "'";
433
434         Host::Ptr host;
435         Service::Ptr service;
436         tie(host, service) = GetHostService(checkable);
437
438         Dictionary::Ptr fields = new Dictionary();
439
440         fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
441         fields->Set("_type", "STATE CHANGE");
442         fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
443         fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
444         fields->Set("_hostname", host->GetName());
445
446         if (service) {
447                 fields->Set("_service_name", service->GetShortName());
448                 fields->Set("_service_state", Service::StateToString(service->GetState()));
449                 fields->Set("_last_state", service->GetLastState());
450                 fields->Set("_last_hard_state", service->GetLastHardState());
451         } else {
452                 fields->Set("_last_state", host->GetLastState());
453                 fields->Set("_last_hard_state", host->GetLastHardState());
454         }
455
456         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
457
458         if (commandObj)
459                 fields->Set("_check_command", commandObj->GetName());
460
461         double ts = Utility::GetTime();
462
463         if (cr) {
464                 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
465                 fields->Set("full_message", cr->GetOutput());
466                 fields->Set("_check_source", cr->GetCheckSource());
467                 ts = cr->GetExecutionEnd();
468         }
469
470         SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
471 }
472
473 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
474 {
475         fields->Set("version", "1.1");
476         fields->Set("host", source);
477         fields->Set("timestamp", ts);
478
479         return JsonEncode(fields);
480 }
481
482 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
483 {
484         std::ostringstream msgbuf;
485         msgbuf << gelfMessage;
486         msgbuf << '\0';
487
488         String log = msgbuf.str();
489
490         ObjectLock olock(this);
491
492         if (!GetConnected())
493                 return;
494
495         try {
496                 Log(LogDebug, "GelfWriter")
497                         << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
498
499                 if (m_Stream.first) {
500                         boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
501                         m_Stream.first->flush();
502                 } else {
503                         boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
504                         m_Stream.second->flush();
505                 }
506         } catch (const std::exception& ex) {
507                 Log(LogCritical, "GelfWriter")
508                         << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
509
510                 throw ex;
511         }
512 }