]> granicus.if.org Git - icinga2/blob - lib/perfdata/opentsdbwriter.cpp
Merge pull request #7124 from Icinga/bugfix/namespace-thread-safe
[icinga2] / lib / perfdata / opentsdbwriter.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "perfdata/opentsdbwriter.hpp"
4 #include "perfdata/opentsdbwriter-ti.cpp"
5 #include "icinga/service.hpp"
6 #include "icinga/checkcommand.hpp"
7 #include "icinga/macroprocessor.hpp"
8 #include "icinga/icingaapplication.hpp"
9 #include "icinga/compatutility.hpp"
10 #include "base/tcpsocket.hpp"
11 #include "base/configtype.hpp"
12 #include "base/objectlock.hpp"
13 #include "base/logger.hpp"
14 #include "base/convert.hpp"
15 #include "base/utility.hpp"
16 #include "base/perfdatavalue.hpp"
17 #include "base/application.hpp"
18 #include "base/stream.hpp"
19 #include "base/networkstream.hpp"
20 #include "base/exception.hpp"
21 #include "base/statsfunction.hpp"
22 #include <boost/algorithm/string.hpp>
23 #include <boost/algorithm/string/replace.hpp>
24
25 using namespace icinga;
26
27 REGISTER_TYPE(OpenTsdbWriter);
28
29 REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
30
31 void OpenTsdbWriter::OnConfigLoaded()
32 {
33         ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
34
35         if (!GetEnableHa()) {
36                 Log(LogDebug, "OpenTsdbWriter")
37                         << "HA functionality disabled. Won't pause connection: " << GetName();
38
39                 SetHAMode(HARunEverywhere);
40         } else {
41                 SetHAMode(HARunOnce);
42         }
43 }
44
45 void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
46 {
47         DictionaryData nodes;
48
49         for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
50                 nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
51         }
52
53         status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
54 }
55
56 void OpenTsdbWriter::Resume()
57 {
58         ObjectImpl<OpenTsdbWriter>::Resume();
59
60         Log(LogInformation, "OpentsdbWriter")
61                 << "'" << GetName() << "' resumed.";
62
63         m_ReconnectTimer = new Timer();
64         m_ReconnectTimer->SetInterval(10);
65         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&OpenTsdbWriter::ReconnectTimerHandler, this));
66         m_ReconnectTimer->Start();
67         m_ReconnectTimer->Reschedule(0);
68
69         Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
70 }
71
72 /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
73 void OpenTsdbWriter::Pause()
74 {
75         m_ReconnectTimer.reset();
76
77         Log(LogInformation, "OpentsdbWriter")
78                 << "'" << GetName() << "' paused.";
79
80         ObjectImpl<OpenTsdbWriter>::Pause();
81 }
82
83 void OpenTsdbWriter::ReconnectTimerHandler()
84 {
85         if (IsPaused())
86                 return;
87
88         if (m_Stream)
89                 return;
90
91         TcpSocket::Ptr socket = new TcpSocket();
92
93         Log(LogNotice, "OpenTsdbWriter")
94                 << "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
95
96         try {
97                 socket->Connect(GetHost(), GetPort());
98         } catch (std::exception&) {
99                 Log(LogCritical, "OpenTsdbWriter")
100                         << "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
101                 return;
102         }
103
104         m_Stream = new NetworkStream(socket);
105 }
106
107 void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
108 {
109         if (IsPaused())
110                 return;
111
112         CONTEXT("Processing check result for '" + checkable->GetName() + "'");
113
114         if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
115                 return;
116
117         Service::Ptr service = dynamic_pointer_cast<Service>(checkable);
118         Host::Ptr host;
119
120         if (service)
121                 host = service->GetHost();
122         else
123                 host = static_pointer_cast<Host>(checkable);
124
125         String metric;
126         std::map<String, String> tags;
127
128         String escaped_hostName = EscapeTag(host->GetName());
129         tags["host"] = escaped_hostName;
130
131         double ts = cr->GetExecutionEnd();
132
133         if (service) {
134                 String serviceName = service->GetShortName();
135                 String escaped_serviceName = EscapeMetric(serviceName);
136                 metric = "icinga.service." + escaped_serviceName;
137
138                 SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
139         } else {
140                 metric = "icinga.host";
141                 SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
142         }
143
144         SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
145         SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
146         SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
147         SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
148
149         SendPerfdata(checkable, metric, tags, cr, ts);
150
151         metric = "icinga.check";
152
153         if (service) {
154                 tags["type"] = "service";
155                 String serviceName = service->GetShortName();
156                 String escaped_serviceName = EscapeTag(serviceName);
157                 tags["service"] = escaped_serviceName;
158         } else {
159                 tags["type"] = "host";
160         }
161
162         SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
163         SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
164         SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
165         SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
166 }
167
168 void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
169         const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
170 {
171         Array::Ptr perfdata = cr->GetPerformanceData();
172
173         if (!perfdata)
174                 return;
175
176         CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
177
178         ObjectLock olock(perfdata);
179         for (const Value& val : perfdata) {
180                 PerfdataValue::Ptr pdv;
181
182                 if (val.IsObjectType<PerfdataValue>())
183                         pdv = val;
184                 else {
185                         try {
186                                 pdv = PerfdataValue::Parse(val);
187                         } catch (const std::exception&) {
188                                 Log(LogWarning, "OpenTsdbWriter")
189                                         << "Ignoring invalid perfdata for checkable '"
190                                         << checkable->GetName() << "' and command '"
191                                         << checkCommand->GetName() << "' with value: " << val;
192                                 continue;
193                         }
194                 }
195
196                 String escaped_key = EscapeMetric(pdv->GetLabel());
197                 boost::algorithm::replace_all(escaped_key, "::", ".");
198
199                 SendMetric(checkable, metric + "." + escaped_key, tags, pdv->GetValue(), ts);
200
201                 if (pdv->GetCrit())
202                         SendMetric(checkable, metric + "." + escaped_key + "_crit", tags, pdv->GetCrit(), ts);
203                 if (pdv->GetWarn())
204                         SendMetric(checkable, metric + "." + escaped_key + "_warn", tags, pdv->GetWarn(), ts);
205                 if (pdv->GetMin())
206                         SendMetric(checkable, metric + "." + escaped_key + "_min", tags, pdv->GetMin(), ts);
207                 if (pdv->GetMax())
208                         SendMetric(checkable, metric + "." + escaped_key + "_max", tags, pdv->GetMax(), ts);
209         }
210 }
211
212 void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
213         const std::map<String, String>& tags, double value, double ts)
214 {
215         String tags_string = "";
216
217         for (const Dictionary::Pair& tag : tags) {
218                 tags_string += " " + tag.first + "=" + Convert::ToString(tag.second);
219         }
220
221         std::ostringstream msgbuf;
222         /*
223          * must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
224          * put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
225          * "tags" must include at least one tag, we use "host=HOSTNAME"
226          */
227         msgbuf << "put " << metric << " " << static_cast<long>(ts) << " " << Convert::ToString(value) << " " << tags_string;
228
229         Log(LogDebug, "OpenTsdbWriter")
230                 << "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
231
232         /* do not send \n to debug log */
233         msgbuf << "\n";
234         String put = msgbuf.str();
235
236         ObjectLock olock(this);
237
238         if (!m_Stream)
239                 return;
240
241         try {
242                 m_Stream->Write(put.CStr(), put.GetLength());
243         } catch (const std::exception& ex) {
244                 Log(LogCritical, "OpenTsdbWriter")
245                         << "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
246
247                 m_Stream.reset();
248         }
249 }
250
251 /* for metric and tag name rules, see
252  * http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
253  */
254 String OpenTsdbWriter::EscapeTag(const String& str)
255 {
256         String result = str;
257
258         boost::replace_all(result, " ", "_");
259         boost::replace_all(result, "\\", "_");
260
261         return result;
262 }
263
264 String OpenTsdbWriter::EscapeMetric(const String& str)
265 {
266         String result = str;
267
268         boost::replace_all(result, " ", "_");
269         boost::replace_all(result, ".", "_");
270         boost::replace_all(result, "\\", "_");
271         boost::replace_all(result, ":", "_");
272
273         return result;
274 }