1 /******************************************************************************
3 * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "perfdata/influxdbwriter.hpp"
21 #include "perfdata/influxdbwriter.tcpp"
22 #include "remote/url.hpp"
23 #include "remote/httprequest.hpp"
24 #include "remote/httpresponse.hpp"
25 #include "icinga/service.hpp"
26 #include "icinga/macroprocessor.hpp"
27 #include "icinga/icingaapplication.hpp"
28 #include "icinga/checkcommand.hpp"
29 #include "base/tcpsocket.hpp"
30 #include "base/configtype.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/logger.hpp"
33 #include "base/convert.hpp"
34 #include "base/utility.hpp"
35 #include "base/perfdatavalue.hpp"
36 #include "base/stream.hpp"
37 #include "base/json.hpp"
38 #include "base/networkstream.hpp"
39 #include "base/exception.hpp"
40 #include "base/statsfunction.hpp"
41 #include "base/tlsutility.hpp"
42 #include <boost/algorithm/string.hpp>
43 #include <boost/algorithm/string/classification.hpp>
44 #include <boost/algorithm/string/split.hpp>
45 #include <boost/algorithm/string/replace.hpp>
46 #include <boost/regex.hpp>
47 #include <boost/scoped_array.hpp>
// Bring the icinga namespace into scope for every definition below.
49 using namespace icinga;
// Macro invocations naming InfluxdbWriter and its static StatsFunc.
// Presumably these register the config type and its statistics callback with
// the framework; the macro definitions live in other headers -- TODO confirm.
51 REGISTER_TYPE(InfluxdbWriter);
53 REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc);
// Default constructor.
// Initialises the work queue with the arguments (10000000, 1), the task
// statistics container with 15 * 60, and zeroes the pending-task counter and
// timestamp that StatsLoggerTimerHandler() uses as its previous sample.
// NOTE(review): the two work queue arguments look like a maximum item count
// and a thread count -- confirm against the WorkQueue constructor.
55 //TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
56 InfluxdbWriter::InfluxdbWriter(void)
57 : m_WorkQueue(10000000, 1), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
// Runs the base-class OnConfigLoaded() and then names the work queue
// "InfluxdbWriter, <object name>".
60 void InfluxdbWriter::OnConfigLoaded(void)
62 ObjectImpl<InfluxdbWriter>::OnConfigLoaded();
64 m_WorkQueue.SetName("InfluxdbWriter, " + GetName());
// Statistics callback.
// For every InfluxdbWriter object returned by ConfigType::GetObjectsByType it
// records the current work queue length and data buffer size in a dictionary
// keyed by the object name, then stores the collection under the
// "influxdbwriter" key of `status`.
//
// status: dictionary that receives the "influxdbwriter" entry.
// The second (unnamed) Array::Ptr parameter is not used.
67 void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
69 Dictionary::Ptr nodes = new Dictionary();
71 for (const InfluxdbWriter::Ptr& influxdbwriter : ConfigType::GetObjectsByType<InfluxdbWriter>()) {
72 size_t workQueueItems = influxdbwriter->m_WorkQueue.GetLength();
// NOTE(review): SendMetric() and FlushTimeout() take m_DataBufferMutex before
// touching m_DataBuffer; no lock is taken around this read -- confirm that an
// unsynchronised size() is acceptable here.
73 size_t dataBufferItems = influxdbwriter->m_DataBuffer.size();
75 //TODO: Collect more stats
76 Dictionary::Ptr stats = new Dictionary();
77 stats->Set("work_queue_items", workQueueItems);
78 stats->Set("data_buffer_items", dataBufferItems);
80 nodes->Set(influxdbwriter->GetName(), stats);
83 status->Set("influxdbwriter", nodes);
// Starts the writer.
// Calls the base-class Start(), logs the start, installs ExceptionHandler() as
// the work queue's exception callback, creates and starts the flush timer and
// the statistics logger timer, and finally connects CheckResultHandler() to
// Service::OnNewCheckResult.
//
// runtimeCreated: forwarded unchanged to the base-class Start().
86 void InfluxdbWriter::Start(bool runtimeCreated)
88 ObjectImpl<InfluxdbWriter>::Start(runtimeCreated);
90 Log(LogInformation, "InfluxdbWriter")
91 << "'" << GetName() << "' started.";
93 /* Register exception handler for WQ tasks. */
94 m_WorkQueue.SetExceptionCallback(boost::bind(&InfluxdbWriter::ExceptionHandler, this, _1));
96 /* Setup timer for periodically flushing m_DataBuffer */
97 m_FlushTimer = new Timer();
98 m_FlushTimer->SetInterval(GetFlushInterval());
99 m_FlushTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::FlushTimeout, this));
100 m_FlushTimer->Start();
// NOTE(review): Reschedule(0) presumably makes the first flush fire right
// away instead of after one full interval -- confirm against Timer.
101 m_FlushTimer->Reschedule(0);
103 /* Timer for updating and logging work queue stats */
104 m_StatsLoggerTimer = new Timer();
105 m_StatsLoggerTimer->SetInterval(60); // don't be too noisy
106 m_StatsLoggerTimer->OnTimerExpired.connect(boost::bind(&InfluxdbWriter::StatsLoggerTimerHandler, this));
107 m_StatsLoggerTimer->Start();
109 /* Register for new metrics. */
110 Service::OnNewCheckResult.connect(boost::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
// Stops the writer: logs the stop and calls the base-class Stop().
//
// runtimeRemoved: forwarded unchanged to the base-class Stop().
113 void InfluxdbWriter::Stop(bool runtimeRemoved)
115 Log(LogInformation, "InfluxdbWriter")
116 << "'" << GetName() << "' stopped.";
120 ObjectImpl<InfluxdbWriter>::Stop(runtimeRemoved);
// Asserts that the calling thread is a worker thread of m_WorkQueue.
123 void InfluxdbWriter::AssertOnWorkQueue(void)
125 ASSERT(m_WorkQueue.IsWorkerThread());
// Exception callback for work queue tasks (installed in Start()).
// Logs a fixed critical message and, at debug level, the diagnostic
// information for the exception.
//
// exp: the exception captured from the failed task.
128 void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
130 Log(LogCritical, "InfluxdbWriter", "Exception during InfluxDB operation: Verify that your backend is operational!");
132 Log(LogDebug, "InfluxdbWriter")
133 << "Exception during InfluxDB operation: " << DiagnosticInformation(exp);
135 //TODO: Close the connection, if we keep it open.
// Timer callback (connected in Start(), interval 60) that logs work queue
// statistics.
// It samples the queue length, derives the rate of change since the previous
// sample, estimates the time until the queue is empty, stores the new sample,
// and logs the queue length together with task rates over 1, 5 and 15 minutes.
138 void InfluxdbWriter::StatsLoggerTimerHandler(void)
140 int pending = m_WorkQueue.GetLength();
142 double now = Utility::GetTime();
// Change in queue length per unit of time since the previous sample. The
// constructor seeds both m_PendingTasks and m_PendingTasksTimestamp with 0,
// so the first call divides by the full value of `now`.
143 double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
// NOTE(review): when the queue length has not changed, gradient is 0 and this
// is a floating-point division by zero (inf, or NaN when pending is also 0) --
// confirm that the consumers of timeToZero tolerate that.
144 double timeToZero = pending / gradient;
// The time estimate is only built when more items are pending than
// GetTaskCount(5) reports.
148 if (pending > GetTaskCount(5)) {
149 timeInfo = " empty in ";
151 timeInfo += "infinite time, your backend isn't able to keep up";
153 timeInfo += Utility::FormatDuration(timeToZero);
// Remember this sample for the next invocation.
156 m_PendingTasks = pending;
157 m_PendingTasksTimestamp = now;
159 Log(LogInformation, "InfluxdbWriter")
160 << "Work queue items: " << pending
161 << ", rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s"
162 << " (" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
// Opens a new connection to the configured InfluxDB host and port.
//
// socket: output parameter; overwritten with a freshly created TcpSocket that
//         is then connected to GetHost()/GetPort().
//
// When GetSslEnable() is set, an SSL context is built from the configured
// certificate, key and CA certificate, and a client-role TLS handshake is
// performed over the socket. Otherwise the socket is wrapped in a
// NetworkStream and returned.
// Connection, SSL context and handshake failures (std::exception) are each
// logged as warnings.
166 Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
168 socket = new TcpSocket();
170 Log(LogNotice, "InfluxdbWriter")
171 << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
174 socket->Connect(GetHost(), GetPort());
175 } catch (const std::exception& ex) {
176 Log(LogWarning, "InfluxdbWriter")
177 << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
181 if (GetSslEnable()) {
182 boost::shared_ptr<SSL_CTX> sslContext;
184 sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
185 } catch (const std::exception& ex) {
186 Log(LogWarning, "InfluxdbWriter")
187 << "Unable to create SSL context.";
// The TLS stream is created in the client role for the configured host name.
191 TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
193 tlsStream->Handshake();
194 } catch (const std::exception& ex) {
195 Log(LogWarning, "InfluxdbWriter")
196 << "TLS handshake with host '" << GetHost() << "' failed.";
// Plain (non-TLS) stream over the connected socket.
202 return new NetworkStream(socket);
// Handler for Service::OnNewCheckResult (connected in Start()).
// It tests whether performance data is enabled globally and for the checkable,
// builds macro resolvers for "service", "host" and "icinga", clones the
// service or host template, expands the macros in its measurement and tag
// values, and passes the result to SendPerfdata().
//
// checkable: the object the check result belongs to.
// cr:        the check result; its execution end time becomes the timestamp.
206 void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
208 CONTEXT("Processing check result for '" + checkable->GetName() + "'");
210 if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
214 Service::Ptr service;
215 boost::tie(host, service) = GetHostService(checkable);
217 MacroProcessor::ResolverList resolvers;
219 resolvers.push_back(std::make_pair("service", service));
220 resolvers.push_back(std::make_pair("host", host));
221 resolvers.push_back(std::make_pair("icinga", IcingaApplication::GetInstance()));
// Timestamp of the data points: the end of the check execution.
225 double ts = cr->GetExecutionEnd();
227 // Clone the template and perform an in-place macro expansion of measurement and tag values
// The service template is used when `service` is set, the host template
// otherwise.
228 Dictionary::Ptr tmpl_clean = service ? GetServiceTemplate() : GetHostTemplate();
229 Dictionary::Ptr tmpl = static_pointer_cast<Dictionary>(tmpl_clean->Clone());
230 tmpl->Set("measurement", MacroProcessor::ResolveMacros(tmpl->Get("measurement"), resolvers, cr));
232 Dictionary::Ptr tags = tmpl->Get("tags");
234 ObjectLock olock(tags);
235 for (const Dictionary::Pair& pair : tags) {
236 // Prevent missing macros from warning; will return an empty value
237 // which will be filtered out in SendMetric()
238 String missing_macro;
239 tags->Set(pair.first, MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missing_macro));
243 SendPerfdata(tmpl, checkable, cr, ts);
// Formats an integer as a field value: the decimal representation followed by
// an "i" suffix. This is the form EscapeField() recognises with its integer
// pattern.
//
// val: the integer to format.
// Returns the formatted string, e.g. "3i" for 3.
246 String InfluxdbWriter::FormatInteger(int val)
248 return Convert::ToString(val) + "i";
// Formats a boolean as a String field value; SendPerfdata() uses it for the
// "reachable" metadata field.
//
// val: the boolean to format.
251 String InfluxdbWriter::FormatBoolean(bool val)
// Converts a check result into data points and hands them to SendMetric().
//
// One data point is produced per performance data value, carrying a "value"
// field and, when GetEnableSendThresholds() is set, threshold fields ("crit",
// "warn", "min", "max"). When GetEnableSendMetadata() is set, one additional
// data point with an empty label carries the check metadata.
//
// tmpl:      macro-expanded template (measurement and tags).
// checkable: the checked object, source of the metadata fields.
// cr:        the check result supplying perfdata, latency and execution time.
// ts:        timestamp applied to every data point.
256 void InfluxdbWriter::SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts)
258 Array::Ptr perfdata = cr->GetPerformanceData();
260 ObjectLock olock(perfdata);
261 for (const Value& val : perfdata) {
262 PerfdataValue::Ptr pdv;
// Entries may already be PerfdataValue objects; other values go through
// PerfdataValue::Parse(), and a parse failure is logged as a warning.
264 if (val.IsObjectType<PerfdataValue>())
268 pdv = PerfdataValue::Parse(val);
269 } catch (const std::exception&) {
270 Log(LogWarning, "InfluxdbWriter")
271 << "Ignoring invalid perfdata value: " << val;
276 Dictionary::Ptr fields = new Dictionary();
277 fields->Set(String("value"), pdv->GetValue());
279 if (GetEnableSendThresholds()) {
281 fields->Set(String("crit"), pdv->GetCrit());
283 fields->Set(String("warn"), pdv->GetWarn());
285 fields->Set(String("min"), pdv->GetMin());
287 fields->Set(String("max"), pdv->GetMax());
// The perfdata label becomes the "metric" tag in SendMetric().
290 SendMetric(tmpl, pdv->GetLabel(), fields, ts);
294 if (GetEnableSendMetadata()) {
296 Service::Ptr service;
297 boost::tie(host, service) = GetHostService(checkable);
299 Dictionary::Ptr fields = new Dictionary();
// "state" is taken from either the service or the host.
302 fields->Set(String("state"), FormatInteger(service->GetState()));
304 fields->Set(String("state"), FormatInteger(host->GetState()));
306 fields->Set(String("current_attempt"), FormatInteger(checkable->GetCheckAttempt()));
307 fields->Set(String("max_check_attempts"), FormatInteger(checkable->GetMaxCheckAttempts()));
308 fields->Set(String("state_type"), FormatInteger(checkable->GetStateType()));
309 fields->Set(String("reachable"), FormatBoolean(checkable->IsReachable()));
310 fields->Set(String("downtime_depth"), FormatInteger(checkable->GetDowntimeDepth()));
311 fields->Set(String("acknowledgement"), FormatInteger(checkable->GetAcknowledgement()));
312 fields->Set(String("latency"), cr->CalculateLatency());
313 fields->Set(String("execution_time"), cr->CalculateExecutionTime());
// Metadata is sent with an empty label, so no "metric" tag is emitted for it.
315 SendMetric(tmpl, String(), fields, ts);
// Escapes a measurement name, tag key, tag value or field key for the data
// point line built in SendMetric().
//
// str: the raw key or tag text.
319 String InfluxdbWriter::EscapeKey(const String& str)
321 // Escape double quotes, equals signs, commas and spaces with a backslash
323 boost::algorithm::replace_all(result, "\"", "\\\"");
324 boost::algorithm::replace_all(result, "=", "\\=");
325 boost::algorithm::replace_all(result, ",", "\\,");
326 boost::algorithm::replace_all(result, " ", "\\ ");
328 // InfluxDB 'feature': although backslashes are allowed in keys they also act
329 // as escape sequences when followed by ',' or ' '. When your tag is like
330 // 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
331 // and through experimentation they also escape '='. To be safe we replace
332 // trailing backslashes with an underscore.
// NOTE(review): length is a size_t, so for an empty string `length - 1` wraps
// around and the index below is out of range. SendMetric() skips empty tag
// values and empty labels, but the measurement name and the tag/field keys are
// passed in unchecked -- confirm that an empty key can never reach this point.
333 size_t length = result.GetLength();
334 if (result[length - 1] == '\\')
335 result[length - 1] = '_';
// Prepares a field value for the data point line built in SendMetric().
// The value is classified by matching the whole string against, in order: an
// integer with "i" suffix, a decimal/scientific number, a boolean true and a
// boolean false (both case-insensitive). Anything else is treated as a string:
// embedded double quotes are backslash-escaped and the result is wrapped in
// double quotes.
//
// str: the raw field value.
340 String InfluxdbWriter::EscapeField(const String& str)
342 //TODO: Evaluate whether boost::regex is really needed here.
// Each pattern below is constructed anew on every call.
// Integer: optional minus sign, digits, trailing 'i' (see FormatInteger()).
345 boost::regex integer("-?\\d+i");
346 if (boost::regex_match(str.GetData(), integer)) {
// Numeric: optional minus sign, digits, optional fraction, optional exponent.
351 boost::regex numeric("-?\\d+(\\.\\d+)?((e|E)[+-]?\\d+)?");
352 if (boost::regex_match(str.GetData(), numeric)) {
// Booleans: "t"/"true" and "f"/"false", matched case-insensitively.
357 boost::regex boolean_true("t|true", boost::regex::icase);
358 if (boost::regex_match(str.GetData(), boolean_true))
360 boost::regex boolean_false("f|false", boost::regex::icase);
361 if (boost::regex_match(str.GetData(), boolean_false))
364 // Otherwise it's a string and needs escaping and quoting
366 boost::algorithm::replace_all(result, "\"", "\\\"");
367 return "\"" + result + "\"";
// Serialises one data point and appends it to m_DataBuffer.
// The line is built as: escaped measurement, then ",key=value" for each
// non-empty tag, then ",metric=<label>" when a label is given, then the
// "key=value" fields, then the timestamp truncated to an unsigned long.
// The buffer is modified under m_DataBufferMutex, and its size is compared
// with GetFlushThreshold() afterwards.
//
// tmpl:   template holding "measurement" and "tags".
// label:  perfdata label; empty for metadata points.
// fields: field names and values for the data point.
// ts:     timestamp of the data point.
370 void InfluxdbWriter::SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts)
372 std::ostringstream msgbuf;
373 msgbuf << EscapeKey(tmpl->Get("measurement"));
375 Dictionary::Ptr tags = tmpl->Get("tags");
377 ObjectLock olock(tags);
378 for (const Dictionary::Pair& pair : tags) {
379 // Empty macro expansion, no tag
380 if (!pair.second.IsEmpty()) {
381 msgbuf << "," << EscapeKey(pair.first) << "=" << EscapeKey(pair.second);
386 // Label may be empty in the case of metadata
387 if (!label.IsEmpty())
388 msgbuf << ",metric=" << EscapeKey(label);
395 ObjectLock fieldLock(fields);
396 for (const Dictionary::Pair& pair : fields) {
402 msgbuf << EscapeKey(pair.first) << "=" << EscapeField(pair.second);
// The fractional part of the timestamp is dropped by the cast; FlushHandler()
// requests precision "s" in the write URL.
406 msgbuf << " " << static_cast<unsigned long>(ts);
408 Log(LogDebug, "InfluxdbWriter")
409 << "Add to metric list: '" << msgbuf.str() << "'.";
411 // Atomically buffer the data point
412 boost::mutex::scoped_lock lock(m_DataBufferMutex);
413 m_DataBuffer.push_back(String(msgbuf.str()));
415 // Flush if we've buffered too much to prevent excessive memory use
416 if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
417 Log(LogDebug, "InfluxdbWriter")
418 << "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
// Flush timer callback (connected in Start()).
// Takes m_DataBufferMutex and, when the buffer holds at least one data point,
// logs how many points are about to be written.
423 void InfluxdbWriter::FlushTimeout(void)
425 // Prevent new data points from being added to the array, there is a
426 // race condition where they could disappear
427 boost::mutex::scoped_lock lock(m_DataBufferMutex);
429 // Flush if there are any data available
430 if (m_DataBuffer.size() > 0) {
431 Log(LogDebug, "InfluxdbWriter")
432 << "Timer expired writing " << m_DataBuffer.size() << " data points";
// Joins all buffered data points into one newline-separated body, empties the
// buffer, and enqueues FlushHandler() with that body on the work queue.
// This function does not lock m_DataBuffer itself; the caller is expected to
// hold the lock, as the comment below states.
437 void InfluxdbWriter::Flush(void)
439 // Ensure you hold a lock against m_DataBuffer so that things
440 // don't go missing after creating the body and clearing the buffer
441 String body = boost::algorithm::join(m_DataBuffer, "\n");
442 m_DataBuffer.clear();
444 // Asynchronously flush the metric body to InfluxDB
// `body` is bound by value, so the queued task carries its own copy.
445 m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body));
// Work queue task (enqueued by Flush()) that posts one batch of data points
// to InfluxDB over HTTP and inspects the response.
//
// Steps: open a connection with Connect(); build the write URL (scheme from
// GetSslEnable(), host, port, "write" path, "db" and "precision" query
// elements, plus "u"/"p" when a username/password is configured); POST the
// body; wait for the response up to GetSocketTimeout() seconds; parse it. Any
// status other than 204 is logged, and the response body is then read and
// decoded as JSON so that its "error" entry can be logged.
//
// body: newline-separated data points to send.
448 void InfluxdbWriter::FlushHandler(const String& body)
452 TcpSocket::Ptr socket;
453 Stream::Ptr stream = Connect(socket);
460 Url::Ptr url = new Url();
461 url->SetScheme(GetSslEnable() ? "https" : "http");
462 url->SetHost(GetHost());
463 url->SetPort(GetPort());
465 std::vector<String> path;
466 path.push_back("write");
469 url->AddQueryElement("db", GetDatabase());
// Timestamps are sent with second precision (see the cast in SendMetric()).
470 url->AddQueryElement("precision", "s");
// Credentials, when configured, are passed as URL query elements.
471 if (!GetUsername().IsEmpty())
472 url->AddQueryElement("u", GetUsername());
473 if (!GetPassword().IsEmpty())
474 url->AddQueryElement("p", GetPassword());
476 HttpRequest req(stream);
477 req.RequestMethod = "POST";
478 req.RequestUrl = url;
481 req.WriteBody(body.CStr(), body.GetLength());
483 } catch (const std::exception& ex) {
484 Log(LogWarning, "InfluxdbWriter")
485 << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
489 //TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
490 HttpResponse resp(stream, req);
491 StreamReadContext context;
// Wait for the socket to become readable for at most GetSocketTimeout()
// seconds (tv_usec is 0).
493 struct timeval timeout = { GetSocketTimeout(), 0 };
495 if (!socket->Poll(true, false, &timeout)) {
496 Log(LogWarning, "InfluxdbWriter")
497 << "Response timeout of TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
502 resp.Parse(context, true);
503 } catch (const std::exception& ex) {
504 Log(LogWarning, "InfluxdbWriter")
505 << "Cannot read from TCP socket from host '" << GetHost() << "' port '" << GetPort() << "'.";
// 204 is the only status code not reported as unexpected.
509 if (resp.StatusCode != 204) {
510 Log(LogWarning, "InfluxdbWriter")
511 << "Unexpected response code " << resp.StatusCode;
513 // Finish parsing the headers and body
514 while (!resp.Complete)
515 resp.Parse(context, true);
517 String contentType = resp.Headers->Get("content-type");
518 if (contentType != "application/json") {
519 Log(LogWarning, "InfluxdbWriter")
520 << "Unexpected Content-Type: " << contentType;
// Copy the response body into a NUL-terminated buffer so that it can be
// passed to JsonDecode() and logged as a C string.
524 size_t responseSize = resp.GetBodySize();
525 boost::scoped_array<char> buffer(new char[responseSize + 1]);
526 resp.ReadBody(buffer.get(), responseSize);
527 buffer.get()[responseSize] = '\0';
529 Dictionary::Ptr jsonResponse;
531 jsonResponse = JsonDecode(buffer.get());
533 Log(LogWarning, "InfluxdbWriter")
534 << "Unable to parse JSON response:\n" << buffer.get();
538 String error = jsonResponse->Get("error");
540 Log(LogCritical, "InfluxdbWriter")
541 << "InfluxDB error message:\n" << error;
// Records one task at the current time in m_TaskStats, under m_StatsMutex.
545 void InfluxdbWriter::IncreaseTaskCount(void)
547 double now = Utility::GetTime();
549 boost::mutex::scoped_lock lock(m_StatsMutex);
550 m_TaskStats.InsertValue(now, 1);
// Returns m_TaskStats.GetValues(span), read under m_StatsMutex.
//
// span: the span passed through to GetValues(); StatsLoggerTimerHandler()
//       calls this with 5, 60, 60 * 5 and 60 * 15.
553 int InfluxdbWriter::GetTaskCount(RingBuffer::SizeType span) const
555 boost::mutex::scoped_lock lock(m_StatsMutex);
556 return m_TaskStats.GetValues(span);
// Validates the host_template attribute.
// After the base-class validation, the "measurement" string and every value
// in "tags" are checked with MacroProcessor::ValidateMacroString(); a failed
// check throws a ValidationError naming the offending attribute path.
//
// value: the host template dictionary ("measurement", "tags").
// utils: forwarded to the base-class validator.
559 void InfluxdbWriter::ValidateHostTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
561 ObjectImpl<InfluxdbWriter>::ValidateHostTemplate(value, utils);
563 String measurement = value->Get("measurement");
564 if (!MacroProcessor::ValidateMacroString(measurement))
565 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("host_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
567 Dictionary::Ptr tags = value->Get("tags");
569 ObjectLock olock(tags);
570 for (const Dictionary::Pair& pair : tags) {
571 if (!MacroProcessor::ValidateMacroString(pair.second))
// NOTE(review): unlike the measurement message above, this message does not
// append the closing "'." after the quoted value.
572 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("host_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));
// Validates the service_template attribute.
// After the base-class validation, the "measurement" string and every value
// in "tags" are checked with MacroProcessor::ValidateMacroString(); a failed
// check throws a ValidationError naming the offending attribute path.
//
// value: the service template dictionary ("measurement", "tags").
// utils: forwarded to the base-class validator.
577 void InfluxdbWriter::ValidateServiceTemplate(const Dictionary::Ptr& value, const ValidationUtils& utils)
579 ObjectImpl<InfluxdbWriter>::ValidateServiceTemplate(value, utils);
581 String measurement = value->Get("measurement");
582 if (!MacroProcessor::ValidateMacroString(measurement))
583 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of("service_template")("measurement"), "Closing $ not found in macro format string '" + measurement + "'."));
585 Dictionary::Ptr tags = value->Get("tags");
587 ObjectLock olock(tags);
588 for (const Dictionary::Pair& pair : tags) {
589 if (!MacroProcessor::ValidateMacroString(pair.second))
// NOTE(review): unlike the measurement message above, this message does not
// append the closing "'." after the quoted value.
590 BOOST_THROW_EXCEPTION(ValidationError(this, boost::assign::list_of<String>("service_template")("tags")(pair.first), "Closing $ not found in macro format string '" + pair.second));