1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/elasticsearchwriter.hpp"
4 #include "perfdata/elasticsearchwriter-ti.cpp"
5 #include "remote/url.hpp"
6 #include "remote/httprequest.hpp"
7 #include "remote/httpresponse.hpp"
8 #include "icinga/compatutility.hpp"
9 #include "icinga/service.hpp"
10 #include "icinga/checkcommand.hpp"
11 #include "base/defer.hpp"
12 #include "base/tcpsocket.hpp"
13 #include "base/stream.hpp"
14 #include "base/base64.hpp"
15 #include "base/json.hpp"
16 #include "base/utility.hpp"
17 #include "base/networkstream.hpp"
18 #include "base/perfdatavalue.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/scoped_array.hpp>
// Make the names of the icinga namespace usable unqualified in this file.
25 using namespace icinga;
// Registration macros for this class. Their expansion is defined elsewhere;
// the second one is handed a pointer to ElasticsearchWriter::StatsFunc.
27 REGISTER_TYPE(ElasticsearchWriter);
29 REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
// Config-load hook: runs the base class handler, names the work queue after
// this object and selects the HA mode.
// NOTE(review): the condition guarding the debug log and the SetHAMode call
// is not visible here - presumably it tests whether HA is disabled; confirm.
31 void ElasticsearchWriter::OnConfigLoaded()
33 ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
// The queue name is the class name followed by this object name.
35 m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
38 Log(LogDebug, "ElasticsearchWriter")
39 << "HA functionality disabled. Won't pause connection: " << GetName();
// HARunEverywhere is selected on this path.
41 SetHAMode(HARunEverywhere);
// Stats callback handed to REGISTER_STATSFUNCTION. For every configured
// ElasticsearchWriter object it reports the current work queue length and
// GetTaskCount(60) / 60.0 as the item rate (presumably tasks per second
// averaged over the last 60 seconds - confirm against WorkQueue).
//   status:   receives one dictionary under the key elasticsearchwriter,
//             built from the per-object entries collected in nodes.
//   perfdata: receives two PerfdataValue entries per object.
47 void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
51 for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
52 size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
53 double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
// Per-object status entry, keyed by the object name.
55 nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
56 { "work_queue_items", workQueueItems },
57 { "work_queue_item_rate", workQueueItemRate }
// Perfdata labels embed the object name between a fixed prefix and suffix.
60 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
61 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
// nodes is moved into the final dictionary, so it must not be used afterwards.
64 status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
// Activates the writer: calls the base class Resume, sets the event type
// prefix, installs the work queue exception callback, starts the periodic
// flush timer and subscribes to the three Checkable signals handled below.
67 void ElasticsearchWriter::Resume()
69 ObjectImpl<ElasticsearchWriter>::Resume();
// Prefix that Enqueue prepends to the event type of every document.
71 m_EventPrefix = "icinga2.event.";
73 Log(LogInformation, "ElasticsearchWriter")
74 << "'" << GetName() << "' resumed.";
// Exceptions raised by queued tasks are routed to ExceptionHandler.
76 m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
78 /* Setup timer for periodically flushing m_DataBuffer */
79 m_FlushTimer = new Timer();
80 m_FlushTimer->SetInterval(GetFlushInterval());
81 m_FlushTimer->OnTimerExpired.connect(std::bind(&ElasticsearchWriter::FlushTimeout, this));
82 m_FlushTimer->Start();
// NOTE(review): Reschedule(0) presumably triggers an immediate first run - confirm.
83 m_FlushTimer->Reschedule(0);
85 /* Register for new metrics. */
// Each signal is bound to the matching handler of this object.
86 Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
87 Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
88 Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
// Deactivates the writer: logs that the object was paused and then calls the
// base class Pause.
// NOTE(review): the statements between the signature and the log call are
// not visible here - confirm what is shut down before logging.
91 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
92 void ElasticsearchWriter::Pause()
98 Log(LogInformation, "ElasticsearchWriter")
99 << "'" << GetName() << "' paused.";
101 ObjectImpl<ElasticsearchWriter>::Pause();
// Copies the attributes of a check result into fields, every key prefixed
// with check_result. and, when perfdata sending is enabled, adds one group of
// keys per performance data value.
//   fields:    dictionary that receives the keys (modified in place).
//   checkable: only used here to name the object in the warning log.
//   cr:        check result to read; it is dereferenced without a null check
//              in the visible lines, so callers must pass a valid pointer.
104 void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
106 String prefix = "check_result.";
108 fields->Set(prefix + "output", cr->GetOutput());
109 fields->Set(prefix + "check_source", cr->GetCheckSource());
110 fields->Set(prefix + "exit_status", cr->GetExitStatus());
111 fields->Set(prefix + "command", cr->GetCommand());
112 fields->Set(prefix + "state", cr->GetState());
113 fields->Set(prefix + "vars_before", cr->GetVarsBefore());
114 fields->Set(prefix + "vars_after", cr->GetVarsAfter());
// The four time points go through FormatTimestamp and are stored as strings.
116 fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
117 fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
118 fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
119 fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
121 /* Add extra calculated field. */
122 fields->Set(prefix + "latency", cr->CalculateLatency());
123 fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
// NOTE(review): the statement controlled by this check is not visible here;
// presumably an early return that skips the perfdata section - confirm.
125 if (!GetEnableSendPerfdata())
128 Array::Ptr perfdata = cr->GetPerformanceData();
// The array is locked while it is iterated.
131 ObjectLock olock(perfdata);
132 for (const Value& val : perfdata) {
133 PerfdataValue::Ptr pdv;
// Entries are either PerfdataValue objects already or are parsed with
// PerfdataValue::Parse. The branch bodies are only partly visible here.
135 if (val.IsObjectType<PerfdataValue>())
139 pdv = PerfdataValue::Parse(val);
140 } catch (const std::exception&) {
// A value that fails to parse is reported with a warning.
141 Log(LogWarning, "ElasticsearchWriter")
142 << "Ignoring invalid perfdata value: '" << val << "' for object '"
143 << checkable->GetName() << "'.";
// Build the key from the label: spaces, dots and backslashes become
// underscores first, then a double colon becomes a dot. Because literal dots
// were already replaced, only a double colon in the label yields a dot.
148 String escapedKey = pdv->GetLabel();
149 boost::replace_all(escapedKey, " ", "_");
150 boost::replace_all(escapedKey, ".", "_");
151 boost::replace_all(escapedKey, "\\", "_");
152 boost::algorithm::replace_all(escapedKey, "::", ".");
154 String perfdataPrefix = prefix + "perfdata." + escapedKey;
156 fields->Set(perfdataPrefix + ".value", pdv->GetValue());
// NOTE(review): any guards around min, max, warn and crit are not visible here.
159 fields->Set(perfdataPrefix + ".min", pdv->GetMin());
161 fields->Set(perfdataPrefix + ".max", pdv->GetMax());
163 fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
165 fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
// The unit key is only written when the unit is not empty.
167 if (!pdv->GetUnit().IsEmpty())
168 fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
// Handler connected to Checkable::OnNewCheckResult in Resume. It defers the
// real work by enqueuing InternalCheckResultHandler, bound to the same
// checkable and check result, on the work queue.
// NOTE(review): lines between the signature and the Enqueue call are not
// visible here.
173 void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
178 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
// Work queue task enqueued by CheckResultHandler. It builds the field
// dictionary for a check result event and passes it to Enqueue with the
// event type checkresult.
//   checkable: host or service the result belongs to.
//   cr:        the check result; see the note on the timestamp below.
181 void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
185 CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
// Perfdata must be enabled both globally and on the checkable.
// NOTE(review): the statement controlled by this check is not visible here;
// presumably an early return - confirm.
187 if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
191 Service::Ptr service;
192 tie(host, service) = GetHostService(checkable);
194 Dictionary::Ptr fields = new Dictionary();
// Two sets of state fields follow, one read from service and one from host.
// NOTE(review): the branch choosing between them is not visible here;
// presumably the service set is used when service is set - confirm.
197 fields->Set("service", service->GetShortName());
198 fields->Set("state", service->GetState());
199 fields->Set("last_state", service->GetLastState());
200 fields->Set("last_hard_state", service->GetLastHardState());
202 fields->Set("state", host->GetState());
203 fields->Set("last_state", host->GetLastState());
204 fields->Set("last_hard_state", host->GetLastHardState());
207 fields->Set("host", host->GetName());
208 fields->Set("state_type", checkable->GetStateType());
210 fields->Set("current_check_attempt", checkable->GetCheckAttempt());
211 fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
213 fields->Set("reachable", checkable->IsReachable());
215 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
// NOTE(review): a null check on commandObj may sit on a line not visible here.
218 fields->Set("check_command", commandObj->GetName());
// The event time starts as the current time and is replaced by the execution
// end of the check result.
// NOTE(review): the guard around the next two statements is not visible here;
// presumably they only run when cr is set - confirm.
220 double ts = Utility::GetTime();
223 AddCheckResult(fields, checkable, cr);
224 ts = cr->GetExecutionEnd();
227 Enqueue("checkresult", fields, ts);
// Handler connected to Checkable::OnStateChange in Resume. It defers the
// real work by enqueuing StateChangeHandlerInternal, bound to the same
// arguments, on the work queue.
// NOTE(review): lines between the signature and the Enqueue call are not
// visible here.
230 void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
235 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
// Work queue task enqueued by StateChangeHandler. It builds the field
// dictionary for a state change event and passes it to Enqueue with the
// event type statechange.
//   checkable: host or service whose state changed.
//   cr:        check result associated with the change.
//   type:      state type of the change; it is not read in the visible lines.
238 void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
242 CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
245 Service::Ptr service;
246 tie(host, service) = GetHostService(checkable);
248 Dictionary::Ptr fields = new Dictionary();
250 fields->Set("current_check_attempt", checkable->GetCheckAttempt());
251 fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
252 fields->Set("host", host->GetName());
// Two sets of state fields follow, one read from service and one from host.
// NOTE(review): the branch choosing between them is not visible here;
// presumably the service set is used when service is set - confirm.
255 fields->Set("service", service->GetShortName());
256 fields->Set("state", service->GetState());
257 fields->Set("last_state", service->GetLastState());
258 fields->Set("last_hard_state", service->GetLastHardState());
260 fields->Set("state", host->GetState());
261 fields->Set("last_state", host->GetLastState());
262 fields->Set("last_hard_state", host->GetLastHardState());
265 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
// NOTE(review): a null check on commandObj may sit on a line not visible here.
268 fields->Set("check_command", commandObj->GetName());
// The event time starts as the current time and is replaced by the execution
// end of the check result.
// NOTE(review): the guard around the next two statements is not visible here;
// presumably they only run when cr is set - confirm.
270 double ts = Utility::GetTime();
273 AddCheckResult(fields, checkable, cr);
274 ts = cr->GetExecutionEnd();
277 Enqueue("statechange", fields, ts);
// Handler connected to Checkable::OnNotificationSentToAllUsers in Resume.
// It defers the real work by enqueuing
// NotificationSentToAllUsersHandlerInternal, bound to the same seven
// arguments, on the work queue.
// NOTE(review): lines between the signature and the Enqueue call are not
// visible here.
280 void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
281 const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
282 const CheckResult::Ptr& cr, const String& author, const String& text)
287 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
288 notification, checkable, users, type, cr, author, text));
// Work queue task enqueued by NotificationSentToAllUsersHandler. It builds
// the field dictionary for a notification event and passes it to Enqueue
// with the event type notification.
//   notification: not read in the visible lines.
//   checkable:    host or service the notification refers to.
//   users:        recipients; their names are stored as an array under users.
//   type:         converted to a string with NotificationTypeToString.
//   cr:           check result associated with the notification.
//   author, text: stored unchanged under the keys author and text.
291 void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
292 const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
293 const CheckResult::Ptr& cr, const String& author, const String& text)
297 CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
299 Log(LogDebug, "ElasticsearchWriter")
300 << "Processing notification for '" << checkable->GetName() << "'";
303 Service::Ptr service;
304 tie(host, service) = GetHostService(checkable);
306 String notificationTypeString = Notification::NotificationTypeToString(type);
308 Dictionary::Ptr fields = new Dictionary();
// Two sets of state fields follow, one read from service and one from host.
// NOTE(review): the branch choosing between them is not visible here;
// presumably the service set is used when service is set - confirm.
311 fields->Set("service", service->GetShortName());
312 fields->Set("state", service->GetState());
313 fields->Set("last_state", service->GetLastState());
314 fields->Set("last_hard_state", service->GetLastHardState());
316 fields->Set("state", host->GetState());
317 fields->Set("last_state", host->GetLastState());
318 fields->Set("last_hard_state", host->GetLastHardState());
321 fields->Set("host", host->GetName());
// Collect the recipient names; userNames is declared on a line not visible here.
325 for (const User::Ptr& user : users) {
326 userNames.push_back(user->GetName());
// userNames is moved into the array, so it must not be used afterwards.
329 fields->Set("users", new Array(std::move(userNames)));
330 fields->Set("notification_type", notificationTypeString);
331 fields->Set("author", author);
332 fields->Set("text", text);
334 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
// NOTE(review): a null check on commandObj may sit on a line not visible here.
337 fields->Set("check_command", commandObj->GetName());
// The event time starts as the current time and is replaced by the execution
// end of the check result.
// NOTE(review): the guard around the next two statements is not visible here;
// presumably they only run when cr is set - confirm.
339 double ts = Utility::GetTime();
342 AddCheckResult(fields, checkable, cr);
343 ts = cr->GetExecutionEnd();
346 Enqueue("notification", fields, ts);
// Finalizes one event document and appends it to m_DataBuffer while holding
// m_DataBufferMutex.
//   type:   event name; stored under the key type with m_EventPrefix in front.
//   fields: document fields; modified in place (both timestamp keys and type
//           are added) before being JSON encoded.
//   ts:     event time, formatted with FormatTimestamp.
349 void ElasticsearchWriter::Enqueue(const String& type, const Dictionary::Ptr& fields, double ts)
351 /* Atomically buffer the data point. */
352 boost::mutex::scoped_lock lock(m_DataBufferMutex);
354 /* Format the timestamps to dynamically select the date datatype inside the index. */
// The same formatted time is written under two keys.
355 fields->Set("@timestamp", FormatTimestamp(ts));
356 fields->Set("timestamp", FormatTimestamp(ts));
358 String eventType = m_EventPrefix + type;
359 fields->Set("type", eventType);
361 /* Every payload needs a line describing the index.
362 * We do it this way to avoid problems with a near full queue.
// Each buffer entry is the index action line followed by the encoded fields.
364 String indexBody = "{\"index\": {} }\n";
365 String fieldsBody = JsonEncode(fields);
367 Log(LogDebug, "ElasticsearchWriter")
368 << "Add to fields to message list: '" << fieldsBody << "'.";
370 m_DataBuffer.emplace_back(indexBody + fieldsBody);
372 /* Flush if we've buffered too much to prevent excessive memory use. */
// NOTE(review): the flush call itself is not visible here; only the debug log
// inside the threshold branch is shown - confirm that Flush is called.
373 if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
374 Log(LogDebug, "ElasticsearchWriter")
375 << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
// Timer callback connected to m_FlushTimer in Resume. It takes
// m_DataBufferMutex and, when the buffer is not empty, logs how many data
// points are about to be written.
// NOTE(review): the flush call itself is not visible here - confirm that
// Flush is called inside the branch.
380 void ElasticsearchWriter::FlushTimeout()
382 /* Prevent new data points from being added to the array, there is a
383 * race condition where they could disappear.
385 boost::mutex::scoped_lock lock(m_DataBufferMutex);
387 /* Flush if there are any data available. */
388 if (m_DataBuffer.size() > 0) {
389 Log(LogDebug, "ElasticsearchWriter")
390 << "Timer expired writing " << m_DataBuffer.size() << " data points";
// Joins all buffered entries with a newline into one request body and
// clears m_DataBuffer. No lock is taken in the visible lines; per the
// existing comment the caller is expected to hold the buffer lock.
// NOTE(review): the rest of the function is not visible here; presumably it
// appends a trailing newline and hands the body to SendRequest - confirm.
395 void ElasticsearchWriter::Flush()
397 /* Ensure you hold a lock against m_DataBuffer so that things
398 * don't go missing after creating the body and clearing the buffer.
400 String body = boost::algorithm::join(m_DataBuffer, "\n");
401 m_DataBuffer.clear();
403 /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
404 * Tested with 6.0.0 and 5.6.4.
// Sends one bulk request body to Elasticsearch over HTTP and evaluates the
// response. Failures are reported through log messages in the visible lines.
//   body: request payload, written unchanged as the HTTP body.
// NOTE(review): several lines are not visible here, among them the call that
// creates stream, the try lines and the statements that leave the function
// after a logged failure - confirm each early exit.
411 void ElasticsearchWriter::SendRequest(const String& body)
413 Url::Ptr url = new Url();
// Scheme follows the TLS setting; host and port come from the configuration.
415 url->SetScheme(GetEnableTls() ? "https" : "http");
416 url->SetHost(GetHost());
417 url->SetPort(GetPort());
419 std::vector<String> path;
421 /* Specify the index path. Best practice is a daily rotation.
422 * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
// The index name is the configured index plus the current date, so the
// target index is derived from the send time, not from the event time.
424 path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
426 /* ES 6 removes multiple _type mappings: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html
427 * Best practice is to statically define 'doc', as ES 5.X does not allow types starting with '_'.
429 path.emplace_back("doc");
431 /* Use the bulk message format. */
432 path.emplace_back("_bulk");
// Handler for a failed connection attempt; the attempt itself is not visible.
440 } catch (const std::exception& ex) {
441 Log(LogWarning, "ElasticsearchWriter")
442 << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
// The deferred callback closes the stream when close goes out of scope.
449 Defer close ([&stream]() { stream->Close(); });
451 HttpRequest req(stream);
453 /* Specify required headers by Elasticsearch. */
454 req.AddHeader("Accept", "application/json");
455 req.AddHeader("Content-Type", "application/json");
457 /* Send authentication if configured. */
458 String username = GetUsername();
459 String password = GetPassword();
// Basic auth is only sent when both username and password are non-empty.
461 if (!username.IsEmpty() && !password.IsEmpty())
462 req.AddHeader("Authorization", "Basic " + Base64::Encode(username + ":" + password));
464 req.RequestMethod = "POST";
465 req.RequestUrl = url;
467 /* Don't log the request body to debug log, this is already done above. */
468 Log(LogDebug, "ElasticsearchWriter")
469 << "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
470 << " to '" << url->Format() << "'.";
473 req.WriteBody(body.CStr(), body.GetLength());
// A write failure is logged with host and port; the exception text is not included.
475 } catch (const std::exception& ex) {
476 Log(LogWarning, "ElasticsearchWriter")
477 << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
481 HttpResponse resp(stream, req);
482 StreamReadContext context;
// Parse once, then keep parsing while Parse succeeds and the response is
// not complete. The loop body is not visible here.
485 resp.Parse(context, true);
486 while (resp.Parse(context, true) && !resp.Complete)
488 } catch (const std::exception& ex) {
489 Log(LogWarning, "ElasticsearchWriter")
490 << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
494 if (!resp.Complete) {
495 Log(LogWarning, "ElasticsearchWriter")
496 << "Failed to read a complete HTTP response from the Elasticsearch server.";
// Any status code above 299 is treated as an error.
500 if (resp.StatusCode > 299) {
501 if (resp.StatusCode == 401) {
502 /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
// Two different messages, depending on whether credentials were configured.
503 if (!username.IsEmpty() && !password.IsEmpty()) {
504 Log(LogCritical, "ElasticsearchWriter")
505 << "401 Unauthorized. Please ensure that the user '" << username
506 << "' is able to authenticate against the HTTP API/Proxy.";
508 Log(LogCritical, "ElasticsearchWriter")
509 << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
// For other error codes, assemble a diagnostic message in msgbuf.
515 std::ostringstream msgbuf;
516 msgbuf << "Unexpected response code " << resp.StatusCode << " from URL '" << req.RequestUrl->Format() << "'";
518 String contentType = resp.Headers->Get("content-type");
// Only these two exact content type strings are accepted as JSON.
520 if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
521 msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
// Read the whole response body into a buffer with one extra byte for the
// terminating zero.
524 size_t responseSize = resp.GetBodySize();
525 boost::scoped_array<char> buffer(new char[responseSize + 1]);
526 resp.ReadBody(buffer.get(), responseSize);
527 buffer.get()[responseSize] = '\0';
// The response body is only appended to the message in I2_DEBUG builds,
// going by the endif below; the opening directive is not visible here.
530 msgbuf << "; Response body: '" << buffer.get() << "'";
531 #endif /* I2_DEBUG */
533 /* {"statusCode":404,"error":"Not Found","message":"Not Found"} */
534 Dictionary::Ptr jsonResponse;
536 jsonResponse = JsonDecode(buffer.get());
// Logged when the body cannot be decoded; the catch line is not visible here.
538 Log(LogWarning, "ElasticsearchWriter")
539 << "Unable to parse JSON response:\n" << buffer.get();
// The error key of the decoded response is logged together with msgbuf.
543 String error = jsonResponse->Get("error");
545 Log(LogCritical, "ElasticsearchWriter")
546 << "Error: '" << error << "'. " << msgbuf.str();
// Opens a TCP connection to the configured host and port and returns a
// stream for it. With TLS enabled an SSL context is created and a client
// handshake is performed on the socket.
// Returns: a NetworkStream on the socket in the visible return statement.
// NOTE(review): the try lines, the statements following each logged failure
// and the return of the TLS path are not visible here - confirm them.
552 Stream::Ptr ElasticsearchWriter::Connect()
554 TcpSocket::Ptr socket = new TcpSocket();
556 Log(LogNotice, "ElasticsearchWriter")
557 << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
560 socket->Connect(GetHost(), GetPort());
561 } catch (const std::exception& ex) {
// The warning names host and port; the exception text is not included.
562 Log(LogWarning, "ElasticsearchWriter")
563 << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
567 if (GetEnableTls()) {
568 std::shared_ptr<SSL_CTX> sslContext;
// The context is built from the configured certificate, key and CA paths.
571 sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
572 } catch (const std::exception& ex) {
573 Log(LogWarning, "ElasticsearchWriter")
574 << "Unable to create SSL context.";
// The TLS stream wraps the connected socket in the client role and is given
// the configured host name.
578 TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
581 tlsStream->Handshake();
582 } catch (const std::exception& ex) {
583 Log(LogWarning, "ElasticsearchWriter")
584 << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
590 return new NetworkStream(socket);
// Debug helper: asserts that the calling thread is a worker thread of
// m_WorkQueue.
594 void ElasticsearchWriter::AssertOnWorkQueue()
596 ASSERT(m_WorkQueue.IsWorkerThread());
// Exception callback installed on m_WorkQueue in Resume. It logs a fixed
// critical message and writes the exception details only to the debug log.
//   exp: the exception to report; it is moved into DiagnosticInformation.
599 void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
601 Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
603 Log(LogDebug, "ElasticsearchWriter")
604 << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
// Formats a timestamp as date, time, a dot, the millisecond part and the
// UTC offset, in the shape shown by the example in the comment below.
//   ts: time value as a double; the fractional part supplies the milliseconds.
// Returns: the formatted string.
607 String ElasticsearchWriter::FormatTimestamp(double ts)
609 /* The date format must match the default dynamic date detection
610 * pattern in indexes. This enables applications like Kibana to
611 * detect a qualified timestamp index for time-series data.
613 * Example: 2017-09-11T10:56:21.463+0200
616 * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
617 * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
618 * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
// The fraction of ts is scaled to milliseconds and truncated to int.
// NOTE(review): the cast of ts to int limits the representable range of the
// whole-second part - confirm this is acceptable for the expected inputs.
620 auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
// NOTE(review): milliSeconds is appended via Convert::ToString with no visible
// zero padding, so a fraction of 63 ms would presumably render as .63 and
// read as 630 ms - confirm and pad to three digits if so.
622 return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);