]> granicus.if.org Git - icinga2/blob - lib/perfdata/elasticsearchwriter.cpp
Merge pull request #7124 from Icinga/bugfix/namespace-thread-safe
[icinga2] / lib / perfdata / elasticsearchwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/elasticsearchwriter.hpp"
4 #include "perfdata/elasticsearchwriter-ti.cpp"
5 #include "remote/url.hpp"
6 #include "remote/httprequest.hpp"
7 #include "remote/httpresponse.hpp"
8 #include "icinga/compatutility.hpp"
9 #include "icinga/service.hpp"
10 #include "icinga/checkcommand.hpp"
11 #include "base/application.hpp"
12 #include "base/defer.hpp"
13 #include "base/io-engine.hpp"
14 #include "base/tcpsocket.hpp"
15 #include "base/stream.hpp"
16 #include "base/base64.hpp"
17 #include "base/json.hpp"
18 #include "base/utility.hpp"
19 #include "base/networkstream.hpp"
20 #include "base/perfdatavalue.hpp"
21 #include "base/exception.hpp"
22 #include "base/statsfunction.hpp"
23 #include <boost/algorithm/string.hpp>
24 #include <boost/asio/ssl/context.hpp>
25 #include <boost/beast/core/flat_buffer.hpp>
26 #include <boost/beast/http/field.hpp>
27 #include <boost/beast/http/message.hpp>
28 #include <boost/beast/http/parser.hpp>
29 #include <boost/beast/http/read.hpp>
30 #include <boost/beast/http/status.hpp>
31 #include <boost/beast/http/string_body.hpp>
32 #include <boost/beast/http/verb.hpp>
33 #include <boost/beast/http/write.hpp>
34 #include <boost/scoped_array.hpp>
35 #include <memory>
36 #include <string>
37 #include <utility>
38
39 using namespace icinga;
40
41 REGISTER_TYPE(ElasticsearchWriter);
42
43 REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
44
45 void ElasticsearchWriter::OnConfigLoaded()
46 {
47         ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
48
49         m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
50
51         if (!GetEnableHa()) {
52                 Log(LogDebug, "ElasticsearchWriter")
53                         << "HA functionality disabled. Won't pause connection: " << GetName();
54
55                 SetHAMode(HARunEverywhere);
56         } else {
57                 SetHAMode(HARunOnce);
58         }
59 }
60
61 void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
62 {
63         DictionaryData nodes;
64
65         for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
66                 size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
67                 double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
68
69                 nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
70                         { "work_queue_items", workQueueItems },
71                         { "work_queue_item_rate", workQueueItemRate }
72                 }));
73
74                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
75                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
76         }
77
78         status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
79 }
80
81 void ElasticsearchWriter::Resume()
82 {
83         ObjectImpl<ElasticsearchWriter>::Resume();
84
85         m_EventPrefix = "icinga2.event.";
86
87         Log(LogInformation, "ElasticsearchWriter")
88                 << "'" << GetName() << "' resumed.";
89
90         m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
91
92         /* Setup timer for periodically flushing m_DataBuffer */
93         m_FlushTimer = new Timer();
94         m_FlushTimer->SetInterval(GetFlushInterval());
95         m_FlushTimer->OnTimerExpired.connect(std::bind(&ElasticsearchWriter::FlushTimeout, this));
96         m_FlushTimer->Start();
97         m_FlushTimer->Reschedule(0);
98
99         /* Register for new metrics. */
100         Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
101         Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
102         Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
103 }
104
105 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
106 void ElasticsearchWriter::Pause()
107 {
108         Flush();
109         m_WorkQueue.Join();
110         Flush();
111
112         Log(LogInformation, "ElasticsearchWriter")
113                 << "'" << GetName() << "' paused.";
114
115         ObjectImpl<ElasticsearchWriter>::Pause();
116 }
117
118 void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
119 {
120         String prefix = "check_result.";
121
122         fields->Set(prefix + "output", cr->GetOutput());
123         fields->Set(prefix + "check_source", cr->GetCheckSource());
124         fields->Set(prefix + "exit_status", cr->GetExitStatus());
125         fields->Set(prefix + "command", cr->GetCommand());
126         fields->Set(prefix + "state", cr->GetState());
127         fields->Set(prefix + "vars_before", cr->GetVarsBefore());
128         fields->Set(prefix + "vars_after", cr->GetVarsAfter());
129
130         fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
131         fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
132         fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
133         fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
134
135         /* Add extra calculated field. */
136         fields->Set(prefix + "latency", cr->CalculateLatency());
137         fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
138
139         if (!GetEnableSendPerfdata())
140                 return;
141
142         Array::Ptr perfdata = cr->GetPerformanceData();
143
144         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
145
146         if (perfdata) {
147                 ObjectLock olock(perfdata);
148                 for (const Value& val : perfdata) {
149                         PerfdataValue::Ptr pdv;
150
151                         if (val.IsObjectType<PerfdataValue>())
152                                 pdv = val;
153                         else {
154                                 try {
155                                         pdv = PerfdataValue::Parse(val);
156                                 } catch (const std::exception&) {
157                                         Log(LogWarning, "ElasticsearchWriter")
158                                                 << "Ignoring invalid perfdata for checkable '"
159                                                 << checkable->GetName() << "' and command '"
160                                                 << checkCommand->GetName() << "' with value: " << val;
161                                         continue;
162                                 }
163                         }
164
165                         String escapedKey = pdv->GetLabel();
166                         boost::replace_all(escapedKey, " ", "_");
167                         boost::replace_all(escapedKey, ".", "_");
168                         boost::replace_all(escapedKey, "\\", "_");
169                         boost::algorithm::replace_all(escapedKey, "::", ".");
170
171                         String perfdataPrefix = prefix + "perfdata." + escapedKey;
172
173                         fields->Set(perfdataPrefix + ".value", pdv->GetValue());
174
175                         if (pdv->GetMin())
176                                 fields->Set(perfdataPrefix + ".min", pdv->GetMin());
177                         if (pdv->GetMax())
178                                 fields->Set(perfdataPrefix + ".max", pdv->GetMax());
179                         if (pdv->GetWarn())
180                                 fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
181                         if (pdv->GetCrit())
182                                 fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
183
184                         if (!pdv->GetUnit().IsEmpty())
185                                 fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
186                 }
187         }
188 }
189
190 void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
191 {
192         if (IsPaused())
193                 return;
194
195         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
196 }
197
198 void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
199 {
200         AssertOnWorkQueue();
201
202         CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
203
204         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
205                 return;
206
207         Host::Ptr host;
208         Service::Ptr service;
209         tie(host, service) = GetHostService(checkable);
210
211         Dictionary::Ptr fields = new Dictionary();
212
213         if (service) {
214                 fields->Set("service", service->GetShortName());
215                 fields->Set("state", service->GetState());
216                 fields->Set("last_state", service->GetLastState());
217                 fields->Set("last_hard_state", service->GetLastHardState());
218         } else {
219                 fields->Set("state", host->GetState());
220                 fields->Set("last_state", host->GetLastState());
221                 fields->Set("last_hard_state", host->GetLastHardState());
222         }
223
224         fields->Set("host", host->GetName());
225         fields->Set("state_type", checkable->GetStateType());
226
227         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
228         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
229
230         fields->Set("reachable", checkable->IsReachable());
231
232         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
233
234         if (commandObj)
235                 fields->Set("check_command", commandObj->GetName());
236
237         double ts = Utility::GetTime();
238
239         if (cr) {
240                 AddCheckResult(fields, checkable, cr);
241                 ts = cr->GetExecutionEnd();
242         }
243
244         Enqueue(checkable, "checkresult", fields, ts);
245 }
246
247 void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
248 {
249         if (IsPaused())
250                 return;
251
252         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
253 }
254
255 void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
256 {
257         AssertOnWorkQueue();
258
259         CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
260
261         Host::Ptr host;
262         Service::Ptr service;
263         tie(host, service) = GetHostService(checkable);
264
265         Dictionary::Ptr fields = new Dictionary();
266
267         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
268         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
269         fields->Set("host", host->GetName());
270
271         if (service) {
272                 fields->Set("service", service->GetShortName());
273                 fields->Set("state", service->GetState());
274                 fields->Set("last_state", service->GetLastState());
275                 fields->Set("last_hard_state", service->GetLastHardState());
276         } else {
277                 fields->Set("state", host->GetState());
278                 fields->Set("last_state", host->GetLastState());
279                 fields->Set("last_hard_state", host->GetLastHardState());
280         }
281
282         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
283
284         if (commandObj)
285                 fields->Set("check_command", commandObj->GetName());
286
287         double ts = Utility::GetTime();
288
289         if (cr) {
290                 AddCheckResult(fields, checkable, cr);
291                 ts = cr->GetExecutionEnd();
292         }
293
294         Enqueue(checkable, "statechange", fields, ts);
295 }
296
297 void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
298         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
299         const CheckResult::Ptr& cr, const String& author, const String& text)
300 {
301         if (IsPaused())
302                 return;
303
304         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
305                 notification, checkable, users, type, cr, author, text));
306 }
307
308 void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
309         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
310         const CheckResult::Ptr& cr, const String& author, const String& text)
311 {
312         AssertOnWorkQueue();
313
314         CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
315
316         Log(LogDebug, "ElasticsearchWriter")
317                 << "Processing notification for '" << checkable->GetName() << "'";
318
319         Host::Ptr host;
320         Service::Ptr service;
321         tie(host, service) = GetHostService(checkable);
322
323         String notificationTypeString = Notification::NotificationTypeToString(type);
324
325         Dictionary::Ptr fields = new Dictionary();
326
327         if (service) {
328                 fields->Set("service", service->GetShortName());
329                 fields->Set("state", service->GetState());
330                 fields->Set("last_state", service->GetLastState());
331                 fields->Set("last_hard_state", service->GetLastHardState());
332         } else {
333                 fields->Set("state", host->GetState());
334                 fields->Set("last_state", host->GetLastState());
335                 fields->Set("last_hard_state", host->GetLastHardState());
336         }
337
338         fields->Set("host", host->GetName());
339
340         ArrayData userNames;
341
342         for (const User::Ptr& user : users) {
343                 userNames.push_back(user->GetName());
344         }
345
346         fields->Set("users", new Array(std::move(userNames)));
347         fields->Set("notification_type", notificationTypeString);
348         fields->Set("author", author);
349         fields->Set("text", text);
350
351         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
352
353         if (commandObj)
354                 fields->Set("check_command", commandObj->GetName());
355
356         double ts = Utility::GetTime();
357
358         if (cr) {
359                 AddCheckResult(fields, checkable, cr);
360                 ts = cr->GetExecutionEnd();
361         }
362
363         Enqueue(checkable, "notification", fields, ts);
364 }
365
366 void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& type,
367         const Dictionary::Ptr& fields, double ts)
368 {
369         /* Atomically buffer the data point. */
370         boost::mutex::scoped_lock lock(m_DataBufferMutex);
371
372         /* Format the timestamps to dynamically select the date datatype inside the index. */
373         fields->Set("@timestamp", FormatTimestamp(ts));
374         fields->Set("timestamp", FormatTimestamp(ts));
375
376         String eventType = m_EventPrefix + type;
377         fields->Set("type", eventType);
378
379         /* Every payload needs a line describing the index.
380          * We do it this way to avoid problems with a near full queue.
381          */
382         String indexBody = "{\"index\": {} }\n";
383         String fieldsBody = JsonEncode(fields);
384
385         Log(LogDebug, "ElasticsearchWriter")
386                 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << fieldsBody << "'.";
387
388         m_DataBuffer.emplace_back(indexBody + fieldsBody);
389
390         /* Flush if we've buffered too much to prevent excessive memory use. */
391         if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
392                 Log(LogDebug, "ElasticsearchWriter")
393                         << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
394                 Flush();
395         }
396 }
397
398 void ElasticsearchWriter::FlushTimeout()
399 {
400         /* Prevent new data points from being added to the array, there is a
401          * race condition where they could disappear.
402          */
403         boost::mutex::scoped_lock lock(m_DataBufferMutex);
404
405         /* Flush if there are any data available. */
406         if (m_DataBuffer.size() > 0) {
407                 Log(LogDebug, "ElasticsearchWriter")
408                         << "Timer expired writing " << m_DataBuffer.size() << " data points";
409                 Flush();
410         }
411 }
412
413 void ElasticsearchWriter::Flush()
414 {
415         /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
416         if (m_DataBuffer.empty())
417                 return;
418
419         /* Ensure you hold a lock against m_DataBuffer so that things
420          * don't go missing after creating the body and clearing the buffer.
421          */
422         String body = boost::algorithm::join(m_DataBuffer, "\n");
423         m_DataBuffer.clear();
424
425         /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
426          * Tested with 6.0.0 and 5.6.4.
427          */
428         body += "\n";
429
430         SendRequest(body);
431 }
432
433 void ElasticsearchWriter::SendRequest(const String& body)
434 {
435         namespace beast = boost::beast;
436         namespace http = beast::http;
437
438         Url::Ptr url = new Url();
439
440         url->SetScheme(GetEnableTls() ? "https" : "http");
441         url->SetHost(GetHost());
442         url->SetPort(GetPort());
443
444         std::vector<String> path;
445
446         /* Specify the index path. Best practice is a daily rotation.
447          * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
448          */
449         path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
450
451         /* ES 6 removes multiple _type mappings: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html
452          * Best practice is to statically define 'doc', as ES 5.X does not allow types starting with '_'.
453          */
454         path.emplace_back("doc");
455
456         /* Use the bulk message format. */
457         path.emplace_back("_bulk");
458
459         url->SetPath(path);
460
461         OptionalTlsStream stream;
462
463         try {
464                 stream = Connect();
465         } catch (const std::exception& ex) {
466                 Log(LogWarning, "ElasticsearchWriter")
467                         << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
468                 return;
469         }
470
471         Defer s ([&stream]() {
472                 if (stream.first) {
473                         stream.first->next_layer().shutdown();
474                 }
475         });
476
477         http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
478
479         request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
480         request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
481
482         /* Specify required headers by Elasticsearch. */
483         request.set(http::field::accept, "application/json");
484
485         /* Use application/x-ndjson for bulk streams. While ES
486          * is able to handle application/json, the newline separator
487          * causes problems with Logstash (#6609).
488          */
489         request.set(http::field::content_type, "application/x-ndjson");
490
491         /* Send authentication if configured. */
492         String username = GetUsername();
493         String password = GetPassword();
494
495         if (!username.IsEmpty() && !password.IsEmpty())
496                 request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
497
498         request.body() = body;
499         request.set(http::field::content_length, request.body().size());
500
501         /* Don't log the request body to debug log, this is already done above. */
502         Log(LogDebug, "ElasticsearchWriter")
503                 << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
504                 << " to '" << url->Format() << "'.";
505
506         try {
507                 if (stream.first) {
508                         http::write(*stream.first, request);
509                         stream.first->flush();
510                 } else {
511                         http::write(*stream.second, request);
512                         stream.second->flush();
513                 }
514         } catch (const std::exception&) {
515                 Log(LogWarning, "ElasticsearchWriter")
516                         << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
517                 throw;
518         }
519
520         http::parser<false, http::string_body> parser;
521         beast::flat_buffer buf;
522
523         try {
524                 if (stream.first) {
525                         http::read(*stream.first, buf, parser);
526                 } else {
527                         http::read(*stream.second, buf, parser);
528                 }
529         } catch (const std::exception& ex) {
530                 Log(LogWarning, "ElasticsearchWriter")
531                         << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
532                 throw;
533         }
534
535         auto& response (parser.get());
536
537         if (response.result_int() > 299) {
538                 if (response.result() == http::status::unauthorized) {
539                         /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
540                         if (!username.IsEmpty() && !password.IsEmpty()) {
541                                 Log(LogCritical, "ElasticsearchWriter")
542                                         << "401 Unauthorized. Please ensure that the user '" << username
543                                         << "' is able to authenticate against the HTTP API/Proxy.";
544                         } else {
545                                 Log(LogCritical, "ElasticsearchWriter")
546                                         << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
547                         }
548
549                         return;
550                 }
551
552                 std::ostringstream msgbuf;
553                 msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
554
555                 auto& contentType (response[http::field::content_type]);
556
557                 if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
558                         msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
559                 }
560
561                 auto& body (response.body());
562
563 #ifdef I2_DEBUG
564                 msgbuf << "; Response body: '" << body << "'";
565 #endif /* I2_DEBUG */
566
567                 Dictionary::Ptr jsonResponse;
568
569                 try {
570                         jsonResponse = JsonDecode(body);
571                 } catch (...) {
572                         Log(LogWarning, "ElasticsearchWriter")
573                                 << "Unable to parse JSON response:\n" << body;
574                         return;
575                 }
576
577                 String error = jsonResponse->Get("error");
578
579                 Log(LogCritical, "ElasticsearchWriter")
580                         << "Error: '" << error << "'. " << msgbuf.str();
581         }
582 }
583
584 OptionalTlsStream ElasticsearchWriter::Connect()
585 {
586         Log(LogNotice, "ElasticsearchWriter")
587                 << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
588
589         OptionalTlsStream stream;
590         bool tls = GetEnableTls();
591
592         if (tls) {
593                 std::shared_ptr<boost::asio::ssl::context> sslContext;
594
595                 try {
596                         sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
597                 } catch (const std::exception&) {
598                         Log(LogWarning, "ElasticsearchWriter")
599                                 << "Unable to create SSL context.";
600                         throw;
601                 }
602
603                 stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
604         } else {
605                 stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
606         }
607
608         try {
609                 icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
610         } catch (const std::exception&) {
611                 Log(LogWarning, "ElasticsearchWriter")
612                         << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
613                 throw;
614         }
615
616         if (tls) {
617                 auto& tlsStream (stream.first->next_layer());
618
619                 try {
620                         tlsStream.handshake(tlsStream.client);
621                 } catch (const std::exception&) {
622                         Log(LogWarning, "ElasticsearchWriter")
623                                 << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
624                         throw;
625                 }
626         }
627
628         return std::move(stream);
629 }
630
631 void ElasticsearchWriter::AssertOnWorkQueue()
632 {
633         ASSERT(m_WorkQueue.IsWorkerThread());
634 }
635
636 void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
637 {
638         Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
639
640         Log(LogDebug, "ElasticsearchWriter")
641                 << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
642 }
643
644 String ElasticsearchWriter::FormatTimestamp(double ts)
645 {
646         /* The date format must match the default dynamic date detection
647          * pattern in indexes. This enables applications like Kibana to
648          * detect a qualified timestamp index for time-series data.
649          *
650          * Example: 2017-09-11T10:56:21.463+0200
651          *
652          * References:
653          * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
654          * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
655          * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
656          */
657         auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
658
659         return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);
660 }