1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
25 #include "base/io-engine.hpp"
26 #include <boost/asio/write.hpp>
27 #include <boost/asio/buffer.hpp>
28 #include <boost/system/error_code.hpp>
29 #include <boost/asio/error.hpp>
31 using namespace icinga;
/* Registers the GelfWriter type and its statistics callback (GelfWriter::StatsFunc)
 * through the project's registration macros. */
33 REGISTER_TYPE(GelfWriter);
35 REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
/* Post-config hook for this writer.
 * - Chains to the generated base implementation.
 * - Names the work queue "GelfWriter, <object name>" so it is identifiable in logs/stats.
 * - Selects the HA mode.
 * NOTE(review): this listing elides the line(s) that guard the debug log and the
 * SetHAMode(HARunEverywhere) call below. The log text suggests that branch runs
 * when HA is disabled for this object; an alternative HA mode may be set in an
 * elided else-branch. Confirm against the full source. */
37 void GelfWriter::OnConfigLoaded()
39 ObjectImpl<GelfWriter>::OnConfigLoaded();
41 m_WorkQueue.SetName("GelfWriter, " + GetName());
44 Log(LogDebug, "GelfWriter")
45 << "HA functionality disabled. Won't pause connection: " << GetName();
47 SetHAMode(HARunEverywhere);
/* Status/perfdata callback covering every GelfWriter object.
 * For each writer it records, keyed by the writer's name:
 *   - work_queue_items: current work queue length,
 *   - work_queue_item_rate: GetTaskCount(60) / 60.0 (presumably tasks per second
 *     averaged over the last 60 seconds; confirm GetTaskCount's window semantics),
 *   - connected: current connection flag,
 *   - source: the configured GELF "host"/source value.
 * The two work-queue figures are also appended to `perfdata` as
 * "gelfwriter_<name>_work_queue_items" and "gelfwriter_<name>_work_queue_item_rate".
 * The per-writer map is stored in `status` under "gelfwriter".
 * NOTE(review): the declaration of `nodes` is elided from this listing. It is
 * presumably a name/value container accepted by Dictionary's constructor. */
53 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
57 for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
58 size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
59 double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
61 nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
62 { "work_queue_items", workQueueItems },
63 { "work_queue_item_rate", workQueueItemRate },
64 { "connected", gelfwriter->GetConnected() },
65 { "source", gelfwriter->GetSource() }
68 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
69 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
72 status->Set("gelfwriter", new Dictionary(std::move(nodes)));
/* Activates the writer. Per the comment on Pause(), this is also the path used
 * to resume at runtime after an HA pause. Steps:
 *   - chains to the base Resume() and logs that the writer resumed,
 *   - routes exceptions thrown by work-queue tasks to ExceptionHandler(),
 *   - creates and starts a reconnect timer with interval 10 (presumably seconds).
 *     Reschedule(0) presumably triggers the first reconnect attempt right away
 *     instead of after the first interval; confirm Timer semantics.
 *   - subscribes to the check result, user notification and state change signals.
 *     All handlers are bound to the raw `this` pointer. */
75 void GelfWriter::Resume()
77 ObjectImpl<GelfWriter>::Resume();
79 Log(LogInformation, "GelfWriter")
80 << "'" << GetName() << "' resumed.";
82 /* Register exception handler for WQ tasks. */
83 m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
85 /* Timer for reconnecting */
86 m_ReconnectTimer = new Timer();
87 m_ReconnectTimer->SetInterval(10);
88 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GelfWriter::ReconnectTimerHandler, this));
89 m_ReconnectTimer->Start();
90 m_ReconnectTimer->Reschedule(0);
92 /* Register event handlers. */
93 Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
94 Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8, _9));
95 Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
98 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
/* Visible behavior:
 *   - Drops the reconnect timer.
 *   - If a std::exception is caught, logs that the writer could not connect,
 *     does not flush buffers, and pauses via the base class.
 *   - Otherwise disconnects, logs "paused", and pauses via the base class.
 * NOTE(review): the try block and the connect/flush calls it guards are elided
 * from this listing. The log text implies an attempt to connect and flush
 * queued work before pausing, and presumably an early return after the first
 * base Pause(). Confirm against the full source. */
99 void GelfWriter::Pause()
101 m_ReconnectTimer.reset();
105 } catch (const std::exception&) {
106 Log(LogInformation, "GelfWriter")
107 << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
109 ObjectImpl<GelfWriter>::Pause();
114 DisconnectInternal();
116 Log(LogInformation, "GelfWriter")
117 << "'" << GetName() << "' paused.";
119 ObjectImpl<GelfWriter>::Pause();
/* Asserts (via the project's ASSERT macro) that the calling thread is this
 * writer's work-queue worker thread. */
122 void GelfWriter::AssertOnWorkQueue()
124 ASSERT(m_WorkQueue.IsWorkerThread());
/* Work-queue exception callback, installed in Resume().
 * Logs a critical hint to check the Graylog backend, logs the full diagnostic
 * information at debug level, and then tears down the connection with
 * DisconnectInternal(). */
127 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
129 Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
131 Log(LogDebug, "GelfWriter")
132 << "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp));
134 DisconnectInternal();
/* Reconnect task; ReconnectTimerHandler() enqueues it on the work queue.
 * NOTE(review): the body is elided from this listing. It presumably delegates
 * to ReconnectInternal() (possibly with work-queue assertions and exception
 * handling). Confirm against the full source. */
137 void GelfWriter::Reconnect()
/* Opens a new connection to the configured Graylog GELF endpoint.
 * Visible steps:
 *   - record the start time and set a diagnostic CONTEXT,
 *   - mark the writer as wanting a connection and log the attempt,
 *   - when TLS is enabled, build an SSL context from the cert/key/CA paths and
 *     create a TLS stream; otherwise create a plain TCP stream,
 *   - connect the chosen stream's lowest layer to host/port,
 *   - for TLS, perform the client-side handshake,
 *   - log the elapsed time.
 * Each failure path logs a warning.
 * NOTE(review): the statements following those warnings (presumably rethrows),
 * the early-return/connected-state lines and the if/try/brace lines are elided
 * from this listing. Confirm against the full source. */
149 void GelfWriter::ReconnectInternal()
151 double startTime = Utility::GetTime();
153 CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
155 SetShouldConnect(true);
160 Log(LogNotice, "GelfWriter")
161 << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
163 bool ssl = GetEnableTls();
166 std::shared_ptr<boost::asio::ssl::context> sslContext;
169 sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
170 } catch (const std::exception& ex) {
171 Log(LogWarning, "GelfWriter")
172 << "Unable to create SSL context.";
176 m_Stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
178 m_Stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
182 icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
183 } catch (const std::exception& ex) {
// Close the quote around the port before the full stop, matching the
// "Reconnecting to ..." message above (was: port '<port>.').
184 Log(LogWarning, "GelfWriter")
185 << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
190 auto& tlsStream (m_Stream.first->next_layer());
193 tlsStream.handshake(tlsStream.client);
194 } catch (const std::exception& ex) {
// Close the quote around the host name, not after "failed."
// (was: host '<host> failed.').
195 Log(LogWarning, "GelfWriter")
196 << "TLS handshake with host '" << GetHost() << "' failed.";
// NOTE(review): std::setw(2) only sets a minimum field width for the next value.
// If the intent was to show two decimal places, std::fixed/std::setprecision(2)
// would be needed. Confirm before changing the log format.
203 Log(LogInformation, "GelfWriter")
204 << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
/* Reconnect timer callback (connected in Resume()).
 * Does not connect directly. It enqueues Reconnect() on the work queue at
 * PriorityNormal, so the attempt runs as a work-queue task. */
207 void GelfWriter::ReconnectTimerHandler()
209 m_WorkQueue.Enqueue(std::bind(&GelfWriter::Reconnect, this), PriorityNormal);
/* Disconnect entry point that delegates to DisconnectInternal().
 * NOTE(review): some lines between the signature and the call are elided from
 * this listing (presumably a work-queue assertion). Confirm. */
212 void GelfWriter::Disconnect()
216 DisconnectInternal();
/* Tears down the current Graylog connection.
 * - TLS stream (m_Stream.first): performs a TLS shutdown, capturing the error
 *   code instead of throwing. Logs critically if the error belongs to the SSL
 *   category, i.e. the shutdown was not clean.
 * - Plain TCP stream (m_Stream.second): closes the socket.
 * NOTE(review): the elided lines presumably include an early return when not
 * connected and updates to the connection state. Confirm against the full source. */
219 void GelfWriter::DisconnectInternal()
224 if (m_Stream.first) {
225 boost::system::error_code ec;
226 m_Stream.first->next_layer().shutdown(ec);
228 // https://stackoverflow.com/a/25703699
229 // As long as the error code's category is not an SSL category, then the protocol was securely shutdown
230 if (ec.category() == boost::asio::error::get_ssl_category()) {
231 Log(LogCritical, "GelfWriter")
232 << "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
234 } else if (m_Stream.second) {
235 m_Stream.second->close();
/* Handler for Checkable::OnNewCheckResult (connected in Resume()).
 * Defers all processing to CheckResultHandlerInternal() on the work queue,
 * passing the checkable and check result by value (copied into the bound task).
 * NOTE(review): elided lines may contain a paused/disabled guard. Confirm. */
242 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
247 m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
/* Work-queue task that turns a check result into a GELF "CHECK RESULT" message.
 * Fields set:
 *   - identity and state: service name/state and last (hard) state for services,
 *     host last (hard) state otherwise; plus host name, type and _state,
 *   - check attempt counters and reachability,
 *   - the check command name,
 *   - from the check result (presumably only when `cr` is non-null; the guard is
 *     elided): latency, execution time, formatted output (short_message), raw
 *     output (full_message) and check source. The timestamp then switches from
 *     "now" to the result's execution end,
 *   - when GetEnableSendPerfdata() is true: each perfdata value as "_<label>",
 *     with optional "_<label>_min/_max/_warn/_crit/_unit" companions.
 * The composed message is sent with SendLogMessage().
 * NOTE(review): several if/else, null-check, try and `continue` lines are elided
 * from this listing. Confirm against the full source. */
250 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
254 CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
256 Log(LogDebug, "GelfWriter")
257 << "Processing check result for '" << checkable->GetName() << "'";
260 Service::Ptr service;
261 tie(host, service) = GetHostService(checkable);
263 Dictionary::Ptr fields = new Dictionary();
266 fields->Set("_service_name", service->GetShortName());
267 fields->Set("_service_state", Service::StateToString(service->GetState()));
268 fields->Set("_last_state", service->GetLastState());
269 fields->Set("_last_hard_state", service->GetLastHardState());
271 fields->Set("_last_state", host->GetLastState());
272 fields->Set("_last_hard_state", host->GetLastHardState());
275 fields->Set("_hostname", host->GetName());
276 fields->Set("_type", "CHECK RESULT");
// _state carries the service state for services, the host state otherwise.
277 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
279 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
280 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
282 fields->Set("_reachable", checkable->IsReachable());
284 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
287 fields->Set("_check_command", checkCommand->GetName());
// Default timestamp is "now"; overridden with the result's execution end below.
289 double ts = Utility::GetTime();
292 fields->Set("_latency", cr->CalculateLatency());
293 fields->Set("_execution_time", cr->CalculateExecutionTime());
294 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
295 fields->Set("full_message", cr->GetOutput());
296 fields->Set("_check_source", cr->GetCheckSource());
297 ts = cr->GetExecutionEnd();
300 if (cr && GetEnableSendPerfdata()) {
301 Array::Ptr perfdata = cr->GetPerformanceData();
304 ObjectLock olock(perfdata);
305 for (const Value& val : perfdata) {
306 PerfdataValue::Ptr pdv;
308 if (val.IsObjectType<PerfdataValue>())
312 pdv = PerfdataValue::Parse(val);
313 } catch (const std::exception&) {
// NOTE(review): checkCommand is dereferenced here with no null check visible in
// this listing. If GetCheckCommand() can return null, this would crash while
// logging. Confirm.
314 Log(LogWarning, "GelfWriter")
315 << "Ignoring invalid perfdata for checkable '"
316 << checkable->GetName() << "' and command '"
317 << checkCommand->GetName() << "' with value: " << val;
// Turn the perfdata label into a field-name-safe key. Spaces, dots and
// backslashes become underscores, then "::" becomes a dot. The order matters:
// dots produced by the last step are not replaced again.
322 String escaped_key = pdv->GetLabel();
323 boost::replace_all(escaped_key, " ", "_");
324 boost::replace_all(escaped_key, ".", "_");
325 boost::replace_all(escaped_key, "\\", "_");
326 boost::algorithm::replace_all(escaped_key, "::", ".");
328 fields->Set("_" + escaped_key, pdv->GetValue());
331 fields->Set("_" + escaped_key + "_min", pdv->GetMin());
333 fields->Set("_" + escaped_key + "_max", pdv->GetMax());
335 fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
337 fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
339 if (!pdv->GetUnit().IsEmpty())
340 fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
345 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
/* Handler for Checkable::OnNotificationSentToUser (connected in Resume()).
 * Forwards all nine arguments by value to NotificationToUserHandlerInternal()
 * on the work queue.
 * NOTE(review): elided lines may contain a paused/disabled guard. Confirm. */
348 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
349 const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
350 const String& author, const String& commentText, const String& commandName)
355 m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
356 notification, checkable, user, notificationType, cr, nr, author, commentText, commandName));
/* Work-queue task that turns a user notification into a GELF message with
 * _type "SERVICE NOTIFICATION" (services, which also get "_service") or
 * "HOST NOTIFICATION".
 * - The notification type uses the compat string mapping.
 * - For custom and acknowledgement notifications, "_comment" is
 *   "<author>;<comment text>"; otherwise it is empty.
 * - When a check result is available (guard elided), its formatted output becomes
 *   short_message and its execution end the timestamp; otherwise the timestamp
 *   is "now".
 * NOTE(review): the declarations of `host` and `output` and the if/else lines
 * are elided from this listing. `notification`, `user` and `nr` are not
 * referenced in the visible lines. */
359 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
360 const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
361 const String& author, const String& commentText, const String& commandName)
365 CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
367 Log(LogDebug, "GelfWriter")
368 << "Processing notification for '" << checkable->GetName() << "'";
371 Service::Ptr service;
372 tie(host, service) = GetHostService(checkable);
374 String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
376 String authorComment = "";
378 if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
379 authorComment = author + ";" + commentText;
383 double ts = Utility::GetTime();
386 output = CompatUtility::GetCheckResultOutput(cr);
387 ts = cr->GetExecutionEnd();
390 Dictionary::Ptr fields = new Dictionary();
393 fields->Set("_type", "SERVICE NOTIFICATION");
394 //TODO: fix this to _service_name
395 fields->Set("_service", service->GetShortName());
396 fields->Set("short_message", output);
398 fields->Set("_type", "HOST NOTIFICATION");
399 fields->Set("short_message", output);
402 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
404 fields->Set("_hostname", host->GetName());
405 fields->Set("_command", commandName);
406 fields->Set("_notification_type", notificationTypeString);
407 fields->Set("_comment", authorComment);
409 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
412 fields->Set("_check_command", commandObj->GetName());
414 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
/* Handler for Checkable::OnStateChange (connected in Resume()).
 * Defers processing to StateChangeHandlerInternal() on the work queue.
 * NOTE(review): elided lines may contain a paused/disabled guard. Confirm. */
417 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
422 m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
/* Work-queue task that turns a state change into a GELF "STATE CHANGE" message.
 * Fields set:
 *   - current state and check attempt counters,
 *   - host name,
 *   - service name/state and last (hard) state for services, host last (hard)
 *     state otherwise,
 *   - the check command name,
 *   - when a check result is available (guard elided): formatted and raw output
 *     and check source, with the result's execution end as timestamp instead
 *     of "now".
 * NOTE(review): `type` is not referenced in the visible lines, and the
 * if/else/null-check lines are elided from this listing. */
425 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
429 CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
431 Log(LogDebug, "GelfWriter")
432 << "Processing state change for '" << checkable->GetName() << "'";
435 Service::Ptr service;
436 tie(host, service) = GetHostService(checkable);
438 Dictionary::Ptr fields = new Dictionary();
440 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
441 fields->Set("_type", "STATE CHANGE");
442 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
443 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
444 fields->Set("_hostname", host->GetName());
447 fields->Set("_service_name", service->GetShortName());
448 fields->Set("_service_state", Service::StateToString(service->GetState()));
449 fields->Set("_last_state", service->GetLastState());
450 fields->Set("_last_hard_state", service->GetLastHardState());
452 fields->Set("_last_state", host->GetLastState());
453 fields->Set("_last_hard_state", host->GetLastHardState());
456 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
459 fields->Set("_check_command", commandObj->GetName());
461 double ts = Utility::GetTime();
464 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
465 fields->Set("full_message", cr->GetOutput());
466 fields->Set("_check_source", cr->GetCheckSource());
467 ts = cr->GetExecutionEnd();
470 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
/* Adds the GELF envelope fields to `fields` and returns the JSON encoding:
 *   "version" = "1.1", "host" = `source`, "timestamp" = `ts`.
 * Note: `fields` is modified in place. Existing keys with these names are
 * overwritten. */
473 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
475 fields->Set("version", "1.1");
476 fields->Set("host", source);
477 fields->Set("timestamp", ts);
479 return JsonEncode(fields);
482 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
484 std::ostringstream msgbuf;
485 msgbuf << gelfMessage;
488 String log = msgbuf.str();
490 ObjectLock olock(this);
496 Log(LogDebug, "GelfWriter")
497 << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
499 if (m_Stream.first) {
500 boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
501 m_Stream.first->flush();
503 boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
504 m_Stream.second->flush();
506 } catch (const std::exception& ex) {
507 Log(LogCritical, "GelfWriter")
508 << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";