From 651379db6feb4f4a55f7623585be26552b60d30d Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Thu, 7 Sep 2017 15:11:57 +0200 Subject: [PATCH] Add ElasticWriter fixes #5538 --- doc/09-object-types.md | 35 ++ doc/14-features.md | 76 ++- etc/icinga2/features-available/elastic.conf | 10 + lib/perfdata/CMakeLists.txt | 8 +- lib/perfdata/elasticwriter.cpp | 535 ++++++++++++++++++++ lib/perfdata/elasticwriter.hpp | 81 +++ lib/perfdata/elasticwriter.ti | 31 ++ 7 files changed, 767 insertions(+), 9 deletions(-) create mode 100644 etc/icinga2/features-available/elastic.conf create mode 100644 lib/perfdata/elasticwriter.cpp create mode 100644 lib/perfdata/elasticwriter.hpp create mode 100644 lib/perfdata/elasticwriter.ti diff --git a/doc/09-object-types.md b/doc/09-object-types.md index aec60a6ce..f3bf882ab 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -969,6 +969,41 @@ is associated with the service: ... } +## ElasticWriter + +Writes check result metrics and performance data to an Elasticsearch instance. + +Example: + + library "perfdata" + + object ElasticWriter "elastic" { + host = "127.0.0.1" + port = 9200 + index = "icinga2" + + enable_send_perfdata = true + + flush_threshold = 1024 + flush_interval = 10 + } + +The index is rotated daily, as is recommended by Elastic, meaning the index will be renamed to `$index-$d.$M.$y`. + +Configuration Attributes: + + Name |Description + -----------------------|--------------------------------------------------------------------------------------------------------- + host | **Required.** Elasticsearch host address. Defaults to `127.0.0.1`. + port | **Required.** Elasticsearch port. Defaults to `9200`. + index | **Required.** Elasticsearch index name. Defaults to `icinga2`. + enable_send_perfdata | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`. + flush_interval | **Optional.** How long to buffer data points before transfering to Elasticsearch. Defaults to `10`. + flush_threshold | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`. + +Note: If `flush_threshold` is set too low, this will force the feature to flush all data to Elasticsearch too often. +Experiment with the setting, if you are processing more than 1024 metrics per second or similar. + ## LiveStatusListener Livestatus API interface available as TCP or UNIX socket. Historical table queries diff --git a/doc/14-features.md b/doc/14-features.md index 0fa7cf62b..70c75098e 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -264,6 +264,74 @@ expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`. More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter). +### Elastic Stack Integration + +[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data +from the Icinga 2 API and sends it either directly to [Elasticsearch](https://www.elastic.co/products/elasticsearch) +or [Logstash](https://www.elastic.co/products/logstash). + +More integrations: + +* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API. +* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs. + +#### Elastic Writer + +This feature forwards check results, state changes and notification events +to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API. + +The check results include parsed performance data metrics if enabled. + +> **Note** +> +> Elasticsearch 5.x+ is required. + +Enable the feature and restart Icinga 2. + + # icinga2 feature enable elastic + +The default configuration expects an Elasticsearch instance running on `localhost` on port `9200 + and writes to an index called `icinga2`. + +More configuration details can be found [here](09-object-types.md#objecttype-elasticwriter). + +#### Current Elasticsearch Schema + +The following event types are written to Elasticsearch: + +* icinga2.event.checkresult +* icinga2.event.statechange +* icinga2.event.notification + +Performance data metrics must be explicitly enabled with the `enable_send_perfdata` +attribute. + +Metric values are stored like this: + + check_result.perfdata..value + +The following characters are escaped in perfdata labels: + + Character | Escaped character + --------------|-------------------------- + whitespace | _ + \ | _ + / | _ + :: | . + +Note that perfdata labels may contain dots (`.`) allowing to +add more subsequent levels inside the tree. +`::` adds support for [multi performance labels](http://my-plugin.de/wiki/projects/check_multi/configuration/performance) +and is therefore replaced by `.`. + +Icinga 2 automatically adds the following threshold metrics +if existing: + + check_result.perfdata..min + check_result.perfdata..max + check_result.perfdata..warn + check_result.perfdata..crit + ### Graylog Integration #### GELF Writer @@ -288,14 +356,6 @@ Currently these events are processed: * State changes * Notifications -### Elastic Stack Integration - -[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data -from the Icinga 2 API and sends it either directly to Elasticsearch or Logstash. - -More integrations in development: -* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API. -* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs. ### OpenTSDB Writer diff --git a/etc/icinga2/features-available/elastic.conf b/etc/icinga2/features-available/elastic.conf new file mode 100644 index 000000000..06637e327 --- /dev/null +++ b/etc/icinga2/features-available/elastic.conf @@ -0,0 +1,10 @@ +library "perfdata" + +object ElasticWriter "elastic" { + //host = "127.0.0.1" + //port = 9200 + //index = "icinga2" + //send_enable_perfdata = false + //flush_threshold = 1024 + //flush_interval = 10s +} diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index 08920dd3d..02f2daefe 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -18,11 +18,12 @@ mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp) mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp) mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp) +mkclass_target(elasticwriter.ti elasticwriter.tcpp elasticwriter.thpp) mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp) mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp) set(perfdata_SOURCES - gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp + gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticwriter.cpp elasticwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp ) if(ICINGA2_UNITY_BUILD) @@ -56,6 +57,11 @@ install_if_not_exists( ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available ) +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elastic.conf + ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available +) + install_if_not_exists( ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available diff --git a/lib/perfdata/elasticwriter.cpp b/lib/perfdata/elasticwriter.cpp new file mode 100644 index 000000000..667dafbb8 --- /dev/null +++ b/lib/perfdata/elasticwriter.cpp @@ -0,0 +1,535 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "perfdata/elasticwriter.hpp" +#include "perfdata/elasticwriter.tcpp" +#include "remote/url.hpp" +#include "remote/httprequest.hpp" +#include "remote/httpresponse.hpp" +#include "icinga/compatutility.hpp" +#include "icinga/service.hpp" +#include "icinga/checkcommand.hpp" +#include "base/tcpsocket.hpp" +#include "base/stream.hpp" +#include "base/json.hpp" +#include "base/utility.hpp" +#include "base/networkstream.hpp" +#include "base/perfdatavalue.hpp" +#include "base/exception.hpp" +#include "base/statsfunction.hpp" +#include + +using namespace icinga; + +REGISTER_TYPE(ElasticWriter); + +REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc); + +ElasticWriter::ElasticWriter(void) + : m_WorkQueue(10000000, 1) +{ } + +void ElasticWriter::OnConfigLoaded(void) +{ + ObjectImpl::OnConfigLoaded(); + + m_WorkQueue.SetName("ElasticWriter, " + GetName()); +} + +void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + Dictionary::Ptr nodes = new Dictionary(); + + for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType()) { + size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + Dictionary::Ptr stats = new Dictionary(); + stats->Set("work_queue_items", workQueueItems); + stats->Set("work_queue_item_rate", workQueueItemRate); + + nodes->Set(elasticwriter->GetName(), stats); + + perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + } + + status->Set("elasticwriter", nodes); +} + +void ElasticWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + m_EventPrefix = "icinga2.event."; + + Log(LogInformation, "ElasticWriter") + << "'" << GetName() << "' started."; + + m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1)); + + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = new Timer(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this)); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + + /* Register for new metrics. */ + Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2)); + Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3)); + Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7)); +} + +void ElasticWriter::Stop(bool runtimeRemoved) +{ + Log(LogInformation, "ElasticWriter") + << "'" << GetName() << "' stopped."; + + m_WorkQueue.Join(); + + ObjectImpl::Stop(runtimeRemoved); +} + +void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + String prefix = "check_result."; + + fields->Set(prefix + "output", cr->GetOutput()); + fields->Set(prefix + "check_source", cr->GetCheckSource()); + fields->Set(prefix + "exit_status", cr->GetExitStatus()); + fields->Set(prefix + "command", cr->GetCommand()); + fields->Set(prefix + "state", cr->GetState()); + fields->Set(prefix + "vars_before", cr->GetVarsBefore()); + fields->Set(prefix + "vars_after", cr->GetVarsAfter()); + + fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart())); + fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd())); + fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart())); + fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd())); + + /* Add extra calculated field. */ + fields->Set(prefix + "latency", cr->CalculateLatency()); + fields->Set(prefix + "execution_time", cr->CalculateExecutionTime()); + + if (!GetEnableSendPerfdata()) + return; + + Array::Ptr perfdata = cr->GetPerformanceData(); + + if (perfdata) { + ObjectLock olock(perfdata); + for (const Value& val : perfdata) { + PerfdataValue::Ptr pdv; + + if (val.IsObjectType()) + pdv = val; + else { + try { + pdv = PerfdataValue::Parse(val); + } catch (const std::exception&) { + Log(LogWarning, "ElasticWriter") + << "Ignoring invalid perfdata value: '" << val << "' for object '" + << checkable->GetName() << "'."; + } + } + + String escapedKey = pdv->GetLabel(); + boost::replace_all(escapedKey, " ", "_"); + boost::replace_all(escapedKey, ".", "_"); + boost::replace_all(escapedKey, "\\", "_"); + boost::algorithm::replace_all(escapedKey, "::", "."); + + String perfdataPrefix = prefix + "perfdata." + escapedKey; + + fields->Set(perfdataPrefix + ".value", pdv->GetValue()); + + if (pdv->GetMin()) + fields->Set(perfdataPrefix + ".min", pdv->GetMin()); + if (pdv->GetMax()) + fields->Set(perfdataPrefix + ".max", pdv->GetMax()); + if (pdv->GetWarn()) + fields->Set(perfdataPrefix + ".warn", pdv->GetWarn()); + if (pdv->GetCrit()) + fields->Set(perfdataPrefix + ".crit", pdv->GetCrit()); + } + } +} + +void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr)); +} + +void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'"); + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + Host::Ptr host; + Service::Ptr service; + boost::tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + fields->Set("host", host->GetName()); + fields->Set("state_type", checkable->GetStateType()); + + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + + fields->Set("reachable", checkable->IsReachable()); + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue("checkresult", fields, ts); +} + +void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type)); +} + +void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'"); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr fields = new Dictionary(); + + fields->Set("current_check_attempt", checkable->GetCheckAttempt()); + fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + fields->Set("host", host->GetName()); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue("statechange", fields, ts); +} + +void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text) +{ + m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this, + notification, checkable, users, type, cr, author, text)); +} + +void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text) +{ + AssertOnWorkQueue(); + + CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'"); + + Log(LogDebug, "ElasticWriter") + << "Processing notification for '" << checkable->GetName() << "'"; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + String notificationTypeString = Notification::NotificationTypeToString(type); + + Dictionary::Ptr fields = new Dictionary(); + + if (service) { + fields->Set("service", service->GetShortName()); + fields->Set("state", service->GetState()); + fields->Set("last_state", service->GetLastState()); + fields->Set("last_hard_state", service->GetLastHardState()); + } else { + fields->Set("state", host->GetState()); + fields->Set("last_state", host->GetLastState()); + fields->Set("last_hard_state", host->GetLastHardState()); + } + + fields->Set("host", host->GetName()); + + Array::Ptr userNames = new Array(); + + for (const User::Ptr& user : users) { + userNames->Add(user->GetName()); + } + + fields->Set("users", userNames); + fields->Set("notification_type", notificationTypeString); + fields->Set("author", author); + fields->Set("text", text); + + CheckCommand::Ptr commandObj = checkable->GetCheckCommand(); + + if (commandObj) + fields->Set("check_command", commandObj->GetName()); + + double ts = Utility::GetTime(); + + if (cr) { + AddCheckResult(fields, checkable, cr); + ts = cr->GetExecutionEnd(); + } + + Enqueue("notification", fields, ts); +} + +void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts) +{ + /* Atomically buffer the data point. */ + boost::mutex::scoped_lock lock(m_DataBufferMutex); + + /* Format the timestamps to dynamically select the date datatype inside the index. */ + fields->Set("@timestamp", FormatTimestamp(ts)); + fields->Set("timestamp", FormatTimestamp(ts)); + + String eventType = m_EventPrefix + type; + fields->Set("type", eventType); + + /* Every payload needs a line describing the index above. + * We do it this way to avoid problems with a near full queue. + */ + + String data; + + data += "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n"; + data += JsonEncode(fields); + + m_DataBuffer.push_back(data); + + /* Flush if we've buffered too much to prevent excessive memory use. */ + if (static_cast(m_DataBuffer.size()) >= GetFlushThreshold()) { + Log(LogDebug, "ElasticWriter") + << "Data buffer overflow writing " << m_DataBuffer.size() << " data points"; + Flush(); + } +} + +void ElasticWriter::FlushTimeout(void) +{ + /* Prevent new data points from being added to the array, there is a + * race condition where they could disappear. + */ + boost::mutex::scoped_lock lock(m_DataBufferMutex); + + /* Flush if there are any data available. */ + if (m_DataBuffer.size() > 0) { + Log(LogDebug, "ElasticWriter") + << "Timer expired writing " << m_DataBuffer.size() << " data points"; + Flush(); + } +} + +void ElasticWriter::Flush(void) +{ + /* Ensure you hold a lock against m_DataBuffer so that things + * don't go missing after creating the body and clearing the buffer. + */ + String body = boost::algorithm::join(m_DataBuffer, "\n"); + m_DataBuffer.clear(); + + SendRequest(body); +} + +void ElasticWriter::SendRequest(const String& body) +{ + Url::Ptr url = new Url(); + url->SetScheme("http"); + url->SetHost(GetHost()); + url->SetPort(GetPort()); + + std::vector path; + + /* Specify the index path. Best practice is a daily rotation. + * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1 + */ + path.push_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime())); + + /* Use the bulk message format. */ + path.push_back("_bulk"); + + url->SetPath(path); + + Stream::Ptr stream = Connect(); + HttpRequest req(stream); + + /* Specify required headers by Elasticsearch. */ + req.AddHeader("Accept", "application/json"); + req.AddHeader("Content-Type", "application/json"); + + req.RequestMethod = "POST"; + req.RequestUrl = url; + +#ifdef I2_DEBUG /* I2_DEBUG */ + Log(LogDebug, "ElasticWriter") + << "Sending body: " << body; +#endif /* I2_DEBUG */ + + try { + req.WriteBody(body.CStr(), body.GetLength()); + req.Finish(); + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticWriter") + << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw ex; + } + + HttpResponse resp(stream, req); + StreamReadContext context; + + try { + resp.Parse(context, true); + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticWriter") + << "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw ex; + } + + if (resp.StatusCode > 299) { + Log(LogWarning, "ElasticWriter") + << "Unexpected response code " << resp.StatusCode; + + /* Finish parsing the headers and body. */ + while (!resp.Complete) + resp.Parse(context, true); + + String contentType = resp.Headers->Get("content-type"); + if (contentType != "application/json") { + Log(LogWarning, "ElasticWriter") + << "Unexpected Content-Type: " << contentType; + return; + } + + size_t responseSize = resp.GetBodySize(); + boost::scoped_array buffer(new char[responseSize + 1]); + resp.ReadBody(buffer.get(), responseSize); + buffer.get()[responseSize] = '\0'; + + Dictionary::Ptr jsonResponse; + try { + jsonResponse = JsonDecode(buffer.get()); + } catch (...) { + Log(LogWarning, "ElasticWriter") + << "Unable to parse JSON response:\n" << buffer.get(); + return; + } + + String error = jsonResponse->Get("error"); + + Log(LogCritical, "ElasticWriter") + << "Elasticsearch error message:\n" << error; + } +} + +Stream::Ptr ElasticWriter::Connect(void) +{ + TcpSocket::Ptr socket = new TcpSocket(); + + Log(LogNotice, "ElasticWriter") + << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + + try { + socket->Connect(GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "ElasticWriter") + << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw ex; + } + return new NetworkStream(socket); +} + +void ElasticWriter::AssertOnWorkQueue(void) +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} + +void ElasticWriter::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!"); + + Log(LogDebug, "ElasticWriter") + << "Exception during Elasticsearch operation: " << DiagnosticInformation(exp); +} + +String ElasticWriter::FormatTimestamp(double ts) +{ + /* The date format must match the default dynamic date detection + * pattern in indexes. This enables applications like Kibana to + * detect a qualified timestamp index for time-series data. + * + * Example: 2017-09-11T10:56:21.463+0200 + * + * References: + * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection + * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html + * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html + */ + int milliSeconds = static_cast((ts - static_cast(ts)) * 1000); + + return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + String(milliSeconds) + Utility::FormatDateTime("%z", ts); +} diff --git a/lib/perfdata/elasticwriter.hpp b/lib/perfdata/elasticwriter.hpp new file mode 100644 index 000000000..ea037b8f1 --- /dev/null +++ b/lib/perfdata/elasticwriter.hpp @@ -0,0 +1,81 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef ELASTICWRITER_H +#define ELASTICWRITER_H + +#include "perfdata/elasticwriter.thpp" +#include "icinga/service.hpp" +#include "base/configobject.hpp" +#include "base/workqueue.hpp" +#include "base/timer.hpp" + +namespace icinga +{ + +class ElasticWriter : public ObjectImpl +{ +public: + DECLARE_OBJECT(ElasticWriter); + DECLARE_OBJECTNAME(ElasticWriter); + + ElasticWriter(void); + + static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata); + + static String FormatTimestamp(double ts); + +protected: + virtual void OnConfigLoaded(void) override; + virtual void Start(bool runtimeCreated) override; + virtual void Stop(bool runtimeRemoved) override; + +private: + String m_EventPrefix; + WorkQueue m_WorkQueue; + Timer::Ptr m_FlushTimer; + std::vector m_DataBuffer; + boost::mutex m_DataBufferMutex; + + void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + + void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type); + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text); + void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text); + + void Enqueue(String type, const Dictionary::Ptr& fields, double ts); + + Stream::Ptr Connect(void); + void AssertOnWorkQueue(void); + void ExceptionHandler(boost::exception_ptr exp); + void FlushTimeout(void); + void Flush(void); + void SendRequest(const String& body); +}; + +} + +#endif /* ELASTICWRITER_H */ diff --git a/lib/perfdata/elasticwriter.ti b/lib/perfdata/elasticwriter.ti new file mode 100644 index 000000000..430bb402c --- /dev/null +++ b/lib/perfdata/elasticwriter.ti @@ -0,0 +1,31 @@ +#include "base/configobject.hpp" + +library perfdata; + +namespace icinga +{ + +class ElasticWriter : ConfigObject +{ + [config, required] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config, required] String port { + default {{{ return "9200"; }}} + }; + [config, required] String index { + default {{{ return "icinga2"; }}} + }; + [config] bool enable_send_perfdata { + default {{{ return false; }}} + }; + + [config] int flush_interval { + default {{{ return 10; }}} + }; + [config] int flush_threshold { + default {{{ return 1024; }}} + }; +}; + +} -- 2.40.0