]> granicus.if.org Git - icinga2/blob - lib/perfdata/elasticsearchwriter.cpp
Elasticsearch: Improve error handling/logging
[icinga2] / lib / perfdata / elasticsearchwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/elasticsearchwriter.hpp"
4 #include "perfdata/elasticsearchwriter-ti.cpp"
5 #include "remote/url.hpp"
6 #include "remote/httprequest.hpp"
7 #include "remote/httpresponse.hpp"
8 #include "icinga/compatutility.hpp"
9 #include "icinga/service.hpp"
10 #include "icinga/checkcommand.hpp"
11 #include "base/defer.hpp"
12 #include "base/tcpsocket.hpp"
13 #include "base/stream.hpp"
14 #include "base/base64.hpp"
15 #include "base/json.hpp"
16 #include "base/utility.hpp"
17 #include "base/networkstream.hpp"
18 #include "base/perfdatavalue.hpp"
19 #include "base/exception.hpp"
20 #include "base/statsfunction.hpp"
21 #include <boost/algorithm/string.hpp>
22 #include <boost/scoped_array.hpp>
23 #include <utility>
24
25 using namespace icinga;
26
27 REGISTER_TYPE(ElasticsearchWriter);
28
29 REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
30
31 void ElasticsearchWriter::OnConfigLoaded()
32 {
33         ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
34
35         m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
36
37         if (!GetEnableHa()) {
38                 Log(LogDebug, "ElasticsearchWriter")
39                         << "HA functionality disabled. Won't pause connection: " << GetName();
40
41                 SetHAMode(HARunEverywhere);
42         } else {
43                 SetHAMode(HARunOnce);
44         }
45 }
46
47 void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
48 {
49         DictionaryData nodes;
50
51         for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
52                 size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
53                 double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
54
55                 nodes.emplace_back(elasticsearchwriter->GetName(), new Dictionary({
56                         { "work_queue_items", workQueueItems },
57                         { "work_queue_item_rate", workQueueItemRate }
58                 }));
59
60                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
61                 perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
62         }
63
64         status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
65 }
66
67 void ElasticsearchWriter::Resume()
68 {
69         ObjectImpl<ElasticsearchWriter>::Resume();
70
71         m_EventPrefix = "icinga2.event.";
72
73         Log(LogInformation, "ElasticsearchWriter")
74                 << "'" << GetName() << "' resumed.";
75
76         m_WorkQueue.SetExceptionCallback(std::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
77
78         /* Setup timer for periodically flushing m_DataBuffer */
79         m_FlushTimer = new Timer();
80         m_FlushTimer->SetInterval(GetFlushInterval());
81         m_FlushTimer->OnTimerExpired.connect(std::bind(&ElasticsearchWriter::FlushTimeout, this));
82         m_FlushTimer->Start();
83         m_FlushTimer->Reschedule(0);
84
85         /* Register for new metrics. */
86         Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
87         Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
88         Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
89 }
90
91 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
92 void ElasticsearchWriter::Pause()
93 {
94         Flush();
95         m_WorkQueue.Join();
96         Flush();
97
98         Log(LogInformation, "ElasticsearchWriter")
99                 << "'" << GetName() << "' paused.";
100
101         ObjectImpl<ElasticsearchWriter>::Pause();
102 }
103
104 void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
105 {
106         String prefix = "check_result.";
107
108         fields->Set(prefix + "output", cr->GetOutput());
109         fields->Set(prefix + "check_source", cr->GetCheckSource());
110         fields->Set(prefix + "exit_status", cr->GetExitStatus());
111         fields->Set(prefix + "command", cr->GetCommand());
112         fields->Set(prefix + "state", cr->GetState());
113         fields->Set(prefix + "vars_before", cr->GetVarsBefore());
114         fields->Set(prefix + "vars_after", cr->GetVarsAfter());
115
116         fields->Set(prefix + "execution_start", FormatTimestamp(cr->GetExecutionStart()));
117         fields->Set(prefix + "execution_end", FormatTimestamp(cr->GetExecutionEnd()));
118         fields->Set(prefix + "schedule_start", FormatTimestamp(cr->GetScheduleStart()));
119         fields->Set(prefix + "schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
120
121         /* Add extra calculated field. */
122         fields->Set(prefix + "latency", cr->CalculateLatency());
123         fields->Set(prefix + "execution_time", cr->CalculateExecutionTime());
124
125         if (!GetEnableSendPerfdata())
126                 return;
127
128         Array::Ptr perfdata = cr->GetPerformanceData();
129
130         if (perfdata) {
131                 ObjectLock olock(perfdata);
132                 for (const Value& val : perfdata) {
133                         PerfdataValue::Ptr pdv;
134
135                         if (val.IsObjectType<PerfdataValue>())
136                                 pdv = val;
137                         else {
138                                 try {
139                                         pdv = PerfdataValue::Parse(val);
140                                 } catch (const std::exception&) {
141                                         Log(LogWarning, "ElasticsearchWriter")
142                                                 << "Ignoring invalid perfdata value: '" << val << "' for object '"
143                                                 << checkable->GetName() << "'.";
144                                         continue;
145                                 }
146                         }
147
148                         String escapedKey = pdv->GetLabel();
149                         boost::replace_all(escapedKey, " ", "_");
150                         boost::replace_all(escapedKey, ".", "_");
151                         boost::replace_all(escapedKey, "\\", "_");
152                         boost::algorithm::replace_all(escapedKey, "::", ".");
153
154                         String perfdataPrefix = prefix + "perfdata." + escapedKey;
155
156                         fields->Set(perfdataPrefix + ".value", pdv->GetValue());
157
158                         if (pdv->GetMin())
159                                 fields->Set(perfdataPrefix + ".min", pdv->GetMin());
160                         if (pdv->GetMax())
161                                 fields->Set(perfdataPrefix + ".max", pdv->GetMax());
162                         if (pdv->GetWarn())
163                                 fields->Set(perfdataPrefix + ".warn", pdv->GetWarn());
164                         if (pdv->GetCrit())
165                                 fields->Set(perfdataPrefix + ".crit", pdv->GetCrit());
166
167                         if (!pdv->GetUnit().IsEmpty())
168                                 fields->Set(perfdataPrefix + ".unit", pdv->GetUnit());
169                 }
170         }
171 }
172
173 void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
174 {
175         if (IsPaused())
176                 return;
177
178         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
179 }
180
181 void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
182 {
183         AssertOnWorkQueue();
184
185         CONTEXT("Elasticwriter processing check result for '" + checkable->GetName() + "'");
186
187         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
188                 return;
189
190         Host::Ptr host;
191         Service::Ptr service;
192         tie(host, service) = GetHostService(checkable);
193
194         Dictionary::Ptr fields = new Dictionary();
195
196         if (service) {
197                 fields->Set("service", service->GetShortName());
198                 fields->Set("state", service->GetState());
199                 fields->Set("last_state", service->GetLastState());
200                 fields->Set("last_hard_state", service->GetLastHardState());
201         } else {
202                 fields->Set("state", host->GetState());
203                 fields->Set("last_state", host->GetLastState());
204                 fields->Set("last_hard_state", host->GetLastHardState());
205         }
206
207         fields->Set("host", host->GetName());
208         fields->Set("state_type", checkable->GetStateType());
209
210         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
211         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
212
213         fields->Set("reachable", checkable->IsReachable());
214
215         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
216
217         if (commandObj)
218                 fields->Set("check_command", commandObj->GetName());
219
220         double ts = Utility::GetTime();
221
222         if (cr) {
223                 AddCheckResult(fields, checkable, cr);
224                 ts = cr->GetExecutionEnd();
225         }
226
227         Enqueue("checkresult", fields, ts);
228 }
229
230 void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
231 {
232         if (IsPaused())
233                 return;
234
235         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
236 }
237
238 void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
239 {
240         AssertOnWorkQueue();
241
242         CONTEXT("Elasticwriter processing state change '" + checkable->GetName() + "'");
243
244         Host::Ptr host;
245         Service::Ptr service;
246         tie(host, service) = GetHostService(checkable);
247
248         Dictionary::Ptr fields = new Dictionary();
249
250         fields->Set("current_check_attempt", checkable->GetCheckAttempt());
251         fields->Set("max_check_attempts", checkable->GetMaxCheckAttempts());
252         fields->Set("host", host->GetName());
253
254         if (service) {
255                 fields->Set("service", service->GetShortName());
256                 fields->Set("state", service->GetState());
257                 fields->Set("last_state", service->GetLastState());
258                 fields->Set("last_hard_state", service->GetLastHardState());
259         } else {
260                 fields->Set("state", host->GetState());
261                 fields->Set("last_state", host->GetLastState());
262                 fields->Set("last_hard_state", host->GetLastHardState());
263         }
264
265         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
266
267         if (commandObj)
268                 fields->Set("check_command", commandObj->GetName());
269
270         double ts = Utility::GetTime();
271
272         if (cr) {
273                 AddCheckResult(fields, checkable, cr);
274                 ts = cr->GetExecutionEnd();
275         }
276
277         Enqueue("statechange", fields, ts);
278 }
279
280 void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
281         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
282         const CheckResult::Ptr& cr, const String& author, const String& text)
283 {
284         if (IsPaused())
285                 return;
286
287         m_WorkQueue.Enqueue(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
288                 notification, checkable, users, type, cr, author, text));
289 }
290
291 void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
292         const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
293         const CheckResult::Ptr& cr, const String& author, const String& text)
294 {
295         AssertOnWorkQueue();
296
297         CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
298
299         Log(LogDebug, "ElasticsearchWriter")
300                 << "Processing notification for '" << checkable->GetName() << "'";
301
302         Host::Ptr host;
303         Service::Ptr service;
304         tie(host, service) = GetHostService(checkable);
305
306         String notificationTypeString = Notification::NotificationTypeToString(type);
307
308         Dictionary::Ptr fields = new Dictionary();
309
310         if (service) {
311                 fields->Set("service", service->GetShortName());
312                 fields->Set("state", service->GetState());
313                 fields->Set("last_state", service->GetLastState());
314                 fields->Set("last_hard_state", service->GetLastHardState());
315         } else {
316                 fields->Set("state", host->GetState());
317                 fields->Set("last_state", host->GetLastState());
318                 fields->Set("last_hard_state", host->GetLastHardState());
319         }
320
321         fields->Set("host", host->GetName());
322
323         ArrayData userNames;
324
325         for (const User::Ptr& user : users) {
326                 userNames.push_back(user->GetName());
327         }
328
329         fields->Set("users", new Array(std::move(userNames)));
330         fields->Set("notification_type", notificationTypeString);
331         fields->Set("author", author);
332         fields->Set("text", text);
333
334         CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
335
336         if (commandObj)
337                 fields->Set("check_command", commandObj->GetName());
338
339         double ts = Utility::GetTime();
340
341         if (cr) {
342                 AddCheckResult(fields, checkable, cr);
343                 ts = cr->GetExecutionEnd();
344         }
345
346         Enqueue("notification", fields, ts);
347 }
348
349 void ElasticsearchWriter::Enqueue(const String& type, const Dictionary::Ptr& fields, double ts)
350 {
351         /* Atomically buffer the data point. */
352         boost::mutex::scoped_lock lock(m_DataBufferMutex);
353
354         /* Format the timestamps to dynamically select the date datatype inside the index. */
355         fields->Set("@timestamp", FormatTimestamp(ts));
356         fields->Set("timestamp", FormatTimestamp(ts));
357
358         String eventType = m_EventPrefix + type;
359         fields->Set("type", eventType);
360
361         /* Every payload needs a line describing the index.
362          * We do it this way to avoid problems with a near full queue.
363          */
364         String indexBody = "{\"index\": {} }\n";
365         String fieldsBody = JsonEncode(fields);
366
367         Log(LogDebug, "ElasticsearchWriter")
368                 << "Add to fields to message list: '" << fieldsBody << "'.";
369
370         m_DataBuffer.emplace_back(indexBody + fieldsBody);
371
372         /* Flush if we've buffered too much to prevent excessive memory use. */
373         if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
374                 Log(LogDebug, "ElasticsearchWriter")
375                         << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
376                 Flush();
377         }
378 }
379
380 void ElasticsearchWriter::FlushTimeout()
381 {
382         /* Prevent new data points from being added to the array, there is a
383          * race condition where they could disappear.
384          */
385         boost::mutex::scoped_lock lock(m_DataBufferMutex);
386
387         /* Flush if there are any data available. */
388         if (m_DataBuffer.size() > 0) {
389                 Log(LogDebug, "ElasticsearchWriter")
390                         << "Timer expired writing " << m_DataBuffer.size() << " data points";
391                 Flush();
392         }
393 }
394
395 void ElasticsearchWriter::Flush()
396 {
397         /* Ensure you hold a lock against m_DataBuffer so that things
398          * don't go missing after creating the body and clearing the buffer.
399          */
400         String body = boost::algorithm::join(m_DataBuffer, "\n");
401         m_DataBuffer.clear();
402
403         /* Elasticsearch 6.x requires a new line. This is compatible to 5.x.
404          * Tested with 6.0.0 and 5.6.4.
405          */
406         body += "\n";
407
408         SendRequest(body);
409 }
410
411 void ElasticsearchWriter::SendRequest(const String& body)
412 {
413         Url::Ptr url = new Url();
414
415         url->SetScheme(GetEnableTls() ? "https" : "http");
416         url->SetHost(GetHost());
417         url->SetPort(GetPort());
418
419         std::vector<String> path;
420
421         /* Specify the index path. Best practice is a daily rotation.
422          * Example: http://localhost:9200/icinga2-2017.09.11?pretty=1
423          */
424         path.emplace_back(GetIndex() + "-" + Utility::FormatDateTime("%Y.%m.%d", Utility::GetTime()));
425
426         /* ES 6 removes multiple _type mappings: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/removal-of-types.html
427          * Best practice is to statically define 'doc', as ES 5.X does not allow types starting with '_'.
428          */
429         path.emplace_back("doc");
430
431         /* Use the bulk message format. */
432         path.emplace_back("_bulk");
433
434         url->SetPath(path);
435
436         Stream::Ptr stream;
437
438         try {
439                 stream = Connect();
440         } catch (const std::exception& ex) {
441                 Log(LogWarning, "ElasticsearchWriter")
442                         << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
443                 return;
444         }
445
446         if (!stream)
447                 return;
448
449         Defer close ([&stream]() { stream->Close(); });
450
451         HttpRequest req(stream);
452
453         /* Specify required headers by Elasticsearch. */
454         req.AddHeader("Accept", "application/json");
455         req.AddHeader("Content-Type", "application/json");
456
457         /* Send authentication if configured. */
458         String username = GetUsername();
459         String password = GetPassword();
460
461         if (!username.IsEmpty() && !password.IsEmpty())
462                 req.AddHeader("Authorization", "Basic " + Base64::Encode(username + ":" + password));
463
464         req.RequestMethod = "POST";
465         req.RequestUrl = url;
466
467         /* Don't log the request body to debug log, this is already done above. */
468         Log(LogDebug, "ElasticsearchWriter")
469                 << "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
470                 << " to '" << url->Format() << "'.";
471
472         try {
473                 req.WriteBody(body.CStr(), body.GetLength());
474                 req.Finish();
475         } catch (const std::exception& ex) {
476                 Log(LogWarning, "ElasticsearchWriter")
477                         << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
478                 throw ex;
479         }
480
481         HttpResponse resp(stream, req);
482         StreamReadContext context;
483
484         try {
485                 resp.Parse(context, true);
486                 while (resp.Parse(context, true) && !resp.Complete)
487                         ; /* Do nothing */
488         } catch (const std::exception& ex) {
489                 Log(LogWarning, "ElasticsearchWriter")
490                         << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
491                 throw ex;
492         }
493
494         if (!resp.Complete) {
495                 Log(LogWarning, "ElasticsearchWriter")
496                         << "Failed to read a complete HTTP response from the Elasticsearch server.";
497                 return;
498         }
499
500         if (resp.StatusCode > 299) {
501                 if (resp.StatusCode == 401) {
502                         /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
503                         if (!username.IsEmpty() && !password.IsEmpty()) {
504                                 Log(LogCritical, "ElasticsearchWriter")
505                                         << "401 Unauthorized. Please ensure that the user '" << username
506                                         << "' is able to authenticate against the HTTP API/Proxy.";
507                         } else {
508                                 Log(LogCritical, "ElasticsearchWriter")
509                                         << "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
510                         }
511
512                         return;
513                 }
514
515                 std::ostringstream msgbuf;
516                 msgbuf << "Unexpected response code " << resp.StatusCode << " from URL '" << req.RequestUrl->Format() << "'";
517
518                 String contentType = resp.Headers->Get("content-type");
519
520                 if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
521                         msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
522                 }
523
524                 size_t responseSize = resp.GetBodySize();
525                 boost::scoped_array<char> buffer(new char[responseSize + 1]);
526                 resp.ReadBody(buffer.get(), responseSize);
527                 buffer.get()[responseSize] = '\0';
528
529 #ifdef I2_DEBUG
530                 msgbuf << "; Response body: '" << buffer.get() << "'";
531 #endif /* I2_DEBUG */
532
533                 /* {"statusCode":404,"error":"Not Found","message":"Not Found"} */
534                 Dictionary::Ptr jsonResponse;
535                 try {
536                         jsonResponse = JsonDecode(buffer.get());
537                 } catch (...) {
538                         Log(LogWarning, "ElasticsearchWriter")
539                                 << "Unable to parse JSON response:\n" << buffer.get();
540                         return;
541                 }
542
543                 String error = jsonResponse->Get("error");
544
545                 Log(LogCritical, "ElasticsearchWriter")
546                         << "Error: '" << error << "'. " << msgbuf.str();
547
548                 return;
549         }
550 }
551
552 Stream::Ptr ElasticsearchWriter::Connect()
553 {
554         TcpSocket::Ptr socket = new TcpSocket();
555
556         Log(LogNotice, "ElasticsearchWriter")
557                 << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
558
559         try {
560                 socket->Connect(GetHost(), GetPort());
561         } catch (const std::exception& ex) {
562                 Log(LogWarning, "ElasticsearchWriter")
563                         << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
564                 throw ex;
565         }
566
567         if (GetEnableTls()) {
568                 std::shared_ptr<SSL_CTX> sslContext;
569
570                 try {
571                         sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
572                 } catch (const std::exception& ex) {
573                         Log(LogWarning, "ElasticsearchWriter")
574                                 << "Unable to create SSL context.";
575                         throw ex;
576                 }
577
578                 TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
579
580                 try {
581                         tlsStream->Handshake();
582                 } catch (const std::exception& ex) {
583                         Log(LogWarning, "ElasticsearchWriter")
584                                 << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
585                         throw ex;
586                 }
587
588                 return tlsStream;
589         } else {
590                 return new NetworkStream(socket);
591         }
592 }
593
594 void ElasticsearchWriter::AssertOnWorkQueue()
595 {
596         ASSERT(m_WorkQueue.IsWorkerThread());
597 }
598
599 void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
600 {
601         Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
602
603         Log(LogDebug, "ElasticsearchWriter")
604                 << "Exception during Elasticsearch operation: " << DiagnosticInformation(std::move(exp));
605 }
606
607 String ElasticsearchWriter::FormatTimestamp(double ts)
608 {
609         /* The date format must match the default dynamic date detection
610          * pattern in indexes. This enables applications like Kibana to
611          * detect a qualified timestamp index for time-series data.
612          *
613          * Example: 2017-09-11T10:56:21.463+0200
614          *
615          * References:
616          * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
617          * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
618          * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
619          */
620         auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
621
622         return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);
623 }