]> granicus.if.org Git - icinga2/blob - lib/perfdata/influxdbwriter.cpp
Remove unused includes
[icinga2] / lib / perfdata / influxdbwriter.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "perfdata/influxdbwriter.hpp"
21 #include "perfdata/influxdbwriter-ti.cpp"
22 #include "remote/url.hpp"
23 #include "remote/httprequest.hpp"
24 #include "remote/httpresponse.hpp"
25 #include "icinga/service.hpp"
26 #include "icinga/macroprocessor.hpp"
27 #include "icinga/icingaapplication.hpp"
28 #include "icinga/checkcommand.hpp"
29 #include "base/tcpsocket.hpp"
30 #include "base/configtype.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/logger.hpp"
33 #include "base/convert.hpp"
34 #include "base/utility.hpp"
35 #include "base/perfdatavalue.hpp"
36 #include "base/stream.hpp"
37 #include "base/json.hpp"
38 #include "base/networkstream.hpp"
39 #include "base/exception.hpp"
40 #include "base/statsfunction.hpp"
41 #include "base/tlsutility.hpp"
42 #include <boost/algorithm/string.hpp>
43 #include <boost/algorithm/string/replace.hpp>
44 #include <boost/math/special_functions/fpclassify.hpp>
45 #include <boost/regex.hpp>
46 #include <boost/scoped_array.hpp>
47 #include <utility>
48
49 using namespace icinga;
50
51 class InfluxdbInteger final : public Object
52 {
53 public:
54         DECLARE_PTR_TYPEDEFS(InfluxdbInteger);
55
56         InfluxdbInteger(int value)
57                 : m_Value(value)
58         { }
59
60         int GetValue() const
61         {
62                 return m_Value;
63         }
64
65 private:
66         int m_Value;
67 };
68
/* Type registration for InfluxdbWriter (macro defined outside this file). */
REGISTER_TYPE(InfluxdbWriter);

/* Hooks InfluxdbWriter::StatsFunc up as the stats function for this type. */
REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
72
73 void InfluxdbWriter::OnConfigLoaded()
74 {
75         ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
76
77         m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
78 }
79
80 void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
81 {
82         DictionaryData nodes;
83
84         for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
85                 size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
86                 double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
87                 size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
88
89                 nodes.emplace_back(influxdbwriter->GetName(), new Dictionary({
90                         { "work_queue_items", workQueueItems },
91                         { "work_queue_item_rate", workQueueItemRate },
92                         { "data_buffer_items", dataBufferItems }
93                 }));
94
95                 perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems));
96                 perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
97                 perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems));
98         }
99
100         status->Set("influxdbwriter", new Dictionary(std::move(nodes)));
101 }
102
103 void InfluxdbWriter::Start(bool runtimeCreated)
104 {
105         ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
106
107         Log(LogInformation, "InfluxdbWriter")
108                 << "'" << GetName() << "' started.";
109
110         /* Register exception handler for WQ tasks. */
111         m_WorkQueue.SetExceptionCallback(std::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
112
113         /* Setup timer for periodically flushing m_DataBuffer */
114         m_FlushTimer = new Timer();
115         m_FlushTimer->SetInterval(GetFlushInterval());
116         m_FlushTimer->OnTimerExpired.connect(std::bind(&InfluxdbWriter::FlushTimeout, this));
117         m_FlushTimer->Start();
118         m_FlushTimer->Reschedule(0);
119
120         /* Register for new metrics. */
121         Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
122 }
123
124 void InfluxdbWriter::Stop(bool runtimeRemoved)
125 {
126         Log(LogInformation, "InfluxdbWriter")
127                 << "'" << GetName() << "' stopped.";
128
129         m_WorkQueue.Join();
130
131         ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
132 }
133
/**
 * Asserts that the caller is running on one of m_WorkQueue's worker threads.
 */
void InfluxdbWriter::AssertOnWorkQueue()
{
        ASSERT(m_WorkQueue.IsWorkerThread());
}
138
139 void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
140 {
141         Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
142
143         Log(LogDebug, "InfluxdbWriter")
144                 << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
145
146         //TODO: Close the connection, if we keep it open.
147 }
148
149 Stream::Ptr InfluxdbWriter::Connect()
150 {
151         TcpSocket::Ptr socket = new TcpSocket();
152
153         Log(LogNotice, "InfluxdbWriter")
154                 << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
155
156         try {
157                 socket->Connect(GetHost(), GetPort());
158         } catch (const std::exception& ex) {
159                 Log(LogWarning, "InfluxdbWriter")
160                         << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
161                 throw ex;
162         }
163
164         if (GetSslEnable()) {
165                 std::shared_ptr<SSL_CTX> sslContext;
166                 try {
167                         sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
168                 } catch (const std::exception& ex) {
169                         Log(LogWarning, "InfluxdbWriter")
170                                 << "Unable to create SSL context.";
171                         throw ex;
172                 }
173
174                 TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
175                 try {
176                         tlsStream->Handshake();
177                 } catch (const std::exception& ex) {
178                         Log(LogWarning, "InfluxdbWriter")
179                                 << "TLS handshake with host '" << GetHost() << "' failed.";
180                         throw ex;
181                 }
182
183                 return tlsStream;
184         } else {
185                 return new NetworkStream(socket);
186         }
187 }
188
189 void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
190 {
191         m_WorkQueue.Enqueue(std::bind(&InfluxdbWriter::CheckResultHandlerWQ, this, checkable, cr), PriorityLow);
192 }
193
194 void InfluxdbWriter::CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
195 {
196         AssertOnWorkQueue();
197
198         CONTEXT("Processing check result for '" + checkable->GetName() + "'");
199
200         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
201                 return;
202
203         Host::Ptr host;
204         Service::Ptr service;
205         tie(host, service) = GetHostService(checkable);
206
207         MacroProcessor::ResolverList resolvers;
208         if (service)
209                 resolvers.emplace_back("service", service);
210         resolvers.emplace_back("host", host);
211         resolvers.emplace_back("icinga", IcingaApplication::GetInstance());
212
213         String prefix;
214
215         double ts = cr->GetExecutionEnd();
216
217         // Clone the template and perform an in-place macro expansion of measurement and tag values
218         Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
219         Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->Clone());
220         tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
221
222         Dictionary::Ptr tags = tmpl->Get("tags");
223         if (tags) {
224                 ObjectLock olock(tags);
225                 for (const Dictionary::Pair& pair : tags) {
226                         String missing_macro;
227                         Value value = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro);
228
229                         if (!missing_macro.IsEmpty())
230                                 continue;
231
232                         tags->Set(pair.first, value);
233                 }
234         }
235
236         Array::Ptr perfdata = cr->GetPerformanceData();
237         if (perfdata) {
238                 ObjectLock olock(perfdata);
239                 for (const Value& val : perfdata) {
240                         PerfdataValue::Ptr pdv;
241
242                         if (val.IsObjectType<PerfdataValue>())
243                                 pdv = val;
244                         else {
245                                 try {
246                                         pdv = PerfdataValue::Parse(val);
247                                 } catch (const std::exception&) {
248                                         Log(LogWarning, "InfluxdbWriter")
249                                                 << "Ignoring invalid perfdata value: " << val;
250                                         continue;
251                                 }
252                         }
253
254                         Dictionary::Ptr fields = new Dictionary();
255                         fields->Set("value", pdv->GetValue());
256
257                         if (GetEnableSendThresholds()) {
258                                 if (pdv->GetCrit())
259                                         fields->Set("crit", pdv->GetCrit());
260                                 if (pdv->GetWarn())
261                                         fields->Set("warn", pdv->GetWarn());
262                                 if (pdv->GetMin())
263                                         fields->Set("min", pdv->GetMin());
264                                 if (pdv->GetMax())
265                                         fields->Set("max", pdv->GetMax());
266                         }
267                         if (!pdv->GetUnit().IsEmpty()) {
268                                 fields->Set("unit", pdv->GetUnit());
269                         }
270
271                         SendMetric(tmpl, pdv->GetLabel(), fields, ts);
272                 }
273         }
274
275         if (GetEnableSendMetadata()) {
276                 Host::Ptr host;
277                 Service::Ptr service;
278                 tie(host, service) = GetHostService(checkable);
279
280                 Dictionary::Ptr fields = new Dictionary();
281
282                 if (service)
283                         fields->Set("state", new InfluxdbInteger(service->GetState()));
284                 else
285                         fields->Set("state", new InfluxdbInteger(host->GetState()));
286
287                 fields->Set("current_attempt", new InfluxdbInteger(checkable->GetCheckAttempt()));
288                 fields->Set("max_check_attempts", new InfluxdbInteger(checkable->GetMaxCheckAttempts()));
289                 fields->Set("state_type", new InfluxdbInteger(checkable->GetStateType()));
290                 fields->Set("reachable", checkable->IsReachable());
291                 fields->Set("downtime_depth", new InfluxdbInteger(checkable->GetDowntimeDepth()));
292                 fields->Set("acknowledgement", new InfluxdbInteger(checkable->GetAcknowledgement()));
293                 fields->Set("latency", cr->CalculateLatency());
294                 fields->Set("execution_time", cr->CalculateExecutionTime());
295
296                 SendMetric(tmpl, Empty, fields, ts);
297         }
298 }
299
300 String InfluxdbWriter::EscapeKeyOrTagValue(const String& str)
301 {
302         // Iterate over the key name and escape commas and spaces with a backslash
303         String result = str;
304         boost::algorithm::replace_all(result, "\"", "\\\"");
305         boost::algorithm::replace_all(result, "=", "\\=");
306         boost::algorithm::replace_all(result, ",", "\\,");
307         boost::algorithm::replace_all(result, " ", "\\ ");
308         return result;
309 }
310
311 String InfluxdbWriter::EscapeValue(const Value& value)
312 {
313         if (value.IsObjectType<InfluxdbInteger>()) {
314                 std::ostringstream os;
315                 os << static_cast<InfluxdbInteger::Ptr>(value)->GetValue() << "i";
316                 return os.str();
317         }
318
319         if (value.IsBoolean())
320                 return value ? "true" : "false";
321
322         if (value.IsString())
323                 return "\"" + EscapeKeyOrTagValue(value) + "\"";
324
325         return value;
326 }
327
328 void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts)
329 {
330         std::ostringstream msgbuf;
331         msgbuf << EscapeKeyOrTagValue(tmpl->Get("measurement"));
332
333         Dictionary::Ptr tags = tmpl->Get("tags");
334         if (tags) {
335                 ObjectLock olock(tags);
336                 for (const Dictionary::Pair& pair : tags) {
337                         // Empty macro expansion, no tag
338                         if (!pair.second.IsEmpty()) {
339                                 msgbuf << "," << EscapeKeyOrTagValue(pair.first) << "=" << EscapeKeyOrTagValue(pair.second);
340                         }
341                 }
342         }
343
344         // Label may be empty in the case of metadata
345         if (!label.IsEmpty())
346                 msgbuf << ",metric=" << EscapeKeyOrTagValue(label);
347
348         msgbuf << " ";
349
350         {
351                 bool first = true;
352
353                 ObjectLock fieldLock(fields);
354                 for (const Dictionary::Pair& pair : fields) {
355                         if (first)
356                                 first = false;
357                         else
358                                 msgbuf << ",";
359
360                         msgbuf << EscapeKeyOrTagValue(pair.first) << "=" << EscapeValue(pair.second);
361                 }
362         }
363
364         msgbuf << " " <<  static_cast<unsigned long>(ts);
365
366 #ifdef I2_DEBUG
367         Log(LogDebug, "InfluxdbWriter")
368                 << "Add to metric list: '" << msgbuf.str() << "'.";
369 #endif /* I2_DEBUG */
370
371         // Buffer the data point
372         m_DataBuffer.emplace_back(msgbuf.str());
373
374         // Flush if we've buffered too much to prevent excessive memory use
375         if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
376                 Log(LogDebug, "InfluxdbWriter")
377                         << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
378
379                 try {
380                         Flush();
381                 } catch (...) {
382                         /* Do nothing. */
383                 }
384         }
385 }
386
387 void InfluxdbWriter::FlushTimeout()
388 {
389         m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushTimeoutWQ, this), PriorityHigh);
390 }
391
392 void InfluxdbWriter::FlushTimeoutWQ()
393 {
394         AssertOnWorkQueue();
395
396         // Flush if there are any data available
397         if (m_DataBuffer.empty())
398                 return;
399
400         Log(LogDebug, "InfluxdbWriter")
401                 << "Timer expired writing " << m_DataBuffer.size() << " data points";
402
403         Flush();
404 }
405
406 void InfluxdbWriter::Flush()
407 {
408         String body = boost::algorithm::join(m_DataBuffer, "\n");
409         m_DataBuffer.clear();
410
411         Stream::Ptr stream = Connect();
412
413         if (!stream)
414                 return;
415
416         Url::Ptr url = new Url();
417         url->SetScheme(GetSslEnable() ? "https" : "http");
418         url->SetHost(GetHost());
419         url->SetPort(GetPort());
420
421         std::vector<String> path;
422         path.emplace_back("write");
423         url->SetPath(path);
424
425         url->AddQueryElement("db", GetDatabase());
426         url->AddQueryElement("precision", "s");
427         if (!GetUsername().IsEmpty())
428                 url->AddQueryElement("u", GetUsername());
429         if (!GetPassword().IsEmpty())
430                 url->AddQueryElement("p", GetPassword());
431
432         HttpRequest req(stream);
433         req.RequestMethod = "POST";
434         req.RequestUrl = url;
435
436         try {
437                 req.WriteBody(body.CStr(), body.GetLength());
438                 req.Finish();
439         } catch (const std::exception& ex) {
440                 Log(LogWarning, "InfluxdbWriter")
441                         << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
442                 throw ex;
443         }
444
445         HttpResponse resp(stream, req);
446         StreamReadContext context;
447
448         try {
449                 while (resp.Parse(context, true) && !resp.Complete)
450                         ; /* Do nothing */
451         } catch (const std::exception& ex) {
452                 Log(LogWarning, "InfluxdbWriter")
453                         << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
454                 throw ex;
455         }
456
457         if (!resp.Complete) {
458                 Log(LogWarning, "InfluxdbWriter")
459                         << "Failed to read a complete HTTP response from the InfluxDB server.";
460                 return;
461         }
462
463         if (resp.StatusCode != 204) {
464                 Log(LogWarning, "InfluxdbWriter")
465                         << "Unexpected response code: " << resp.StatusCode;
466
467                 String contentType = resp.Headers->Get("content-type");
468                 if (contentType != "application/json") {
469                         Log(LogWarning, "InfluxdbWriter")
470                                 << "Unexpected Content-Type: " << contentType;
471                         return;
472                 }
473
474                 size_t responseSize = resp.GetBodySize();
475                 boost::scoped_array<char> buffer(new char[responseSize + 1]);
476                 resp.ReadBody(buffer.get(), responseSize);
477                 buffer.get()[responseSize] = '\0';
478
479                 Dictionary::Ptr jsonResponse;
480                 try {
481                         jsonResponse = JsonDecode(buffer.get());
482                 } catch (...) {
483                         Log(LogWarning, "InfluxdbWriter")
484                                 << "Unable to parse JSON response:\n" << buffer.get();
485                         return;
486                 }
487
488                 String error = jsonResponse->Get("error");
489
490                 Log(LogCritical, "InfluxdbWriter")
491                         << "InfluxDB error message:\n" << error;
492
493                 return;
494         }
495 }
496
497 void InfluxdbWriter::ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
498 {
499         ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(lvalue, utils);
500
501         String measurement = lvalue()->Get("measurement");
502         if (!MacroProcessor::ValidateMacroString(measurement))
503                 BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
504
505         Dictionary::Ptr tags = lvalue()->Get("tags");
506         if (tags) {
507                 ObjectLock olock(tags);
508                 for (const Dictionary::Pair& pair : tags) {
509                         if (!MacroProcessor::ValidateMacroString(pair.second))
510                                 BOOST_THROW_EXCEPTION(ValidationError(this, { "host_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
511                 }
512         }
513 }
514
515 void InfluxdbWriter::ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
516 {
517         ObjectImpl<InfluxdbWriter>::ValidateServiceTemplate(lvalue, utils);
518
519         String measurement = lvalue()->Get("measurement");
520         if (!MacroProcessor::ValidateMacroString(measurement))
521                 BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "measurement" }, "Closing $ not found in macro format string '" + measurement + "'."));
522
523         Dictionary::Ptr tags = lvalue()->Get("tags");
524         if (tags) {
525                 ObjectLock olock(tags);
526                 for (const Dictionary::Pair& pair : tags) {
527                         if (!MacroProcessor::ValidateMacroString(pair.second))
528                                 BOOST_THROW_EXCEPTION(ValidationError(this, { "service_template", "tags", pair.first }, "Closing $ not found in macro format string '" + pair.second));
529                 }
530         }
531 }
532