1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/gelfwriter.hpp"
4 #include "perfdata/gelfwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/notification.hpp"
7 #include "icinga/checkcommand.hpp"
8 #include "icinga/macroprocessor.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/utility.hpp"
15 #include "base/perfdatavalue.hpp"
16 #include "base/application.hpp"
17 #include "base/stream.hpp"
18 #include "base/networkstream.hpp"
19 #include "base/context.hpp"
20 #include "base/exception.hpp"
21 #include "base/json.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string/replace.hpp>
26 using namespace icinga;
28 REGISTER_TYPE(GelfWriter);
30 REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
32 void GelfWriter::OnConfigLoaded()
34 ObjectImpl<GelfWriter>::OnConfigLoaded();
36 m_WorkQueue.SetName("GelfWriter, " + GetName());
39 Log(LogDebug, "GelfWriter")
40 << "HA functionality disabled. Won't pause connection: " << GetName();
42 SetHAMode(HARunEverywhere);
48 void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
52 for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
53 size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
54 double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
56 nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
57 { "work_queue_items", workQueueItems },
58 { "work_queue_item_rate", workQueueItemRate },
59 { "connected", gelfwriter->GetConnected() },
60 { "source", gelfwriter->GetSource() }
63 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
64 perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
67 status->Set("gelfwriter", new Dictionary(std::move(nodes)));
70 void GelfWriter::Resume()
72 ObjectImpl<GelfWriter>::Resume();
74 Log(LogInformation, "GelfWriter")
75 << "'" << GetName() << "' resumed.";
77 /* Register exception handler for WQ tasks. */
78 m_WorkQueue.SetExceptionCallback(std::bind(&GelfWriter::ExceptionHandler, this, _1));
80 /* Timer for reconnecting */
81 m_ReconnectTimer = new Timer();
82 m_ReconnectTimer->SetInterval(10);
83 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&GelfWriter::ReconnectTimerHandler, this));
84 m_ReconnectTimer->Start();
85 m_ReconnectTimer->Reschedule(0);
87 /* Register event handlers. */
88 Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
89 Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8, _9));
90 Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
93 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
94 void GelfWriter::Pause()
96 m_ReconnectTimer.reset();
100 } catch (const std::exception&) {
101 Log(LogInformation, "GelfWriter")
102 << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
104 ObjectImpl<GelfWriter>::Pause();
109 DisconnectInternal();
111 Log(LogInformation, "GraphiteWriter")
112 << "'" << GetName() << "' paused.";
114 ObjectImpl<GelfWriter>::Pause();
117 void GelfWriter::AssertOnWorkQueue()
119 ASSERT(m_WorkQueue.IsWorkerThread());
122 void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
124 Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
126 Log(LogDebug, "GelfWriter")
127 << "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp));
129 if (GetConnected()) {
136 void GelfWriter::Reconnect()
148 void GelfWriter::ReconnectInternal()
150 double startTime = Utility::GetTime();
152 CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
154 SetShouldConnect(true);
159 TcpSocket::Ptr socket = new TcpSocket();
161 Log(LogNotice, "GelfWriter")
162 << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
165 socket->Connect(GetHost(), GetPort());
166 } catch (const std::exception& ex) {
167 Log(LogCritical, "GelfWriter")
168 << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
172 m_Stream = new NetworkStream(socket);
176 Log(LogInformation, "GelfWriter")
177 << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
180 void GelfWriter::ReconnectTimerHandler()
182 m_WorkQueue.Enqueue(std::bind(&GelfWriter::Reconnect, this), PriorityNormal);
185 void GelfWriter::Disconnect()
189 DisconnectInternal();
192 void GelfWriter::DisconnectInternal()
202 void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
207 m_WorkQueue.Enqueue(std::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
210 void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
214 CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
216 Log(LogDebug, "GelfWriter")
217 << "Processing check result for '" << checkable->GetName() << "'";
220 Service::Ptr service;
221 tie(host, service) = GetHostService(checkable);
223 Dictionary::Ptr fields = new Dictionary();
226 fields->Set("_service_name", service->GetShortName());
227 fields->Set("_service_state", Service::StateToString(service->GetState()));
228 fields->Set("_last_state", service->GetLastState());
229 fields->Set("_last_hard_state", service->GetLastHardState());
231 fields->Set("_last_state", host->GetLastState());
232 fields->Set("_last_hard_state", host->GetLastHardState());
235 fields->Set("_hostname", host->GetName());
236 fields->Set("_type", "CHECK RESULT");
237 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
239 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
240 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
242 fields->Set("_reachable", checkable->IsReachable());
244 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
247 fields->Set("_check_command", checkCommand->GetName());
249 double ts = Utility::GetTime();
252 fields->Set("_latency", cr->CalculateLatency());
253 fields->Set("_execution_time", cr->CalculateExecutionTime());
254 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
255 fields->Set("full_message", cr->GetOutput());
256 fields->Set("_check_source", cr->GetCheckSource());
257 ts = cr->GetExecutionEnd();
260 if (cr && GetEnableSendPerfdata()) {
261 Array::Ptr perfdata = cr->GetPerformanceData();
264 ObjectLock olock(perfdata);
265 for (const Value& val : perfdata) {
266 PerfdataValue::Ptr pdv;
268 if (val.IsObjectType<PerfdataValue>())
272 pdv = PerfdataValue::Parse(val);
273 } catch (const std::exception&) {
274 Log(LogWarning, "GelfWriter")
275 << "Ignoring invalid perfdata for checkable '"
276 << checkable->GetName() << "' and command '"
277 << checkCommand->GetName() << "' with value: " << val;
282 String escaped_key = pdv->GetLabel();
283 boost::replace_all(escaped_key, " ", "_");
284 boost::replace_all(escaped_key, ".", "_");
285 boost::replace_all(escaped_key, "\\", "_");
286 boost::algorithm::replace_all(escaped_key, "::", ".");
288 fields->Set("_" + escaped_key, pdv->GetValue());
291 fields->Set("_" + escaped_key + "_min", pdv->GetMin());
293 fields->Set("_" + escaped_key + "_max", pdv->GetMax());
295 fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
297 fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
299 if (!pdv->GetUnit().IsEmpty())
300 fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
305 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
308 void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
309 const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
310 const String& author, const String& commentText, const String& commandName)
315 m_WorkQueue.Enqueue(std::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
316 notification, checkable, user, notificationType, cr, nr, author, commentText, commandName));
319 void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
320 const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr, const NotificationResult::Ptr& nr,
321 const String& author, const String& commentText, const String& commandName)
325 CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
327 Log(LogDebug, "GelfWriter")
328 << "Processing notification for '" << checkable->GetName() << "'";
331 Service::Ptr service;
332 tie(host, service) = GetHostService(checkable);
334 String notificationTypeString = Notification::NotificationTypeToString(notificationType);
336 String authorComment = "";
338 if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
339 authorComment = author + ";" + commentText;
343 double ts = Utility::GetTime();
346 output = CompatUtility::GetCheckResultOutput(cr);
347 ts = cr->GetExecutionEnd();
350 Dictionary::Ptr fields = new Dictionary();
353 fields->Set("_type", "SERVICE NOTIFICATION");
354 //TODO: fix this to _service_name
355 fields->Set("_service", service->GetShortName());
356 fields->Set("short_message", output);
358 fields->Set("_type", "HOST NOTIFICATION");
359 fields->Set("short_message", output);
362 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
364 fields->Set("_hostname", host->GetName());
365 fields->Set("_command", commandName);
366 fields->Set("_notification_type", notificationTypeString);
367 fields->Set("_comment", authorComment);
369 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
372 fields->Set("_check_command", commandObj->GetName());
374 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
377 void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
382 m_WorkQueue.Enqueue(std::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
385 void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
389 CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
391 Log(LogDebug, "GelfWriter")
392 << "Processing state change for '" << checkable->GetName() << "'";
395 Service::Ptr service;
396 tie(host, service) = GetHostService(checkable);
398 Dictionary::Ptr fields = new Dictionary();
400 fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
401 fields->Set("_type", "STATE CHANGE");
402 fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
403 fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
404 fields->Set("_hostname", host->GetName());
407 fields->Set("_service_name", service->GetShortName());
408 fields->Set("_service_state", Service::StateToString(service->GetState()));
409 fields->Set("_last_state", service->GetLastState());
410 fields->Set("_last_hard_state", service->GetLastHardState());
412 fields->Set("_last_state", host->GetLastState());
413 fields->Set("_last_hard_state", host->GetLastHardState());
416 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
419 fields->Set("_check_command", commandObj->GetName());
421 double ts = Utility::GetTime();
424 fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
425 fields->Set("full_message", cr->GetOutput());
426 fields->Set("_check_source", cr->GetCheckSource());
427 ts = cr->GetExecutionEnd();
430 SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
433 String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
435 fields->Set("version", "1.1");
436 fields->Set("host", source);
437 fields->Set("timestamp", ts);
439 return JsonEncode(fields);
442 void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
444 std::ostringstream msgbuf;
445 msgbuf << gelfMessage;
448 String log = msgbuf.str();
450 ObjectLock olock(this);
456 Log(LogDebug, "GelfWriter")
457 << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
459 m_Stream->Write(log.CStr(), log.GetLength());
460 } catch (const std::exception& ex) {
461 Log(LogCritical, "GelfWriter")
462 << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";