]> granicus.if.org Git - icinga2/blob - lib/perfdata/elasticsearchwriter.cpp
Merge pull request #7527 from Icinga/bugfix/checkable-command-endpoint-zone
[icinga2] / lib / perfdata / elasticsearchwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/elasticsearchwriter.hpp"
4 #include "perfdata/elasticsearchwriter-ti.cpp"
5 #include "remote/url.hpp"
6 #include "icinga/compatutility.hpp"
7 #include "icinga/service.hpp"
8 #include "icinga/checkcommand.hpp"
9 #include "base/application.hpp"
10 #include "base/defer.hpp"
11 #include "base/io-engine.hpp"
12 #include "base/tcpsocket.hpp"
13 #include "base/stream.hpp"
14 #include "base/base64.hpp"
15 #include "base/json.hpp"
16 #include "base/utility.hpp"
17 #include "base/networkstream.hpp"
18 #include "base/perfdatavalue.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/asio/ssl/context.hpp>
23 #include <boost/beast/core/flat_buffer.hpp>
24 #include <boost/beast/http/field.hpp>
25 #include <boost/beast/http/message.hpp>
26 #include <boost/beast/http/parser.hpp>
27 #include <boost/beast/http/read.hpp>
28 #include <boost/beast/http/status.hpp>
29 #include <boost/beast/http/string_body.hpp>
30 #include <boost/beast/http/verb.hpp>
31 #include <boost/beast/http/write.hpp>
32 #include <boost/scoped_array.hpp>
33 #include <memory>
34 #include <string>
35 #include <utility>
36
37 using namespace icinga;
38
39 REGISTER_TYPE(ElasticsearchWriter);
40
41 REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
42
43 void ElasticsearchWriter::OnConfigLoaded()
44 {
45         ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
46
47         m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
48
49         if (!GetEnableHa()) {
50                 Log(LogDebug, "ElasticsearchWriter")
51                         << "HA functionality disabled. Won't pause connection: " << GetName();
52
53                 SetHAMode(HARunEverywhere);
54         } else {
55                 SetHAMode(HARunOnce);
56         }
57 }
58
59 void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
60 {
61         DictionaryData nodes;
62
63         for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
64                 size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
65                 double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
66
67                 nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
68                         { "work_queue_items", workQueueItems },
69                         { "work_queue_item_rate", workQueueItemRate }
70                 }));
71
72                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
73                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
74         }
75
76         status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
77 }
78
79 void ElasticsearchWriter::Resume()
80 {
81         ObjectImpl<ElasticsearchWriter>::Resume();
82
83         m_EventPrefix = "icinga2.event.";
84
85         Log(LogInformation, "ElasticsearchWriter")
86                 << "'" << GetName() << "' resumed.";
87
88         m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
89
90         /* Setup timer for periodically flushing m_DataBuffer */
91         m_FlushTimer = new Timer();
92         m_FlushTimer->SetInterval(GetFlushInterval());
93         m_FlushTimer->OnTimerExpired.connect(std::bind(&ElasticsearchWriter::FlushTimeout, this));
94         m_FlushTimer->Start();
95         m_FlushTimer->Reschedule(0);
96
97         /* Register for new metrics. */
98         Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
99         Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
100         Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
101 }
102
103 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
104 void ElasticsearchWriter::Pause()
105 {
106         Flush();
107         m_WorkQueue.Join();
108         Flush();
109
110         Log(LogInformation, "ElasticsearchWriter")
111                 << "'" << GetName() << "' paused.";
112
113         ObjectImpl<ElasticsearchWriter>::Pause();
114 }
115
116 void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
117 {
118         String prefix = "check_result.";
119
120         fields->Set(prefix + "output", cr->GetOutput());
121         fields->Set(prefix + "check_source", cr->GetCheckSource());
122         fields->Set(prefix + "exit_status", cr->GetExitStatus());
123         fields->Set(prefix + "command", cr->GetCommand());
124         fields->Set(prefix + "state", cr->GetState());
125         fields->Set(prefix + "vars_before", cr->GetVarsBefore());
126         fields->Set(prefix + "vars_after", cr->GetVarsAfter());
127
128         fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
129         fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
130         fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
131         fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
132
133         /* Add extra calculated field. */
134         fields->Set(prefix + "latency", cr->CalculateLatency());
135         fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
136
137         if (!GetEnableSendPerfdata())
138                 return;
139
140         Array::Ptr perfdata = cr->GetPerformanceData();
141
142         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
143
144         if (perfdata) {
145                 ObjectLock olock(perfdata);
146                 for (const Value& val : perfdata) {
147                         PerfdataValue::Ptr pdv;
148
149                         if (val.IsObjectType<PerfdataValue>())
150                                 pdv = val;
151                         else {
152                                 try {
153                                         pdv = PerfdataValue::Parse(val);
154                                 } catch (const std::exception&) {
155                                         Log(LogWarning, "ElasticsearchWriter")
156                                                 << "Ignoring invalid perfdata for checkable '"
157                                                 << checkable->GetName() << "' and command '"
158                                                 << checkCommand->GetName() << "' with value: " << val;
159                                         continue;
160                                 }
161                         }
162
163                         String escapedKey = pdv->GetLabel();
164                         boost::replace_all(escapedKey, " ", "_");
165                         boost::replace_all(escapedKey, ".", "_");
166                         boost::replace_all(escapedKey, "\\", "_");
167                         boost::algorithm::replace_all(escapedKey, "::", ".");
168
169                         String perfdataPrefix = prefix + "perfdata." + escapedKey;
170
171                         fields->Set(perfdataPrefix + ".value", pdv->GetValue());
172
173                         if (pdv->GetMin())
174                                 fields->Set(perfdataPrefix + ".min", pdv->GetMin());
175                         if (pdv->GetMax())
176                                 fields->Set(perfdataPrefix + ".max", pdv->GetMax());
177                         if (pdv->GetWarn())
178                                 fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
179                         if (pdv->GetCrit())
180                                 fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
181
182                         if (!pdv->GetUnit().IsEmpty())
183                                 fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
184                 }
185         }
186 }
187
188 void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
189 {
190         if (IsPaused())
191                 return;
192
193         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
194 }
195
196 void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
197 {
198         AssertOnWorkQueue();
199
200         CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
201
202         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
203                 return;
204
205         Host::Ptr host;
206         Service::Ptr service;
207         tie(host, service) = GetHostService(checkable);
208
209         Dictionary::Ptr fields = new Dictionary();
210
211         if (service) {
212                 fields->Set("service", service->GetShortName());
213                 fields->Set("state", service->GetState());
214                 fields->Set("last_state", service->GetLastState());
215                 fields->Set("last_hard_state", service->GetLastHardState());
216         } else {
217                 fields->Set("state", host->GetState());
218                 fields->Set("last_state", host->GetLastState());
219                 fields->Set("last_hard_state", host->GetLastHardState());
220         }
221
222         fields->Set("host", host->GetName());
223         fields->Set("state_type", checkable->GetStateType());
224
225         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
226         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
227
228         fields->Set("reachable", checkable->IsReachable());
229
230         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
231
232         if (commandObj)
233                 fields->Set("check_command", commandObj->GetName());
234
235         double ts = Utility::GetTime();
236
237         if (cr) {
238                 AddCheckResult(fields, checkable, cr);
239                 ts = cr->GetExecutionEnd();
240         }
241
242         Enqueue(checkable, "checkresult", fields, ts);
243 }
244
245 void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
246 {
247         if (IsPaused())
248                 return;
249
250         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
251 }
252
253 void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
254 {
255         AssertOnWorkQueue();
256
257         CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
258
259         Host::Ptr host;
260         Service::Ptr service;
261         tie(host, service) = GetHostService(checkable);
262
263         Dictionary::Ptr fields = new Dictionary();
264
265         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
266         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
267         fields->Set("host", host->GetName());
268
269         if (service) {
270                 fields->Set("service", service->GetShortName());
271                 fields->Set("state", service->GetState());
272                 fields->Set("last_state", service->GetLastState());
273                 fields->Set("last_hard_state", service->GetLastHardState());
274         } else {
275                 fields->Set("state", host->GetState());
276                 fields->Set("last_state", host->GetLastState());
277                 fields->Set("last_hard_state", host->GetLastHardState());
278         }
279
280         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
281
282         if (commandObj)
283                 fields->Set("check_command", commandObj->GetName());
284
285         double ts = Utility::GetTime();
286
287         if (cr) {
288                 AddCheckResult(fields, checkable, cr);
289                 ts = cr->GetExecutionEnd();
290         }
291
292         Enqueue(checkable, "statechange", fields, ts);
293 }
294
295 void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
296         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
297         const CheckResult::Ptr& cr, const String& author, const String& text)
298 {
299         if (IsPaused())
300                 return;
301
302         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
303                 notification, checkable, users, type, cr, author, text));
304 }
305
306 void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
307         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
308         const CheckResult::Ptr& cr, const String& author, const String& text)
309 {
310         AssertOnWorkQueue();
311
312         CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
313
314         Log(LogDebug, "ElasticsearchWriter")
315                 << "Processing notification for '" << checkable->GetName() << "'";
316
317         Host::Ptr host;
318         Service::Ptr service;
319         tie(host, service) = GetHostService(checkable);
320
321         String notificationTypeString = Notification::NotificationTypeToStringCompat(type); //TODO: Change that to our own types.
322
323         Dictionary::Ptr fields = new Dictionary();
324
325         if (service) {
326                 fields->Set("service", service->GetShortName());
327                 fields->Set("state", service->GetState());
328                 fields->Set("last_state", service->GetLastState());
329                 fields->Set("last_hard_state", service->GetLastHardState());
330         } else {
331                 fields->Set("state", host->GetState());
332                 fields->Set("last_state", host->GetLastState());
333                 fields->Set("last_hard_state", host->GetLastHardState());
334         }
335
336         fields->Set("host", host->GetName());
337
338         ArrayData userNames;
339
340         for (const User::Ptr& user : users) {
341                 userNames.push_back(user->GetName());
342         }
343
344         fields->Set("users", new Array(std::move(userNames)));
345         fields->Set("notification_type", notificationTypeString);
346         fields->Set("author", author);
347         fields->Set("text", text);
348
349         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
350
351         if (commandObj)
352                 fields->Set("check_command", commandObj->GetName());
353
354         double ts = Utility::GetTime();
355
356         if (cr) {
357                 AddCheckResult(fields, checkable, cr);
358                 ts = cr->GetExecutionEnd();
359         }
360
361         Enqueue(checkable, "notification", fields, ts);
362 }
363
364 void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
365         const Dictionary::Ptr& fields, double ts)
366 {
367         /* Atomically buffer the data point. */
368         boost::mutex::scoped_lock lock(m_DataBufferMutex);
369
370         /* Format the timestamps to dynamically select the date datatype inside the index. */
371         fields->Set("@timestamp", FormatTimestamp(ts));
372         fields->Set("timestamp", FormatTimestamp(ts));
373
374         String eventType = m_EventPrefix + type;
375         fields->Set("type", eventType);
376
377         /* Every payload needs a line describing the index.
378          * We do it this way to avoid problems with a near full queue.
379          */
380         String indexBody = "{\"index\": {} }\n";
381         String fieldsBody = JsonEncode(fields);
382
383         Log(LogDebug, "ElasticsearchWriter")
384                 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << fieldsBody << "'.";
385
386         m_DataBuffer.emplace_back(indexBody + fieldsBody);
387
388         /* Flush if we've buffered too much to prevent excessive memory use. */
389         if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
390                 Log(LogDebug, "ElasticsearchWriter")
391                         << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
392                 Flush();
393         }
394 }
395
396 void ElasticsearchWriter::FlushTimeout()
397 {
398         /* Prevent new data points from being added to the array, there is a
399          * race condition where they could disappear.
400          */
401         boost::mutex::scoped_lock lock(m_DataBufferMutex);
402
403         /* Flush if there are any data available. */
404         if (m_DataBuffer.size() > 0) {
405                 Log(LogDebug, "ElasticsearchWriter")
406                         << "Timer expired writing " << m_DataBuffer.size() << " data points";
407                 Flush();
408         }
409 }
410
411 void ElasticsearchWriter::Flush()
412 {
413         /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
414         if (m_DataBuffer.empty())
415                 return;
416
417         /* Ensure you hold a lock against m_DataBuffer so that things
418          * don't go missing after creating the body and clearing the buffer.
419          */
420         String body = boost::algorithm::join(m_DataBuffer, "\n");
421         m_DataBuffer.clear();
422
423         /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
424          * Tested with 6.0.0 and 5.6.4.
425          */
426         body += "\n";
427
428         SendRequest(body);
429 }
430
431 void ElasticsearchWriter::SendRequest(const String& body)
432 {
433         namespace beast = boost::beast;
434         namespace http = beast::http;
435
436         Url::Ptr url = new Url();
437
438         url->SetScheme(GetEnableTls() ? "https" : "http");
439         url->SetHost(GetHost());
440         url->SetPort(GetPort());
441
442         std::vector<String> path;
443
444         /* Specify the index path. Best practice is a daily rotation.
445          * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
446          */
447         path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
448
449         /* ES 6 removes multiple _type mappings: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html
450          * Best practice is to statically define 'doc', as ES 5.X does not allow types starting with '_'.
451          */
452         path.emplace_back("doc");
453
454         /* Use the bulk message format. */
455         path.emplace_back("_bulk");
456
457         url->SetPath(path);
458
459         OptionalTlsStream stream;
460
461         try {
462                 stream = Connect();
463         } catch (const std::exception& ex) {
464                 Log(LogWarning, "ElasticsearchWriter")
465                         << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
466                 return;
467         }
468
469         Defer s ([&stream]() {
470                 if (stream.first) {
471                         stream.first->next_layer().shutdown();
472                 }
473         });
474
475         http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
476
477         request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
478         request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
479
480         /* Specify required headers by Elasticsearch. */
481         request.set(http::field::accept, "application/json");
482
483         /* Use application/x-ndjson for bulk streams. While ES
484          * is able to handle application/json, the newline separator
485          * causes problems with Logstash (#6609).
486          */
487         request.set(http::field::content_type, "application/x-ndjson");
488
489         /* Send authentication if configured. */
490         String username = GetUsername();
491         String password = GetPassword();
492
493         if (!username.IsEmpty() && !password.IsEmpty())
494                 request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
495
496         request.body() = body;
497         request.set(http::field::content_length, request.body().size());
498
499         /* Don't log the request body to debug log, this is already done above. */
500         Log(LogDebug, "ElasticsearchWriter")
501                 << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
502                 << " to '" << url->Format() << "'.";
503
504         try {
505                 if (stream.first) {
506                         http::write(*stream.first, request);
507                         stream.first->flush();
508                 } else {
509                         http::write(*stream.second, request);
510                         stream.second->flush();
511                 }
512         } catch (const std::exception&) {
513                 Log(LogWarning, "ElasticsearchWriter")
514                         << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
515                 throw;
516         }
517
518         http::parser<false, http::string_body> parser;
519         beast::flat_buffer buf;
520
521         try {
522                 if (stream.first) {
523                         http::read(*stream.first, buf, parser);
524                 } else {
525                         http::read(*stream.second, buf, parser);
526                 }
527         } catch (const std::exception& ex) {
528                 Log(LogWarning, "ElasticsearchWriter")
529                         << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
530                 throw;
531         }
532
533         auto& response (parser.get());
534
535         if (response.result_int() > 299) {
536                 if (response.result() == http::status::unauthorized) {
537                         /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
538                         if (!username.IsEmpty() && !password.IsEmpty()) {
539                                 Log(LogCritical, "ElasticsearchWriter")
540                                         << "401 Unauthorized. Please ensure that the user '" << username
541                                         << "' is able to authenticate against the HTTP API/Proxy.";
542                         } else {
543                                 Log(LogCritical, "ElasticsearchWriter")
544                                         << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
545                         }
546
547                         return;
548                 }
549
550                 std::ostringstream msgbuf;
551                 msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
552
553                 auto& contentType (response[http::field::content_type]);
554
555                 if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
556                         msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
557                 }
558
559                 auto& body (response.body());
560
561 #ifdef I2_DEBUG
562                 msgbuf << "; Response body: '" << body << "'";
563 #endif /* I2_DEBUG */
564
565                 Dictionary::Ptr jsonResponse;
566
567                 try {
568                         jsonResponse = JsonDecode(body);
569                 } catch (...) {
570                         Log(LogWarning, "ElasticsearchWriter")
571                                 << "Unable to parse JSON response:\n" << body;
572                         return;
573                 }
574
575                 String error = jsonResponse->Get("error");
576
577                 Log(LogCritical, "ElasticsearchWriter")
578                         << "Error: '" << error << "'. " << msgbuf.str();
579         }
580 }
581
582 OptionalTlsStream ElasticsearchWriter::Connect()
583 {
584         Log(LogNotice, "ElasticsearchWriter")
585                 << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
586
587         OptionalTlsStream stream;
588         bool tls = GetEnableTls();
589
590         if (tls) {
591                 std::shared_ptr<boost::asio::ssl::context> sslContext;
592
593                 try {
594                         sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
595                 } catch (const std::exception&) {
596                         Log(LogWarning, "ElasticsearchWriter")
597                                 << "Unable to create SSL context.";
598                         throw;
599                 }
600
601                 stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
602         } else {
603                 stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
604         }
605
606         try {
607                 icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
608         } catch (const std::exception&) {
609                 Log(LogWarning, "ElasticsearchWriter")
610                         << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
611                 throw;
612         }
613
614         if (tls) {
615                 auto& tlsStream (stream.first->next_layer());
616
617                 try {
618                         tlsStream.handshake(tlsStream.client);
619                 } catch (const std::exception&) {
620                         Log(LogWarning, "ElasticsearchWriter")
621                                 << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
622                         throw;
623                 }
624         }
625
626         return std::move(stream);
627 }
628
629 void ElasticsearchWriter::AssertOnWorkQueue()
630 {
631         ASSERT(m_WorkQueue.IsWorkerThread());
632 }
633
634 void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
635 {
636         Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
637
638         Log(LogDebug, "ElasticsearchWriter")
639                 << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
640 }
641
642 String ElasticsearchWriter::FormatTimestamp(double ts)
643 {
644         /* The date format must match the default dynamic date detection
645          * pattern in indexes. This enables applications like Kibana to
646          * detect a qualified timestamp index for time-series data.
647          *
648          * Example: 2017-09-11T10:56:21.463+0200
649          *
650          * References:
651          * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
652          * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
653          * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
654          */
655         auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
656
657         return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);
658 }