]> granicus.if.org Git - icinga2/blob - lib/perfdata/influxdbwriter.cpp
e1f99e1605d40d96c5f25379d9d9192850d6036c
[icinga2] / lib / perfdata / influxdbwriter.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "perfdata/influxdbwriter.hpp"
21 #include "perfdata/influxdbwriter.tcpp"
22 #include "remote/url.hpp"
23 #include "remote/httprequest.hpp"
24 #include "remote/httpresponse.hpp"
25 #include "icinga/service.hpp"
26 #include "icinga/macroprocessor.hpp"
27 #include "icinga/icingaapplication.hpp"
28 #include "icinga/perfdatavalue.hpp"
29 #include "icinga/checkcommand.hpp"
30 #include "base/tcpsocket.hpp"
31 #include "base/configtype.hpp"
32 #include "base/objectlock.hpp"
33 #include "base/logger.hpp"
34 #include "base/convert.hpp"
35 #include "base/utility.hpp"
36 #include "base/stream.hpp"
37 #include "base/json.hpp"
38 #include "base/networkstream.hpp"
39 #include "base/exception.hpp"
40 #include "base/statsfunction.hpp"
41 #include "base/tlsutility.hpp"
42 #include <boost/algorithm/string.hpp>
43 #include <boost/algorithm/string/classification.hpp>
44 #include <boost/algorithm/string/split.hpp>
45 #include <boost/algorithm/string/replace.hpp>
46 #include <boost/regex.hpp>
47 #include <boost/scoped_array.hpp>
48
using namespace icinga;

/* Registers InfluxdbWriter with the icinga type machinery (REGISTER_TYPE is
 * defined outside this file). */
REGISTER_TYPE(InfluxdbWriter);

/* Hooks InfluxdbWriter::StatsFunc into the status/statistics collection so
 * per-writer queue and buffer sizes are reported. */
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
54
55 //TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
56 InfluxdbWriter::InfluxdbWriter(void)
57     : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
58 { }
59
60 void InfluxdbWriter::OnConfigLoaded(void)
61 {
62         ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
63
64         m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
65 }
66
67 void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
68 {
69         Dictionary::Ptr nodes = new Dictionary();
70
71         for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
72                 size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
73                 size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
74
75                 //TODO: Collect more stats
76                 Dictionary::Ptr stats = new Dictionary();
77                 stats->Set("work_queue_items", workQueueItems);
78                 stats->Set("data_buffer_items", dataBufferItems);
79
80                 nodes->Set(influxdbwriter->GetName(), stats);
81         }
82
83         status->Set("influxdbwriter", nodes);
84 }
85
86 void InfluxdbWriter::Start(bool runtimeCreated)
87 {
88         ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
89
90         Log(LogInformation, "InfluxdbWriter")
91             << "'" << GetName() << "' started.";
92
93         /* Register exception handler for WQ tasks. */
94         m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
95
96         /* Setup timer for periodically flushing m_DataBuffer */
97         m_FlushTimer = new Timer();
98         m_FlushTimer->SetInterval(GetFlushInterval());
99         m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
100         m_FlushTimer->Start();
101         m_FlushTimer->Reschedule(0);
102
103         /* Timer for updating and logging work queue stats */
104         m_StatsLoggerTimer = new Timer();
105         m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
106         m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
107         m_StatsLoggerTimer->Start();
108
109         /* Register for new metrics. */
110         Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
111 }
112
113 void InfluxdbWriter::Stop(bool runtimeRemoved)
114 {
115         Log(LogInformation, "InfluxdbWriter")
116             << "'" << GetName() << "' stopped.";
117
118         m_WorkQueue.Join();
119
120         ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
121 }
122
/**
 * Sanity check that the caller is running on this writer's work queue thread.
 * Used at the top of work queue tasks such as FlushHandler(). Whether ASSERT is
 * compiled in for release builds depends on its definition elsewhere.
 */
void InfluxdbWriter::AssertOnWorkQueue(void)
{
	ASSERT(m_WorkQueue.IsWorkerThread());
}
127
128 void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
129 {
130         Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
131
132         Log(LogDebug, "InfluxdbWriter")
133             << "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
134
135         //TODO: Close the connection, if we keep it open.
136 }
137
138 void InfluxdbWriter::StatsLoggerTimerHandler(void)
139 {
140         int pending = m_WorkQueue.GetLength();
141
142         double now = Utility::GetTime();
143         double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
144         double timeToZero = pending / gradient;
145
146         String timeInfo;
147
148         if (pending > GetTaskCount(5)) {
149                 timeInfo = " empty in ";
150                 if (timeToZero < 0)
151                         timeInfo += "infinite time, your backend isn't able to keep up";
152                 else
153                         timeInfo += Utility::FormatDuration(timeToZero);
154         }
155
156         m_PendingTasks = pending;
157         m_PendingTasksTimestamp = now;
158
159         Log(LogInformation, "InfluxdbWriter")
160             << "Work queue items: " << pending
161             << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
162             << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
163             << timeInfo;
164 }
165
166 Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
167 {
168         socket = new TcpSocket();
169
170         Log(LogNotice, "InfluxdbWriter")
171             << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
172
173         try {
174                 socket->Connect(GetHost(), GetPort());
175         } catch (const std::exception& ex) {
176                 Log(LogWarning, "InfluxdbWriter")
177                     << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
178                 throw ex;
179         }
180
181         if (GetSslEnable()) {
182                 boost::shared_ptr<SSL_CTX> sslContext;
183                 try {
184                         sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
185                 } catch (const std::exception& ex) {
186                         Log(LogWarning, "InfluxdbWriter")
187                             << "Unable to create SSL context.";
188                         throw ex;
189                 }
190
191                 TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
192                 try {
193                         tlsStream->Handshake();
194                 } catch (const std::exception& ex) {
195                         Log(LogWarning, "InfluxdbWriter")
196                             << "TLS handshake with host '" << GetHost() << "' failed.";
197                         throw ex;
198                 }
199
200                 return tlsStream;
201         } else {
202                 return new NetworkStream(socket);
203         }
204 }
205
206 void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
207 {
208         CONTEXT("Processing check result for '" + checkable->GetName() + "'");
209
210         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
211                 return;
212
213         Host::Ptr host;
214         Service::Ptr service;
215         boost::tie(host, service) = GetHostService(checkable);
216
217         MacroProcessor::ResolverList resolvers;
218         if (service)
219                 resolvers.push_back(std::make_pair("service", service));
220         resolvers.push_back(std::make_pair("host", host));
221         resolvers.push_back(std::make_pair("icinga", IcingaApplication::GetInstance()));
222
223         String prefix;
224
225         double ts = cr->GetExecutionEnd();
226
227         // Clone the template and perform an in-place macro expansion of measurement and tag values
228         Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
229         Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->Clone());
230         tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
231
232         Dictionary::Ptr tags = tmpl->Get("tags");
233         if (tags) {
234                 ObjectLock olock(tags);
235                 for (const Dictionary::Pair& pair : tags) {
236                         // Prevent missing macros from warning; will return an empty value
237                         // which will be filtered out in SendMetric()
238                         String missing_macro;
239                         tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro));
240                 }
241         }
242
243         SendPerfdata(tmpl, checkable, cr, ts);
244 }
245
246 String InfluxdbWriter::FormatInteger(int val)
247 {
248         return Convert::ToString(val) + "i";
249 }
250
251 String InfluxdbWriter::FormatBoolean(bool val)
252 {
253         return String(val);
254 }
255
256 void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
257 {
258         Array::Ptr perfdata = cr->GetPerformanceData();
259         if (perfdata) {
260                 ObjectLock olock(perfdata);
261                 for (const Value& val : perfdata) {
262                         PerfdataValue::Ptr pdv;
263
264                         if (val.IsObjectType<PerfdataValue>())
265                                 pdv = val;
266                         else {
267                                 try {
268                                         pdv = PerfdataValue::Parse(val);
269                                 } catch (const std::exception&) {
270                                         Log(LogWarning, "InfluxdbWriter")
271                                             << "Ignoring invalid perfdata value: " << val;
272                                         continue;
273                                 }
274                         }
275
276                         Dictionary::Ptr fields = new Dictionary();
277                         fields->Set(String("value"), pdv->GetValue());
278
279                         if (GetEnableSendThresholds()) {
280                                 if (pdv->GetCrit())
281                                         fields->Set(String("crit"), pdv->GetCrit());
282                                 if (pdv->GetWarn())
283                                         fields->Set(String("warn"), pdv->GetWarn());
284                                 if (pdv->GetMin())
285                                         fields->Set(String("min"), pdv->GetMin());
286                                 if (pdv->GetMax())
287                                         fields->Set(String("max"), pdv->GetMax());
288                         }
289
290                         SendMetric(tmpl, pdv->GetLabel(), fields, ts);
291                 }
292         }
293
294         if (GetEnableSendMetadata()) {
295                 Host::Ptr host;
296                 Service::Ptr service;
297                 boost::tie(host, service) = GetHostService(checkable);
298
299                 Dictionary::Ptr fields = new Dictionary();
300
301                 if (service)
302                         fields->Set(String("state"), FormatInteger(service->GetState()));
303                 else
304                         fields->Set(String("state"), FormatInteger(host->GetState()));
305
306                 fields->Set(String("current_attempt"), FormatInteger(checkable->GetCheckAttempt()));
307                 fields->Set(String("max_check_attempts"), FormatInteger(checkable->GetMaxCheckAttempts()));
308                 fields->Set(String("state_type"), FormatInteger(checkable->GetStateType()));
309                 fields->Set(String("reachable"), FormatBoolean(checkable->IsReachable()));
310                 fields->Set(String("downtime_depth"), FormatInteger(checkable->GetDowntimeDepth()));
311                 fields->Set(String("acknowledgement"), FormatInteger(checkable->GetAcknowledgement()));
312                 fields->Set(String("latency"), cr->CalculateLatency());
313                 fields->Set(String("execution_time"), cr->CalculateExecutionTime());
314
315                 SendMetric(tmpl, String(), fields, ts);
316         }
317 }
318
319 String InfluxdbWriter::EscapeKey(const String& str)
320 {
321         // Iterate over the key name and escape commas and spaces with a backslash
322         String result = str;
323         boost::algorithm::replace_all(result, "\"", "\\\"");
324         boost::algorithm::replace_all(result, "=", "\\=");
325         boost::algorithm::replace_all(result, ",", "\\,");
326         boost::algorithm::replace_all(result, " ", "\\ ");
327
328         // InfluxDB 'feature': although backslashes are allowed in keys they also act
329         // as escape sequences when followed by ',' or ' '.  When your tag is like
330         // 'metric=C:\' bad things happen.  Backslashes themselves cannot be escaped
331         // and through experimentation they also escape '='.  To be safe we replace
332         // trailing backslashes with and underscore.
333         size_t length = result.GetLength();
334         if (result[length - 1] == '\\')
335                 result[length - 1] = '_';
336
337         return result;
338 }
339
340 String InfluxdbWriter::EscapeField(const String& str)
341 {
342         //TODO: Evaluate whether boost::regex is really needed here.
343
344         // Handle integers
345         boost::regex integer("-?\\d+i");
346         if (boost::regex_match(str.GetData(), integer)) {
347                 return str;
348         }
349
350         // Handle numerics
351         boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?");
352         if (boost::regex_match(str.GetData(), numeric)) {
353                 return str;
354         }
355
356         // Handle booleans
357         boost::regex boolean_true("t|true", boost::regex::icase);
358         if (boost::regex_match(str.GetData(), boolean_true))
359                 return "true";
360         boost::regex boolean_false("f|false", boost::regex::icase);
361         if (boost::regex_match(str.GetData(), boolean_false))
362                 return "false";
363
364         // Otherwise it's a string and needs escaping and quoting
365         String result = str;
366         boost::algorithm::replace_all(result, "\"", "\\\"");
367         return "\"" + result + "\"";
368 }
369
370 void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts)
371 {
372         std::ostringstream msgbuf;
373         msgbuf << EscapeKey(tmpl->Get("measurement"));
374
375         Dictionary::Ptr tags = tmpl->Get("tags");
376         if (tags) {
377                 ObjectLock olock(tags);
378                 for (const Dictionary::Pair& pair : tags) {
379                         // Empty macro expansion, no tag
380                         if (!pair.second.IsEmpty()) {
381                                 msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second);
382                         }
383                 }
384         }
385
386         // Label is may be empty in the case of metadata
387         if (!label.IsEmpty())
388                 msgbuf << ",metric=" << EscapeKey(label);
389
390         msgbuf << " ";
391
392         {
393                 bool first = true;
394
395                 ObjectLock fieldLock(fields);
396                 for (const Dictionary::Pair& pair : fields) {
397                         if (first)
398                                 first = false;
399                         else
400                                 msgbuf << ",";
401
402                         msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
403                 }
404         }
405
406         msgbuf << " " <<  static_cast<unsigned long>(ts);
407
408         Log(LogDebug, "InfluxdbWriter")
409             << "Add to metric list: '" << msgbuf.str() << "'.";
410
411         // Atomically buffer the data point
412         boost::mutex::scoped_lock lock(m_DataBufferMutex);
413         m_DataBuffer.push_back(String(msgbuf.str()));
414
415         // Flush if we've buffered too much to prevent excessive memory use
416         if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
417                 Log(LogDebug, "InfluxdbWriter")
418                     << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
419                 Flush();
420         }
421 }
422
423 void InfluxdbWriter::FlushTimeout(void)
424 {
425         // Prevent new data points from being added to the array, there is a
426         // race condition where they could disappear
427         boost::mutex::scoped_lock lock(m_DataBufferMutex);
428
429         // Flush if there are any data available
430         if (m_DataBuffer.size() > 0) {
431                 Log(LogDebug, "InfluxdbWriter")
432                     << "Timer expired writing " << m_DataBuffer.size() << " data points";
433                 Flush();
434         }
435 }
436
437 void InfluxdbWriter::Flush(void)
438 {
439         // Ensure you hold a lock against m_DataBuffer so that things
440         // don't go missing after creating the body and clearing the buffer
441         String body = boost::algorithm::join(m_DataBuffer, "\n");
442         m_DataBuffer.clear();
443
444         // Asynchronously flush the metric body to InfluxDB
445         m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
446 }
447
448 void InfluxdbWriter::FlushHandler(const String& body)
449 {
450         AssertOnWorkQueue();
451
452         TcpSocket::Ptr socket;
453         Stream::Ptr stream = Connect(socket);
454
455         if (!stream)
456                 return;
457
458         IncreaseTaskCount();
459
460         Url::Ptr url = new Url();
461         url->SetScheme(GetSslEnable() ? "https" : "http");
462         url->SetHost(GetHost());
463         url->SetPort(GetPort());
464
465         std::vector<String> path;
466         path.push_back("write");
467         url->SetPath(path);
468
469         url->AddQueryElement("db", GetDatabase());
470         url->AddQueryElement("precision", "s");
471         if (!GetUsername().IsEmpty())
472                 url->AddQueryElement("u", GetUsername());
473         if (!GetPassword().IsEmpty())
474                 url->AddQueryElement("p", GetPassword());
475
476         HttpRequest req(stream);
477         req.RequestMethod = "POST";
478         req.RequestUrl = url;
479
480         try {
481                 req.WriteBody(body.CStr(), body.GetLength());
482                 req.Finish();
483         } catch (const std::exception& ex) {
484                 Log(LogWarning, "InfluxdbWriter")
485                     << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
486                 throw ex;
487         }
488
489         //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
490         HttpResponse resp(stream, req);
491         StreamReadContext context;
492
493         struct timeval timeout = { GetSocketTimeout(), 0 };
494
495         if (!socket->Poll(true, false, &timeout)) {
496                 Log(LogWarning, "InfluxdbWriter")
497                     << "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
498                 return;
499         }
500
501         try {
502                 resp.Parse(context, true);
503         } catch (const std::exception& ex) {
504                 Log(LogWarning, "InfluxdbWriter")
505                     << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
506                 throw ex;
507         }
508
509         if (resp.StatusCode != 204) {
510                 Log(LogWarning, "InfluxdbWriter")
511                     << "Unexpected response code " << resp.StatusCode;
512
513                 // Finish parsing the headers and body
514                 while (!resp.Complete)
515                         resp.Parse(context, true);
516
517                 String contentType = resp.Headers->Get("content-type");
518                 if (contentType != "application/json") {
519                         Log(LogWarning, "InfluxdbWriter")
520                             << "Unexpected Content-Type: " << contentType;
521                         return;
522                 }
523
524                 size_t responseSize = resp.GetBodySize();
525                 boost::scoped_array<char> buffer(new char[responseSize + 1]);
526                 resp.ReadBody(buffer.get(), responseSize);
527                 buffer.get()[responseSize] = '\0';
528
529                 Dictionary::Ptr jsonResponse;
530                 try {
531                         jsonResponse = JsonDecode(buffer.get());
532                 } catch (...) {
533                         Log(LogWarning, "InfluxdbWriter")
534                             << "Unable to parse JSON response:\n" << buffer.get();
535                         return;
536                 }
537
538                 String error = jsonResponse->Get("error");
539
540                 Log(LogCritical, "InfluxdbWriter")
541                     << "InfluxDB error message:\n" << error;
542         }
543 }
544
545 void InfluxdbWriter::IncreaseTaskCount(void)
546 {
547         double now = Utility::GetTime();
548
549         boost::mutex::scoped_lock lock(m_StatsMutex);
550         m_TaskStats.InsertValue(now, 1);
551 }
552
553 int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
554 {
555         boost::mutex::scoped_lock lock(m_StatsMutex);
556         return m_TaskStats.GetValues(span);
557 }
558
559 void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
560 {
561         ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
562
563         String measurement = value->Get("measurement");
564         if (!MacroProcessor::ValidateMacroString(measurement))
565                 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("host_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
566
567         Dictionary::Ptr tags = value->Get("tags");
568         if (tags) {
569                 ObjectLock olock(tags);
570                 for (const Dictionary::Pair& pair : tags) {
571                         if (!MacroProcessor::ValidateMacroString(pair.second))
572                                 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("host_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));
573                 }
574         }
575 }
576
577 void InfluxdbWriter::ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
578 {
579         ObjectImpl<InfluxdbWriter>::ValidateServiceTemplate(value, utils);
580
581         String measurement = value->Get("measurement");
582         if (!MacroProcessor::ValidateMacroString(measurement))
583                 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("service_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
584
585         Dictionary::Ptr tags = value->Get("tags");
586         if (tags) {
587                 ObjectLock olock(tags);
588                 for (const Dictionary::Pair& pair : tags) {
589                         if (!MacroProcessor::ValidateMacroString(pair.second))
590                                 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("service_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));
591                 }
592         }
593 }
594