]> granicus.if.org Git - icinga2/commitdiff
Add ElasticWriter
authorJean Flach <jean-marcel.flach@icinga.com>
Thu, 7 Sep 2017 13:11:57 +0000 (15:11 +0200)
committerJean Flach <jean-marcel.flach@icinga.com>
Mon, 11 Sep 2017 12:27:04 +0000 (14:27 +0200)
fixes #5538

doc/09-object-types.md
doc/14-features.md
etc/icinga2/features-available/elastic.conf [new file with mode: 0644]
lib/perfdata/CMakeLists.txt
lib/perfdata/elasticwriter.cpp [new file with mode: 0644]
lib/perfdata/elasticwriter.hpp [new file with mode: 0644]
lib/perfdata/elasticwriter.ti [new file with mode: 0644]

index aec60a6cec16c1b275f63fa97b6703774a0060b9..f3bf882abfb018c6d4cac37220deb6a5a2aa0449 100644 (file)
@@ -969,6 +969,41 @@ is associated with the service:
       ...
     }
 
+## ElasticWriter <a id="objecttype-elasticwriter"></a>
+
+Writes check result metrics and performance data to an Elasticsearch instance.
+
+Example:
+
+    library "perfdata"
+
+    object ElasticWriter "elastic" {
+      host = "127.0.0.1"
+      port = 9200
+      index = "icinga2"
+
+      enable_send_perfdata = true
+
+      flush_threshold = 1024
+      flush_interval = 10
+    }
+
+The index is rotated daily, as is recommended by Elastic, meaning the index will be renamed to `$index-$d.$M.$y`.
+
+Configuration Attributes:
+
+  Name                   |Description
+  -----------------------|---------------------------------------------------------------------------------------------------------
+  host                   | **Required.** Elasticsearch host address. Defaults to `127.0.0.1`.
+  port                   | **Required.** Elasticsearch port. Defaults to `9200`.
+  index                  | **Required.** Elasticsearch index name. Defaults to `icinga2`.
+  enable_send_perfdata   | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
+  flush_interval         | **Optional.** How long to buffer data points before transfering to Elasticsearch. Defaults to `10`.
+  flush_threshold        | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch.  Defaults to `1024`.
+
+Note: If `flush_threshold` is set too low, this will force the feature to flush all data to Elasticsearch too often.
+Experiment with the setting, if you are processing more than 1024 metrics per second or similar.
+
 ## LiveStatusListener <a id="objecttype-livestatuslistener"></a>
 
 Livestatus API interface available as TCP or UNIX socket. Historical table queries
index 0fa7cf62b07c696a2a841f7c69bd8ee26c6bdc41..70c75098ee746cca8b22f3ff2b751e108dca8845 100644 (file)
@@ -264,6 +264,74 @@ expects the InfluxDB daemon to listen at `127.0.0.1` on port `8086`.
 
 More configuration details can be found [here](09-object-types.md#objecttype-influxdbwriter).
 
+### Elastic Stack Integration <a id="elastic-stack-integration"></a>
+
+[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data
+from the Icinga 2 API and sends it either directly to [Elasticsearch](https://www.elastic.co/products/elasticsearch)
+or [Logstash](https://www.elastic.co/products/logstash).
+
+More integrations:
+
+* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
+* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
+
+#### Elastic Writer <a id="elastic-writer"></a>
+
+This feature forwards check results, state changes and notification events
+to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) installation over its HTTP API.
+
+The check results include parsed performance data metrics if enabled.
+
+> **Note**
+>
+> Elasticsearch 5.x+ is required.
+
+Enable the feature and restart Icinga 2.
+
+    # icinga2 feature enable elastic
+
+The default configuration expects an Elasticsearch instance running on `localhost` on port `9200
+ and writes to an index called `icinga2`.
+
+More configuration details can be found [here](09-object-types.md#objecttype-elasticwriter).
+
+#### Current Elasticsearch Schema <a id="elastic-writer-schema"></a>
+
+The following event types are written to Elasticsearch:
+
+* icinga2.event.checkresult
+* icinga2.event.statechange
+* icinga2.event.notification
+
+Performance data metrics must be explicitly enabled with the `enable_send_perfdata`
+attribute.
+
+Metric values are stored like this:
+
+    check_result.perfdata.<perfdata-label>.value
+
+The following characters are escaped in perfdata labels:
+
+  Character    | Escaped character
+  --------------|--------------------------
+  whitespace   | _
+  \            | _
+  /            | _
+  ::           | .
+
+Note that perfdata labels may contain dots (`.`) allowing to
+add more subsequent levels inside the tree.
+`::` adds support for [multi performance labels](http://my-plugin.de/wiki/projects/check_multi/configuration/performance)
+and is therefore replaced by `.`.
+
+Icinga 2 automatically adds the following threshold metrics
+if existing:
+
+    check_result.perfdata.<perfdata-label>.min
+    check_result.perfdata.<perfdata-label>.max
+    check_result.perfdata.<perfdata-label>.warn
+    check_result.perfdata.<perfdata-label>.crit
+
 ### Graylog Integration <a id="graylog-integration"></a>
 
 #### GELF Writer <a id="gelfwriter"></a>
@@ -288,14 +356,6 @@ Currently these events are processed:
 * State changes
 * Notifications
 
-### Elastic Stack Integration <a id="elastic-stack-integration"></a>
-
-[Icingabeat](https://github.com/icinga/icingabeat) is an Elastic Beat that fetches data
-from the Icinga 2 API and sends it either directly to Elasticsearch or Logstash.
-
-More integrations in development:
-* [Logstash output](https://github.com/Icinga/logstash-output-icinga) for the Icinga 2 API.
-* [Logstash Grok Pattern](https://github.com/Icinga/logstash-grok-pattern) for Icinga 2 logs.
 
 ### OpenTSDB Writer <a id="opentsdb-writer"></a>
 
diff --git a/etc/icinga2/features-available/elastic.conf b/etc/icinga2/features-available/elastic.conf
new file mode 100644 (file)
index 0000000..06637e3
--- /dev/null
@@ -0,0 +1,10 @@
+library "perfdata"
+
+object ElasticWriter "elastic" {
+  //host = "127.0.0.1"
+  //port = 9200
+  //index = "icinga2"
+  //send_enable_perfdata = false
+  //flush_threshold = 1024
+  //flush_interval = 10s
+}
index 08920dd3ddb6256669bf35f0bb0719db9cfce81e..02f2daefe08244b4d0edb2b615cc33a7dd2b7295 100644 (file)
 mkclass_target(gelfwriter.ti gelfwriter.tcpp gelfwriter.thpp)
 mkclass_target(graphitewriter.ti graphitewriter.tcpp graphitewriter.thpp)
 mkclass_target(influxdbwriter.ti influxdbwriter.tcpp influxdbwriter.thpp)
+mkclass_target(elasticwriter.ti elasticwriter.tcpp elasticwriter.thpp)
 mkclass_target(opentsdbwriter.ti opentsdbwriter.tcpp opentsdbwriter.thpp)
 mkclass_target(perfdatawriter.ti perfdatawriter.tcpp perfdatawriter.thpp)
 
 set(perfdata_SOURCES
-  gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
+  gelfwriter.cpp gelfwriter.thpp graphitewriter.cpp graphitewriter.thpp influxdbwriter.cpp influxdbwriter.thpp elasticwriter.cpp elasticwriter.thpp opentsdbwriter.cpp opentsdbwriter.thpp perfdatawriter.cpp perfdatawriter.thpp
 )
 
 if(ICINGA2_UNITY_BUILD)
@@ -56,6 +57,11 @@ install_if_not_exists(
   ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
 )
 
+install_if_not_exists(
+  ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elastic.conf
+  ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
+)
+
 install_if_not_exists(
   ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
   ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
diff --git a/lib/perfdata/elasticwriter.cpp b/lib/perfdata/elasticwriter.cpp
new file mode 100644 (file)
index 0000000..667dafb
--- /dev/null
@@ -0,0 +1,535 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "perfdata/elasticwriter.hpp"
+#include "perfdata/elasticwriter.tcpp"
+#include "remote/url.hpp"
+#include "remote/httprequest.hpp"
+#include "remote/httpresponse.hpp"
+#include "icinga/compatutility.hpp"
+#include "icinga/service.hpp"
+#include "icinga/checkcommand.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/stream.hpp"
+#include "base/json.hpp"
+#include "base/utility.hpp"
+#include "base/networkstream.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include <boost/algorithm/string.hpp>
+
+using namespace icinga;
+
+REGISTER_TYPE(ElasticWriter);
+
+REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc);
+
+ElasticWriter::ElasticWriter(void)
+       : m_WorkQueue(10000000, 1)
+{ }
+
+void ElasticWriter::OnConfigLoaded(void)
+{
+       ObjectImpl<ElasticWriter>::OnConfigLoaded();
+
+       m_WorkQueue.SetName("ElasticWriter, " + GetName());
+}
+
+void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+       Dictionary::Ptr nodes = new Dictionary();
+
+       for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType<ElasticWriter>()) {
+               size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength();
+               double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+               Dictionary::Ptr stats = new Dictionary();
+               stats->Set("work_queue_items", workQueueItems);
+               stats->Set("work_queue_item_rate", workQueueItemRate);
+
+               nodes->Set(elasticwriter->GetName(), stats);
+
+               perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems));
+               perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+       }
+
+       status->Set("elasticwriter", nodes);
+}
+
+void ElasticWriter::Start(bool runtimeCreated)
+{
+       ObjectImpl<ElasticWriter>::Start(runtimeCreated);
+
+       m_EventPrefix = "icinga2.event.";
+
+       Log(LogInformation, "ElasticWriter")
+           << "'" << GetName() << "' started.";
+
+       m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1));
+
+       /* Setup timer for periodically flushing m_DataBuffer */
+       m_FlushTimer = new Timer();
+       m_FlushTimer->SetInterval(GetFlushInterval());
+       m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this));
+       m_FlushTimer->Start();
+       m_FlushTimer->Reschedule(0);
+
+       /* Register for new metrics. */
+       Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2));
+       Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3));
+       Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
+}
+
+void ElasticWriter::Stop(bool runtimeRemoved)
+{
+       Log(LogInformation, "ElasticWriter")
+           << "'" << GetName() << "' stopped.";
+
+       m_WorkQueue.Join();
+
+       ObjectImpl<ElasticWriter>::Stop(runtimeRemoved);
+}
+
+void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+       String prefix = "check_result.";
+
+       fields->Set(prefix + "output", cr->GetOutput());
+       fields->Set(prefix + "check_source", cr->GetCheckSource());
+       fields->Set(prefix + "exit_status", cr->GetExitStatus());
+       fields->Set(prefix + "command", cr->GetCommand());
+       fields->Set(prefix + "state", cr->GetState());
+       fields->Set(prefix + "vars_before", cr->GetVarsBefore());
+       fields->Set(prefix + "vars_after", cr->GetVarsAfter());
+
+       fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
+       fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
+       fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
+       fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
+
+       /* Add extra calculated field. */
+       fields->Set(prefix + "latency", cr->CalculateLatency());
+       fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
+
+       if (!GetEnableSendPerfdata())
+               return;
+
+       Array::Ptr perfdata = cr->GetPerformanceData();
+
+       if (perfdata) {
+               ObjectLock olock(perfdata);
+               for (const Value& val : perfdata) {
+                       PerfdataValue::Ptr pdv;
+
+                       if (val.IsObjectType<PerfdataValue>())
+                               pdv = val;
+                       else {
+                               try {
+                                       pdv = PerfdataValue::Parse(val);
+                               } catch (const std::exception&) {
+                                       Log(LogWarning, "ElasticWriter")
+                                           << "Ignoring invalid perfdata value: '" << val << "' for object '"
+                                           << checkable->GetName() << "'.";
+                               }
+                       }
+
+                       String escapedKey = pdv->GetLabel();
+                       boost::replace_all(escapedKey, " ", "_");
+                       boost::replace_all(escapedKey, ".", "_");
+                       boost::replace_all(escapedKey, "\\", "_");
+                       boost::algorithm::replace_all(escapedKey, "::", ".");
+
+                       String perfdataPrefix = prefix + "perfdata." + escapedKey;
+
+                       fields->Set(perfdataPrefix + ".value", pdv->GetValue());
+
+                       if (pdv->GetMin())
+                               fields->Set(perfdataPrefix + ".min", pdv->GetMin());
+                       if (pdv->GetMax())
+                               fields->Set(perfdataPrefix + ".max", pdv->GetMax());
+                       if (pdv->GetWarn())
+                               fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
+                       if (pdv->GetCrit())
+                               fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
+               }
+       }
+}
+
+void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+       m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr));
+}
+
+void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+       AssertOnWorkQueue();
+
+       CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
+
+       if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+               return;
+
+       Host::Ptr host;
+       Service::Ptr service;
+       boost::tie(host, service) = GetHostService(checkable);
+
+       Dictionary::Ptr fields = new Dictionary();
+
+       if (service) {
+               fields->Set("service", service->GetShortName());
+               fields->Set("state", service->GetState());
+               fields->Set("last_state", service->GetLastState());
+               fields->Set("last_hard_state", service->GetLastHardState());
+       } else {
+               fields->Set("state", host->GetState());
+               fields->Set("last_state", host->GetLastState());
+               fields->Set("last_hard_state", host->GetLastHardState());
+       }
+
+       fields->Set("host", host->GetName());
+       fields->Set("state_type", checkable->GetStateType());
+
+       fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+       fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+
+       fields->Set("reachable", checkable->IsReachable());
+
+       CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+       if (commandObj)
+               fields->Set("check_command", commandObj->GetName());
+
+       double ts = Utility::GetTime();
+
+       if (cr) {
+               AddCheckResult(fields, checkable, cr);
+               ts = cr->GetExecutionEnd();
+       }
+
+       Enqueue("checkresult", fields, ts);
+}
+
+void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+       m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type));
+}
+
+void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+       AssertOnWorkQueue();
+
+       CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
+
+       Host::Ptr host;
+       Service::Ptr service;
+       tie(host, service) = GetHostService(checkable);
+
+       Dictionary::Ptr fields = new Dictionary();
+
+       fields->Set("current_check_attempt", checkable->GetCheckAttempt());
+       fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
+       fields->Set("host", host->GetName());
+
+       if (service) {
+               fields->Set("service", service->GetShortName());
+               fields->Set("state", service->GetState());
+               fields->Set("last_state", service->GetLastState());
+               fields->Set("last_hard_state", service->GetLastHardState());
+       } else {
+               fields->Set("state", host->GetState());
+               fields->Set("last_state", host->GetLastState());
+               fields->Set("last_hard_state", host->GetLastHardState());
+       }
+
+       CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+       if (commandObj)
+               fields->Set("check_command", commandObj->GetName());
+
+       double ts = Utility::GetTime();
+
+       if (cr) {
+               AddCheckResult(fields, checkable, cr);
+               ts = cr->GetExecutionEnd();
+       }
+
+       Enqueue("statechange", fields, ts);
+}
+
+void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
+    const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+    const CheckResult::Ptr& cr, const String& author, const String& text)
+{
+       m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this,
+           notification, checkable, users, type, cr, author, text));
+}
+
+void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
+    const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+    const CheckResult::Ptr& cr, const String& author, const String& text)
+{
+       AssertOnWorkQueue();
+
+       CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
+
+       Log(LogDebug, "ElasticWriter")
+           << "Processing notification for '" << checkable->GetName() << "'";
+
+       Host::Ptr host;
+       Service::Ptr service;
+       tie(host, service) = GetHostService(checkable);
+
+       String notificationTypeString = Notification::NotificationTypeToString(type);
+
+       Dictionary::Ptr fields = new Dictionary();
+
+       if (service) {
+               fields->Set("service", service->GetShortName());
+               fields->Set("state", service->GetState());
+               fields->Set("last_state", service->GetLastState());
+               fields->Set("last_hard_state", service->GetLastHardState());
+       } else {
+               fields->Set("state", host->GetState());
+               fields->Set("last_state", host->GetLastState());
+               fields->Set("last_hard_state", host->GetLastHardState());
+       }
+
+       fields->Set("host", host->GetName());
+
+       Array::Ptr userNames = new Array();
+
+       for (const User::Ptr& user : users) {
+               userNames->Add(user->GetName());
+       }
+
+       fields->Set("users", userNames);
+       fields->Set("notification_type", notificationTypeString);
+       fields->Set("author", author);
+       fields->Set("text", text);
+
+       CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
+
+       if (commandObj)
+               fields->Set("check_command", commandObj->GetName());
+
+       double ts = Utility::GetTime();
+
+       if (cr) {
+               AddCheckResult(fields, checkable, cr);
+               ts = cr->GetExecutionEnd();
+       }
+
+       Enqueue("notification", fields, ts);
+}
+
+void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
+{
+       /* Atomically buffer the data point. */
+       boost::mutex::scoped_lock lock(m_DataBufferMutex);
+
+       /* Format the timestamps to dynamically select the date datatype inside the index. */
+       fields->Set("@timestamp", FormatTimestamp(ts));
+       fields->Set("timestamp", FormatTimestamp(ts));
+
+       String eventType = m_EventPrefix + type;
+       fields->Set("type", eventType);
+
+       /* Every payload needs a line describing the index above.
+        * We do it this way to avoid problems with a near full queue.
+        */
+
+       String data;
+
+       data += "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
+       data += JsonEncode(fields);
+
+       m_DataBuffer.push_back(data);
+
+       /* Flush if we've buffered too much to prevent excessive memory use. */
+       if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
+               Log(LogDebug, "ElasticWriter")
+                   << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
+               Flush();
+       }
+}
+
+void ElasticWriter::FlushTimeout(void)
+{
+       /* Prevent new data points from being added to the array, there is a
+        * race condition where they could disappear.
+        */
+       boost::mutex::scoped_lock lock(m_DataBufferMutex);
+
+       /* Flush if there are any data available. */
+       if (m_DataBuffer.size() > 0) {
+               Log(LogDebug, "ElasticWriter")
+                   << "Timer expired writing " << m_DataBuffer.size() << " data points";
+               Flush();
+       }
+}
+
+void ElasticWriter::Flush(void)
+{
+       /* Ensure you hold a lock against m_DataBuffer so that things
+        * don't go missing after creating the body and clearing the buffer.
+        */
+       String body = boost::algorithm::join(m_DataBuffer, "\n");
+       m_DataBuffer.clear();
+
+       SendRequest(body);
+}
+
+void ElasticWriter::SendRequest(const String& body)
+{
+       Url::Ptr url = new Url();
+       url->SetScheme("http");
+       url->SetHost(GetHost());
+       url->SetPort(GetPort());
+
+       std::vector<String> path;
+
+       /* Specify the index path. Best practice is a daily rotation.
+        * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
+        */
+       path.push_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
+
+       /* Use the bulk message format. */
+       path.push_back("_bulk");
+
+       url->SetPath(path);
+
+       Stream::Ptr stream = Connect();
+       HttpRequest req(stream);
+
+       /* Specify required headers by Elasticsearch. */
+       req.AddHeader("Accept", "application/json");
+       req.AddHeader("Content-Type", "application/json");
+
+       req.RequestMethod = "POST";
+       req.RequestUrl = url;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+       Log(LogDebug, "ElasticWriter")
+           << "Sending body: " << body;
+#endif /* I2_DEBUG */
+
+       try {
+               req.WriteBody(body.CStr(), body.GetLength());
+               req.Finish();
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "ElasticWriter")
+                       << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw ex;
+       }
+
+       HttpResponse resp(stream, req);
+       StreamReadContext context;
+
+       try {
+               resp.Parse(context, true);
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "ElasticWriter")
+                       << "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw ex;
+       }
+
+       if (resp.StatusCode > 299) {
+               Log(LogWarning, "ElasticWriter")
+                   << "Unexpected response code " << resp.StatusCode;
+
+               /* Finish parsing the headers and body. */
+               while (!resp.Complete)
+                       resp.Parse(context, true);
+
+               String contentType = resp.Headers->Get("content-type");
+               if (contentType != "application/json") {
+                       Log(LogWarning, "ElasticWriter")
+                           << "Unexpected Content-Type: " << contentType;
+                       return;
+               }
+
+               size_t responseSize = resp.GetBodySize();
+               boost::scoped_array<char> buffer(new char[responseSize + 1]);
+               resp.ReadBody(buffer.get(), responseSize);
+               buffer.get()[responseSize] = '\0';
+
+               Dictionary::Ptr jsonResponse;
+               try {
+                       jsonResponse = JsonDecode(buffer.get());
+               } catch (...) {
+                       Log(LogWarning, "ElasticWriter")
+                           << "Unable to parse JSON response:\n" << buffer.get();
+                       return;
+               }
+
+               String error = jsonResponse->Get("error");
+
+               Log(LogCritical, "ElasticWriter")
+                   << "Elasticsearch error message:\n" << error;
+       }
+}
+
+Stream::Ptr ElasticWriter::Connect(void)
+{
+       TcpSocket::Ptr socket = new TcpSocket();
+
+       Log(LogNotice, "ElasticWriter")
+           << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+       try {
+               socket->Connect(GetHost(), GetPort());
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "ElasticWriter")
+                   << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw ex;
+       }
+       return new NetworkStream(socket);
+}
+
+void ElasticWriter::AssertOnWorkQueue(void)
+{
+       ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+void ElasticWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+       Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!");
+
+       Log(LogDebug, "ElasticWriter")
+               << "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
+}
+
+String ElasticWriter::FormatTimestamp(double ts)
+{
+       /* The date format must match the default dynamic date detection
+        * pattern in indexes. This enables applications like Kibana to
+        * detect a qualified timestamp index for time-series data.
+        *
+        * Example: 2017-09-11T10:56:21.463+0200
+        *
+        * References:
+        * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
+        * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
+        * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
+        */
+       int milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
+
+       return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + String(milliSeconds) + Utility::FormatDateTime("%z", ts);
+}
diff --git a/lib/perfdata/elasticwriter.hpp b/lib/perfdata/elasticwriter.hpp
new file mode 100644 (file)
index 0000000..ea037b8
--- /dev/null
@@ -0,0 +1,81 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef ELASTICWRITER_H
+#define ELASTICWRITER_H
+
+#include "perfdata/elasticwriter.thpp"
+#include "icinga/service.hpp"
+#include "base/configobject.hpp"
+#include "base/workqueue.hpp"
+#include "base/timer.hpp"
+
+namespace icinga
+{
+
+class ElasticWriter : public ObjectImpl<ElasticWriter>
+{
+public:
+       DECLARE_OBJECT(ElasticWriter);
+       DECLARE_OBJECTNAME(ElasticWriter);
+
+       ElasticWriter(void);
+
+       static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+       static String FormatTimestamp(double ts);
+
+protected:
+       virtual void OnConfigLoaded(void) override;
+       virtual void Start(bool runtimeCreated) override;
+       virtual void Stop(bool runtimeRemoved) override;
+
+private:
+       String m_EventPrefix;
+       WorkQueue m_WorkQueue;
+       Timer::Ptr m_FlushTimer;
+       std::vector<String> m_DataBuffer;
+       boost::mutex m_DataBufferMutex;
+
+       void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+
+       void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+       void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+       void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+       void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+       void NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
+           const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+           const CheckResult::Ptr& cr, const String& author, const String& text);
+       void NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
+           const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
+           const CheckResult::Ptr& cr, const String& author, const String& text);
+
+       void Enqueue(String type, const Dictionary::Ptr& fields, double ts);
+
+       Stream::Ptr Connect(void);
+       void AssertOnWorkQueue(void);
+       void ExceptionHandler(boost::exception_ptr exp);
+       void FlushTimeout(void);
+       void Flush(void);
+       void SendRequest(const String& body);
+};
+
+}
+
+#endif /* ELASTICWRITER_H */
diff --git a/lib/perfdata/elasticwriter.ti b/lib/perfdata/elasticwriter.ti
new file mode 100644 (file)
index 0000000..430bb40
--- /dev/null
@@ -0,0 +1,31 @@
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class ElasticWriter : ConfigObject
+{
+       [config, required] String host {
+               default {{{ return "127.0.0.1"; }}}
+       };
+       [config, required] String port {
+               default {{{ return "9200"; }}}
+       };
+       [config, required] String index {
+               default {{{ return "icinga2"; }}}
+       };
+       [config] bool enable_send_perfdata {
+               default {{{ return false; }}}
+       };
+
+       [config] int flush_interval {
+               default {{{ return 10; }}}
+       };
+       [config] int flush_threshold {
+               default {{{ return 1024; }}}
+       };
+};
+
+}