1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/elasticsearchwriter.hpp"
4 #include "perfdata/elasticsearchwriter-ti.cpp"
5 #include "remote/url.hpp"
6 #include "remote/httprequest.hpp"
7 #include "remote/httpresponse.hpp"
8 #include "icinga/compatutility.hpp"
9 #include "icinga/service.hpp"
10 #include "icinga/checkcommand.hpp"
11 #include "base/application.hpp"
12 #include "base/defer.hpp"
13 #include "base/io-engine.hpp"
14 #include "base/tcpsocket.hpp"
15 #include "base/stream.hpp"
16 #include "base/base64.hpp"
17 #include "base/json.hpp"
18 #include "base/utility.hpp"
19 #include "base/networkstream.hpp"
20 #include "base/perfdatavalue.hpp"
21 #include "base/exception.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string.hpp>
24 #include <boost/asio/ssl/context.hpp>
25 #include <boost/beast/core/flat_buffer.hpp>
26 #include <boost/beast/http/field.hpp>
27 #include <boost/beast/http/message.hpp>
28 #include <boost/beast/http/parser.hpp>
29 #include <boost/beast/http/read.hpp>
30 #include <boost/beast/http/status.hpp>
31 #include <boost/beast/http/string_body.hpp>
32 #include <boost/beast/http/verb.hpp>
33 #include <boost/beast/http/write.hpp>
34 #include <boost/scoped_array.hpp>
using namespace icinga;

/* Register the ElasticsearchWriter config object type (macro from the project's type system). */
REGISTER_TYPE(ElasticsearchWriter);

/* Register ElasticsearchWriter::StatsFunc, which reports per-writer work queue statistics. */
REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
45 void ElasticsearchWriter::OnConfigLoaded()
47 ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
49 m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
52 Log(LogDebug, "ElasticsearchWriter")
53 << "HA functionality disabled. Won't pause connection: " << GetName();
55 SetHAMode(HARunEverywhere);
61 void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
65 for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
66 size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
67 double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
69 nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
70 { "work_queue_items", workQueueItems },
71 { "work_queue_item_rate", workQueueItemRate }
74 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
75 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
78 status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
81 void ElasticsearchWriter::Resume()
83 ObjectImpl<ElasticsearchWriter>::Resume();
85 m_EventPrefix = "icinga2.event.";
87 Log(LogInformation, "ElasticsearchWriter")
88 << "'" << GetName() << "' resumed.";
90 m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
92 /* Setup timer for periodically flushing m_DataBuffer */
93 m_FlushTimer = new Timer();
94 m_FlushTimer->SetInterval(GetFlushInterval());
95 m_FlushTimer->OnTimerExpired.connect(std::bind(&ElasticsearchWriter::FlushTimeout, this));
96 m_FlushTimer->Start();
97 m_FlushTimer->Reschedule(0);
99 /* Register for new metrics. */
100 Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
101 Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
102 Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
105 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
106 void ElasticsearchWriter::Pause()
112 Log(LogInformation, "ElasticsearchWriter")
113 << "'" << GetName() << "' paused.";
115 ObjectImpl<ElasticsearchWriter>::Pause();
118 void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
120 String prefix = "check_result.";
122 fields->Set(prefix + "output", cr->GetOutput());
123 fields->Set(prefix + "check_source", cr->GetCheckSource());
124 fields->Set(prefix + "exit_status", cr->GetExitStatus());
125 fields->Set(prefix + "command", cr->GetCommand());
126 fields->Set(prefix + "state", cr->GetState());
127 fields->Set(prefix + "vars_before", cr->GetVarsBefore());
128 fields->Set(prefix + "vars_after", cr->GetVarsAfter());
130 fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
131 fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
132 fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
133 fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
135 /* Add extra calculated field. */
136 fields->Set(prefix + "latency", cr->CalculateLatency());
137 fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
139 if (!GetEnableSendPerfdata())
142 Array::Ptr perfdata = cr->GetPerformanceData();
144 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
147 ObjectLock olock(perfdata);
148 for (const Value& val : perfdata) {
149 PerfdataValue::Ptr pdv;
151 if (val.IsObjectType<PerfdataValue>())
155 pdv = PerfdataValue::Parse(val);
156 } catch (const std::exception&) {
157 Log(LogWarning, "ElasticsearchWriter")
158 << "Ignoring invalid perfdata for checkable '"
159 << checkable->GetName() << "' and command '"
160 << checkCommand->GetName() << "' with value: " << val;
165 String escapedKey = pdv->GetLabel();
166 boost::replace_all(escapedKey, " ", "_");
167 boost::replace_all(escapedKey, ".", "_");
168 boost::replace_all(escapedKey, "\\", "_");
169 boost::algorithm::replace_all(escapedKey, "::", ".");
171 String perfdataPrefix = prefix + "perfdata." + escapedKey;
173 fields->Set(perfdataPrefix + ".value", pdv->GetValue());
176 fields->Set(perfdataPrefix + ".min", pdv->GetMin());
178 fields->Set(perfdataPrefix + ".max", pdv->GetMax());
180 fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
182 fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
184 if (!pdv->GetUnit().IsEmpty())
185 fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
190 void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
195 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
198 void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
202 CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
204 if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
208 Service::Ptr service;
209 tie(host, service) = GetHostService(checkable);
211 Dictionary::Ptr fields = new Dictionary();
214 fields->Set("service", service->GetShortName());
215 fields->Set("state", service->GetState());
216 fields->Set("last_state", service->GetLastState());
217 fields->Set("last_hard_state", service->GetLastHardState());
219 fields->Set("state", host->GetState());
220 fields->Set("last_state", host->GetLastState());
221 fields->Set("last_hard_state", host->GetLastHardState());
224 fields->Set("host", host->GetName());
225 fields->Set("state_type", checkable->GetStateType());
227 fields->Set("current_check_attempt", checkable->GetCheckAttempt());
228 fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
230 fields->Set("reachable", checkable->IsReachable());
232 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
235 fields->Set("check_command", commandObj->GetName());
237 double ts = Utility::GetTime();
240 AddCheckResult(fields, checkable, cr);
241 ts = cr->GetExecutionEnd();
244 Enqueue(checkable, "checkresult", fields, ts);
247 void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
252 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
255 void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
259 CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
262 Service::Ptr service;
263 tie(host, service) = GetHostService(checkable);
265 Dictionary::Ptr fields = new Dictionary();
267 fields->Set("current_check_attempt", checkable->GetCheckAttempt());
268 fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
269 fields->Set("host", host->GetName());
272 fields->Set("service", service->GetShortName());
273 fields->Set("state", service->GetState());
274 fields->Set("last_state", service->GetLastState());
275 fields->Set("last_hard_state", service->GetLastHardState());
277 fields->Set("state", host->GetState());
278 fields->Set("last_state", host->GetLastState());
279 fields->Set("last_hard_state", host->GetLastHardState());
282 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
285 fields->Set("check_command", commandObj->GetName());
287 double ts = Utility::GetTime();
290 AddCheckResult(fields, checkable, cr);
291 ts = cr->GetExecutionEnd();
294 Enqueue(checkable, "statechange", fields, ts);
297 void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
298 const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
299 const CheckResult::Ptr& cr, const String& author, const String& text)
304 m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
305 notification, checkable, users, type, cr, author, text));
308 void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
309 const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
310 const CheckResult::Ptr& cr, const String& author, const String& text)
314 CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
316 Log(LogDebug, "ElasticsearchWriter")
317 << "Processing notification for '" << checkable->GetName() << "'";
320 Service::Ptr service;
321 tie(host, service) = GetHostService(checkable);
323 String notificationTypeString = Notification::NotificationTypeToString(type);
325 Dictionary::Ptr fields = new Dictionary();
328 fields->Set("service", service->GetShortName());
329 fields->Set("state", service->GetState());
330 fields->Set("last_state", service->GetLastState());
331 fields->Set("last_hard_state", service->GetLastHardState());
333 fields->Set("state", host->GetState());
334 fields->Set("last_state", host->GetLastState());
335 fields->Set("last_hard_state", host->GetLastHardState());
338 fields->Set("host", host->GetName());
342 for (const User::Ptr& user : users) {
343 userNames.push_back(user->GetName());
346 fields->Set("users", new Array(std::move(userNames)));
347 fields->Set("notification_type", notificationTypeString);
348 fields->Set("author", author);
349 fields->Set("text", text);
351 CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
354 fields->Set("check_command", commandObj->GetName());
356 double ts = Utility::GetTime();
359 AddCheckResult(fields, checkable, cr);
360 ts = cr->GetExecutionEnd();
363 Enqueue(checkable, "notification", fields, ts);
366 void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
367 const Dictionary::Ptr& fields, double ts)
369 /* Atomically buffer the data point. */
370 boost::mutex::scoped_lock lock(m_DataBufferMutex);
372 /* Format the timestamps to dynamically select the date datatype inside the index. */
373 fields->Set("@timestamp", FormatTimestamp(ts));
374 fields->Set("timestamp", FormatTimestamp(ts));
376 String eventType = m_EventPrefix + type;
377 fields->Set("type", eventType);
379 /* Every payload needs a line describing the index.
380 * We do it this way to avoid problems with a near full queue.
382 String indexBody = "{\"index\": {} }\n";
383 String fieldsBody = JsonEncode(fields);
385 Log(LogDebug, "ElasticsearchWriter")
386 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << fieldsBody << "'.";
388 m_DataBuffer.emplace_back(indexBody + fieldsBody);
390 /* Flush if we've buffered too much to prevent excessive memory use. */
391 if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
392 Log(LogDebug, "ElasticsearchWriter")
393 << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
398 void ElasticsearchWriter::FlushTimeout()
400 /* Prevent new data points from being added to the array, there is a
401 * race condition where they could disappear.
403 boost::mutex::scoped_lock lock(m_DataBufferMutex);
405 /* Flush if there are any data available. */
406 if (m_DataBuffer.size() > 0) {
407 Log(LogDebug, "ElasticsearchWriter")
408 << "Timer expired writing " << m_DataBuffer.size() << " data points";
413 void ElasticsearchWriter::Flush()
415 /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
416 if (m_DataBuffer.empty())
419 /* Ensure you hold a lock against m_DataBuffer so that things
420 * don't go missing after creating the body and clearing the buffer.
422 String body = boost::algorithm::join(m_DataBuffer, "\n");
423 m_DataBuffer.clear();
425 /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
426 * Tested with 6.0.0 and 5.6.4.
433 void ElasticsearchWriter::SendRequest(const String& body)
435 namespace beast = boost::beast;
436 namespace http = beast::http;
438 Url::Ptr url = new Url();
440 url->SetScheme(GetEnableTls() ? "https" : "http");
441 url->SetHost(GetHost());
442 url->SetPort(GetPort());
444 std::vector<String> path;
446 /* Specify the index path. Best practice is a daily rotation.
447 * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
449 path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
451 /* ES 6 removes multiple _type mappings: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html
452 * Best practice is to statically define 'doc', as ES 5.X does not allow types starting with '_'.
454 path.emplace_back("doc");
456 /* Use the bulk message format. */
457 path.emplace_back("_bulk");
461 OptionalTlsStream stream;
465 } catch (const std::exception& ex) {
466 Log(LogWarning, "ElasticsearchWriter")
467 << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
471 Defer s ([&stream]() {
473 stream.first->next_layer().shutdown();
477 http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
479 request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
480 request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
482 /* Specify required headers by Elasticsearch. */
483 request.set(http::field::accept, "application/json");
485 /* Use application/x-ndjson for bulk streams. While ES
486 * is able to handle application/json, the newline separator
487 * causes problems with Logstash (#6609).
489 request.set(http::field::content_type, "application/x-ndjson");
491 /* Send authentication if configured. */
492 String username = GetUsername();
493 String password = GetPassword();
495 if (!username.IsEmpty() && !password.IsEmpty())
496 request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
498 request.body() = body;
499 request.set(http::field::content_length, request.body().size());
501 /* Don't log the request body to debug log, this is already done above. */
502 Log(LogDebug, "ElasticsearchWriter")
503 << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
504 << " to '" << url->Format() << "'.";
508 http::write(*stream.first, request);
509 stream.first->flush();
511 http::write(*stream.second, request);
512 stream.second->flush();
514 } catch (const std::exception&) {
515 Log(LogWarning, "ElasticsearchWriter")
516 << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
520 http::parser<false, http::string_body> parser;
521 beast::flat_buffer buf;
525 http::read(*stream.first, buf, parser);
527 http::read(*stream.second, buf, parser);
529 } catch (const std::exception& ex) {
530 Log(LogWarning, "ElasticsearchWriter")
531 << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
535 auto& response (parser.get());
537 if (response.result_int() > 299) {
538 if (response.result() == http::status::unauthorized) {
539 /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
540 if (!username.IsEmpty() && !password.IsEmpty()) {
541 Log(LogCritical, "ElasticsearchWriter")
542 << "401 Unauthorized. Please ensure that the user '" << username
543 << "' is able to authenticate against the HTTP API/Proxy.";
545 Log(LogCritical, "ElasticsearchWriter")
546 << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
552 std::ostringstream msgbuf;
553 msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
555 auto& contentType (response[http::field::content_type]);
557 if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
558 msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
561 auto& body (response.body());
564 msgbuf << "; Response body: '" << body << "'";
565 #endif /* I2_DEBUG */
567 Dictionary::Ptr jsonResponse;
570 jsonResponse = JsonDecode(body);
572 Log(LogWarning, "ElasticsearchWriter")
573 << "Unable to parse JSON response:\n" << body;
577 String error = jsonResponse->Get("error");
579 Log(LogCritical, "ElasticsearchWriter")
580 << "Error: '" << error << "'. " << msgbuf.str();
584 OptionalTlsStream ElasticsearchWriter::Connect()
586 Log(LogNotice, "ElasticsearchWriter")
587 << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
589 OptionalTlsStream stream;
590 bool tls = GetEnableTls();
593 std::shared_ptr<boost::asio::ssl::context> sslContext;
596 sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
597 } catch (const std::exception&) {
598 Log(LogWarning, "ElasticsearchWriter")
599 << "Unable to create SSL context.";
603 stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
605 stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
609 icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
610 } catch (const std::exception&) {
611 Log(LogWarning, "ElasticsearchWriter")
612 << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
617 auto& tlsStream (stream.first->next_layer());
620 tlsStream.handshake(tlsStream.client);
621 } catch (const std::exception&) {
622 Log(LogWarning, "ElasticsearchWriter")
623 << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
628 return std::move(stream);
/**
 * Asserts that the caller runs on this writer's work queue thread.
 * NOTE(review): presumably a no-op in release builds, depending on how ASSERT is defined.
 */
void ElasticsearchWriter::AssertOnWorkQueue()
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}
636 void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
638 Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
640 Log(LogDebug, "ElasticsearchWriter")
641 << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
644 String ElasticsearchWriter::FormatTimestamp(double ts)
646 /* The date format must match the default dynamic date detection
647 * pattern in indexes. This enables applications like Kibana to
648 * detect a qualified timestamp index for time-series data.
650 * Example: 2017-09-11T10:56:21.463+0200
653 * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
654 * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
655 * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
657 auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
659 return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);