1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "perfdata/opentsdbwriter.hpp"
4 #include "perfdata/opentsdbwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/checkcommand.hpp"
7 #include "icinga/macroprocessor.hpp"
8 #include "icinga/icingaapplication.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/convert.hpp"
15 #include "base/utility.hpp"
16 #include "base/perfdatavalue.hpp"
17 #include "base/application.hpp"
18 #include "base/stream.hpp"
19 #include "base/networkstream.hpp"
20 #include "base/exception.hpp"
21 #include "base/statsfunction.hpp"
22 #include <boost/algorithm/string.hpp>
23 #include <boost/algorithm/string/replace.hpp>
25 using namespace icinga;
/* Macro registrations (the macros themselves are defined elsewhere): the
 * OpenTsdbWriter type, and OpenTsdbWriter::StatsFunc as its stats function. */
27 REGISTER_TYPE(OpenTsdbWriter);
29 REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
/*
 * Runs after the config object has been loaded. It first defers to the
 * generated ObjectImpl base class. It then logs at debug level and switches
 * the HA mode to HARunEverywhere.
 *
 * NOTE(review): the log text says HA functionality is disabled, so the code
 * below is presumably guarded by an HA-enable check. That check line (and
 * any else branch) is not visible in this view; confirm against the full file.
 */
31 void OpenTsdbWriter::OnConfigLoaded()
33 ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
36 Log(LogDebug, "OpenTsdbWriter")
37 << "HA functionality disabled. Won't pause connection: " << GetName();
/* Per the log message above, this instance should not be paused by HA. */
39 SetHAMode(HARunEverywhere);
/*
 * Stats callback (see the REGISTER_STATSFUNCTION line). It stores an
 * "opentsdbwriter" entry in `status` containing one (name, 1) pair per
 * OpenTsdbWriter object. The Array parameter is unused.
 *
 * NOTE(review): the declaration of `nodes` is not visible in this view.
 * From its use it is a pair container that the Dictionary constructor
 * accepts; confirm.
 */
45 void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
49 for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
50 nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
53 status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
56 void OpenTsdbWriter::Resume()
58 ObjectImpl<OpenTsdbWriter>::Resume();
60 Log(LogInformation, "OpentsdbWriter")
61 << "'" << GetName() << "' resumed.";
63 m_ReconnectTimer = new Timer();
64 m_ReconnectTimer->SetInterval(10);
65 m_ReconnectTimer->OnTimerExpired.connect(std::bind(&OpenTsdbWriter::ReconnectTimerHandler, this));
66 m_ReconnectTimer->Start();
67 m_ReconnectTimer->Reschedule(0);
69 Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
72 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
73 void OpenTsdbWriter::Pause()
75 m_ReconnectTimer.reset();
77 Log(LogInformation, "OpentsdbWriter")
78 << "'" << GetName() << "' paused.";
80 ObjectImpl<OpenTsdbWriter>::Pause();
/*
 * Timer callback bound in Resume(). It opens a TCP connection to the
 * configured host/port and stores a NetworkStream over it in m_Stream,
 * which SendMetric() writes to.
 *
 * NOTE(review): the lines between the signature and the socket creation are
 * missing from this view, as are the `try {` and whatever follows the
 * critical log in the catch handler.
 * - If the catch handler does not return, m_Stream would be assigned a stream
 *   over a socket that failed to connect.
 * - Confirm whether an existing m_Stream short-circuits this handler. As
 *   visible, every timer tick would replace the current connection.
 */
83 void OpenTsdbWriter::ReconnectTimerHandler()
91 TcpSocket::Ptr socket = new TcpSocket();
93 Log(LogNotice, "OpenTsdbWriter")
94 << "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
97 socket->Connect(GetHost(), GetPort());
98 } catch (std::exception&) {
99 Log(LogCritical, "OpenTsdbWriter")
100 << "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
/* Only reached with a usable socket if the catch handler above returns (not visible here). */
104 m_Stream = new NetworkStream(socket);
/*
 * Handler connected to Service::OnNewCheckResult in Resume(). For every check
 * result it sends, all timestamped with the check's execution end:
 *  - state metrics under "icinga.service.<escaped short name>" (services) or
 *    "icinga.host" (hosts), tagged with the host only;
 *  - the performance data, via SendPerfdata(), under the same metric prefix;
 *  - check metrics under "icinga.check", additionally tagged with "type" and,
 *    for services, "service".
 *
 * NOTE(review): several short lines are missing from this view: the
 * declarations of `host` and `metric`, the if/else lines around the
 * host/service branches, and the early return after the perfdata check.
 * Pause() also does not disconnect this handler; confirm the full file skips
 * work while paused.
 */
107 void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
112 CONTEXT("Processing check result for '" + checkable->GetName() + "'");
/* Nothing should be sent when perfdata is disabled globally or on this checkable. */
114 if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
/* A checkable is either a Service (its host is looked up) or a Host itself. */
117 Service::Ptr service = dynamic_pointer_cast<Service>(checkable);
121 host = service->GetHost();
123 host = static_pointer_cast<Host>(checkable);
/* Every data point carries the host tag (OpenTSDB requires at least one tag). */
126 std::map<String, String> tags;
128 String escaped_hostName = EscapeTag(host->GetName());
129 tags["host"] = escaped_hostName;
131 double ts = cr->GetExecutionEnd();
/* Services: the short name becomes part of the metric path, hence EscapeMetric. */
134 String serviceName = service->GetShortName();
135 String escaped_serviceName = EscapeMetric(serviceName);
136 metric = "icinga.service." + escaped_serviceName;
138 SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
/* Hosts: fixed metric prefix. */
140 metric = "icinga.host";
141 SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
144 SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
145 SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
146 SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
147 SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
149 SendPerfdata(checkable, metric, tags, cr, ts);
/* Check metrics share one metric name; host vs. service is told apart by the
 * "type" tag, and the service name goes into a tag value (hence EscapeTag). */
151 metric = "icinga.check";
154 tags["type"] = "service";
155 String serviceName = service->GetShortName();
156 String escaped_serviceName = EscapeTag(serviceName);
157 tags["service"] = escaped_serviceName;
159 tags["type"] = "host";
162 SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
163 SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
164 SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
165 SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
/*
 * Sends each performance data value of `cr` as "<metric>.<escaped label>".
 * It also sends "_crit", "_warn", "_min" and "_max" companion metrics, all
 * with the same tags and timestamp.
 *
 * Values that are not already PerfdataValue objects are parsed. Values that
 * fail to parse are logged at warning level, naming the checkable and its
 * check command.
 *
 * NOTE(review): the following are missing from this view; confirm against
 * the full file:
 *  - the guard for a null `perfdata` array;
 *  - the assignment for the PerfdataValue branch;
 *  - the `try {` and the skip (presumably `continue`) after the warning;
 *  - the conditions guarding the four companion metrics.
 */
168 void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
169 const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
171 Array::Ptr perfdata = cr->GetPerformanceData();
176 CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
178 ObjectLock olock(perfdata);
179 for (const Value& val : perfdata) {
180 PerfdataValue::Ptr pdv;
182 if (val.IsObjectType<PerfdataValue>())
186 pdv = PerfdataValue::Parse(val);
187 } catch (const std::exception&) {
188 Log(LogWarning, "OpenTsdbWriter")
189 << "Ignoring invalid perfdata for checkable '"
190 << checkable->GetName() << "' and command '"
191 << checkCommand->GetName() << "' with value: " << val;
/* NOTE(review): EscapeMetric replaces ':' with '_' (and '.' with '_'), so by
 * the time the next line runs there is no "::" left. The replacement appears
 * to be dead code: a label "a::b" is sent as "a__b", not "a.b". Simply
 * swapping the two calls would not help either, because EscapeMetric would
 * then turn the inserted '.' into '_'. Any change here renames existing
 * OpenTSDB metrics. */
196 String escaped_key = EscapeMetric(pdv->GetLabel());
197 boost::algorithm::replace_all(escaped_key, "::", ".");
199 SendMetric(checkable, metric + "." + escaped_key, tags, pdv->GetValue(), ts);
202 SendMetric(checkable, metric + "." + escaped_key + "_crit", tags, pdv->GetCrit(), ts);
204 SendMetric(checkable, metric + "." + escaped_key + "_warn", tags, pdv->GetWarn(), ts);
206 SendMetric(checkable, metric + "." + escaped_key + "_min", tags, pdv->GetMin(), ts);
208 SendMetric(checkable, metric + "." + escaped_key + "_max", tags, pdv->GetMax(), ts);
212 void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
213 const std::map<String, String>& tags, double value, double ts)
215 String tags_string = "";
217 for (const Dictionary::Pair& tag : tags) {
218 tags_string += " " + tag.first + "=" + Convert::ToString(tag.second);
221 std::ostringstream msgbuf;
223 * must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
224 * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
225 * "tags" must include at least one tag, we use "host=HOSTNAME"
227 msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << " " << tags_string;
229 Log(LogDebug, "OpenTsdbWriter")
230 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
232 /* do not send \n to debug log */
234 String put = msgbuf.str();
236 ObjectLock olock(this);
242 m_Stream->Write(put.CStr(), put.GetLength());
243 } catch (const std::exception& ex) {
244 Log(LogCritical, "OpenTsdbWriter")
245 << "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
251 /* for metric and tag name rules, see
252 * http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
// Escapes a string for use as an OpenTSDB tag value (used for the "host" and
// "service" tags in this file): spaces and backslashes become underscores.
// NOTE(review): the copy of `str` into `result` and the return statement are
// not visible in this view. Per the naming rules linked above, tag values may
// only contain a-z, A-Z, 0-9, '-', '_', '.', '/' and Unicode letters (verify
// for the deployed OpenTSDB version). Other characters such as ':' '=' ','
// pass through unchanged, so such data points are likely rejected by the TSD.
254 String OpenTsdbWriter::EscapeTag(const String& str)
258 boost::replace_all(result, " ", "_");
259 boost::replace_all(result, "\\", "_");
// Escapes a string for use inside an OpenTSDB metric name (service short
// names and perfdata labels in this file). Space, '.', backslash and ':' are
// each replaced with '_'. Replacing '.' presumably keeps the value from adding
// extra levels to the dot-separated metric path built by the callers.
// NOTE(review): the initialization of `result` and the end of this function
// are not visible in this view.
264 String OpenTsdbWriter::EscapeMetric(const String& str)
268 boost::replace_all(result, " ", "_");
269 boost::replace_all(result, ".", "_");
270 boost::replace_all(result, "\\", "_");
271 boost::replace_all(result, ":", "_");