]> granicus.if.org Git - icinga2/blob - lib/livestatus/livestatusquery.cpp
Merge pull request #7185 from Icinga/bugfix/gelfwriter-wrong-log-facility
[icinga2] / lib / livestatus / livestatusquery.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "livestatus/livestatusquery.hpp"
4 #include "livestatus/countaggregator.hpp"
5 #include "livestatus/sumaggregator.hpp"
6 #include "livestatus/minaggregator.hpp"
7 #include "livestatus/maxaggregator.hpp"
8 #include "livestatus/avgaggregator.hpp"
9 #include "livestatus/stdaggregator.hpp"
10 #include "livestatus/invsumaggregator.hpp"
11 #include "livestatus/invavgaggregator.hpp"
12 #include "livestatus/attributefilter.hpp"
13 #include "livestatus/negatefilter.hpp"
14 #include "livestatus/orfilter.hpp"
15 #include "livestatus/andfilter.hpp"
16 #include "icinga/externalcommandprocessor.hpp"
17 #include "base/debug.hpp"
18 #include "base/convert.hpp"
19 #include "base/objectlock.hpp"
20 #include "base/logger.hpp"
21 #include "base/exception.hpp"
22 #include "base/utility.hpp"
23 #include "base/json.hpp"
24 #include "base/serializer.hpp"
25 #include "base/timer.hpp"
26 #include "base/initialize.hpp"
27 #include <boost/algorithm/string/replace.hpp>
28 #include <boost/algorithm/string/join.hpp>
29
30 using namespace icinga;
31
32 static int l_ExternalCommands = 0;
33 static boost::mutex l_QueryMutex;
34
35 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
36         : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
37         m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
38 {
39         if (lines.size() == 0) {
40                 m_Verb = "ERROR";
41                 m_ErrorCode = LivestatusErrorQuery;
42                 m_ErrorMessage = "Empty Query. Aborting.";
43                 return;
44         }
45
46         String msg;
47         for (const String& line : lines) {
48                 msg += line + "\n";
49         }
50         Log(LogDebug, "LivestatusQuery", msg);
51
52         m_CompatLogPath = compat_log_path;
53
54         /* default separators */
55         m_Separators.emplace_back("\n");
56         m_Separators.emplace_back(";");
57         m_Separators.emplace_back(",");
58         m_Separators.emplace_back("|");
59
60         String line = lines[0];
61
62         size_t sp_index = line.FindFirstOf(" ");
63
64         if (sp_index == String::NPos)
65                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
66
67         String verb = line.SubStr(0, sp_index);
68         String target = line.SubStr(sp_index + 1);
69
70         m_Verb = verb;
71
72         if (m_Verb == "COMMAND") {
73                 m_KeepAlive = true;
74                 m_Command = target;
75         } else if (m_Verb == "GET") {
76                 m_Table = target;
77         } else {
78                 m_Verb = "ERROR";
79                 m_ErrorCode = LivestatusErrorQuery;
80                 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
81                 return;
82         }
83
84         std::deque<Filter::Ptr> filters, stats;
85         std::deque<Aggregator::Ptr> aggregators;
86
87         for (unsigned int i = 1; i < lines.size(); i++) {
88                 line = lines[i];
89
90                 size_t col_index = line.FindFirstOf(":");
91                 String header = line.SubStr(0, col_index);
92                 String params;
93
94                 //OutputFormat:json or OutputFormat: json
95                 if (line.GetLength() > col_index + 1)
96                         params = line.SubStr(col_index + 1).Trim();
97
98                 if (header == "ResponseHeader")
99                         m_ResponseHeader = params;
100                 else if (header == "OutputFormat")
101                         m_OutputFormat = params;
102                 else if (header == "KeepAlive")
103                         m_KeepAlive = (params == "on");
104                 else if (header == "Columns") {
105                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
106                         m_Columns = params.Split(" ");
107                 } else if (header == "Separators") {
108                         std::vector<String> separators = params.Split(" ");
109
110                         /* ugly ascii long to char conversion, but works */
111                         if (separators.size() > 0)
112                                 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
113                         if (separators.size() > 1)
114                                 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
115                         if (separators.size() > 2)
116                                 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
117                         if (separators.size() > 3)
118                                 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
119                 } else if (header == "ColumnHeaders")
120                         m_ColumnHeaders = (params == "on");
121                 else if (header == "Limit")
122                         m_Limit = Convert::ToLong(params);
123                 else if (header == "Filter") {
124                         Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
125
126                         if (!filter) {
127                                 m_Verb = "ERROR";
128                                 m_ErrorCode = LivestatusErrorQuery;
129                                 m_ErrorMessage = "Invalid filter specification: " + line;
130                                 return;
131                         }
132
133                         filters.push_back(filter);
134                 } else if (header == "Stats") {
135                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
136
137                         std::vector<String> tokens = params.Split(" ");
138
139                         if (tokens.size() < 2) {
140                                 m_Verb = "ERROR";
141                                 m_ErrorCode = LivestatusErrorQuery;
142                                 m_ErrorMessage = "Missing aggregator column name: " + line;
143                                 return;
144                         }
145
146                         String aggregate_arg = tokens[0];
147                         String aggregate_attr = tokens[1];
148
149                         Aggregator::Ptr aggregator;
150                         Filter::Ptr filter;
151
152                         if (aggregate_arg == "sum") {
153                                 aggregator = new SumAggregator(aggregate_attr);
154                         } else if (aggregate_arg == "min") {
155                                 aggregator = new MinAggregator(aggregate_attr);
156                         } else if (aggregate_arg == "max") {
157                                 aggregator = new MaxAggregator(aggregate_attr);
158                         } else if (aggregate_arg == "avg") {
159                                 aggregator = new AvgAggregator(aggregate_attr);
160                         } else if (aggregate_arg == "std") {
161                                 aggregator = new StdAggregator(aggregate_attr);
162                         } else if (aggregate_arg == "suminv") {
163                                 aggregator = new InvSumAggregator(aggregate_attr);
164                         } else if (aggregate_arg == "avginv") {
165                                 aggregator = new InvAvgAggregator(aggregate_attr);
166                         } else {
167                                 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
168
169                                 if (!filter) {
170                                         m_Verb = "ERROR";
171                                         m_ErrorCode = LivestatusErrorQuery;
172                                         m_ErrorMessage = "Invalid filter specification: " + line;
173                                         return;
174                                 }
175
176                                 aggregator = new CountAggregator();
177                         }
178
179                         aggregator->SetFilter(filter);
180                         aggregators.push_back(aggregator);
181
182                         stats.push_back(filter);
183                 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
184                         std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
185
186                         unsigned int num = Convert::ToLong(params);
187                         CombinerFilter::Ptr filter;
188
189                         if (header == "Or" || header == "StatsOr") {
190                                 filter = new OrFilter();
191                                 Log(LogDebug, "LivestatusQuery")
192                                         << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
193                         } else {
194                                 filter = new AndFilter();
195                                 Log(LogDebug, "LivestatusQuery")
196                                         << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
197                         }
198
199                         if (num > deq.size()) {
200                                 m_Verb = "ERROR";
201                                 m_ErrorCode = 451;
202                                 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
203                                 return;
204                         }
205
206                         while (num > 0 && num--) {
207                                 filter->AddSubFilter(deq.back());
208                                 Log(LogDebug, "LivestatusQuery")
209                                         << "Add " << num << " filter.";
210                                 deq.pop_back();
211                                 if (&deq == &stats)
212                                         aggregators.pop_back();
213                         }
214
215                         deq.emplace_back(filter);
216                         if (&deq == &stats) {
217                                 Aggregator::Ptr aggregator = new CountAggregator();
218                                 aggregator->SetFilter(filter);
219                                 aggregators.push_back(aggregator);
220                         }
221                 } else if (header == "Negate" || header == "StatsNegate") {
222                         std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
223
224                         if (deq.empty()) {
225                                 m_Verb = "ERROR";
226                                 m_ErrorCode = 451;
227                                 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
228                                 return;
229                         }
230
231                         Filter::Ptr filter = deq.back();
232                         deq.pop_back();
233
234                         if (!filter) {
235                                 m_Verb = "ERROR";
236                                 m_ErrorCode = 451;
237                                 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
238                                 return;
239                         }
240
241                         deq.push_back(new NegateFilter(filter));
242
243                         if (deq == stats) {
244                                 Aggregator::Ptr aggregator = aggregators.back();
245                                 aggregator->SetFilter(filter);
246                         }
247                 }
248         }
249
250         /* Combine all top-level filters into a single filter. */
251         AndFilter::Ptr top_filter = new AndFilter();
252
253         for (const Filter::Ptr& filter : filters) {
254                 top_filter->AddSubFilter(filter);
255         }
256
257         m_Filter = top_filter;
258         m_Aggregators.swap(aggregators);
259 }
260
261 int LivestatusQuery::GetExternalCommands()
262 {
263         boost::mutex::scoped_lock lock(l_QueryMutex);
264
265         return l_ExternalCommands;
266 }
267
268 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
269 {
270         /*
271          * time >= 1382696656
272          * type = SERVICE FLAPPING ALERT
273          */
274         std::vector<String> tokens;
275         size_t sp_index;
276         String temp_buffer = params;
277
278         /* extract attr and op */
279         for (int i = 0; i < 2; i++) {
280                 sp_index = temp_buffer.FindFirstOf(" ");
281
282                 /* check if this is the last argument */
283                 if (sp_index == String::NPos) {
284                         /* 'attr op' or 'attr op val' is valid */
285                         if (i < 1)
286                                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
287
288                         break;
289                 }
290
291                 tokens.emplace_back(temp_buffer.SubStr(0, sp_index));
292                 temp_buffer = temp_buffer.SubStr(sp_index + 1);
293         }
294
295         /* add the rest as value */
296         tokens.emplace_back(std::move(temp_buffer));
297
298         if (tokens.size() == 2)
299                 tokens.emplace_back("");
300
301         if (tokens.size() < 3)
302                 return nullptr;
303
304         bool negate = false;
305         String attr = tokens[0];
306         String op = tokens[1];
307         String val = tokens[2];
308
309         if (op == "!=") {
310                 op = "=";
311                 negate = true;
312         } else if (op == "!~") {
313                 op = "~";
314                 negate = true;
315         } else if (op == "!=~") {
316                 op = "=~";
317                 negate = true;
318         } else if (op == "!~~") {
319                 op = "~~";
320                 negate = true;
321         }
322
323         Filter::Ptr filter = new AttributeFilter(attr, op, val);
324
325         if (negate)
326                 filter = new NegateFilter(filter);
327
328         /* pre-filter log time duration */
329         if (attr == "time") {
330                 if (op == "<" || op == "<=") {
331                         until = Convert::ToLong(val);
332                 } else if (op == ">" || op == ">=") {
333                         from = Convert::ToLong(val);
334                 }
335         }
336
337         Log(LogDebug, "LivestatusQuery")
338                 << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
339
340         return filter;
341 }
342
343 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
344 {
345         if (m_OutputFormat == "json" || m_OutputFormat == "python")
346                 fp << "[";
347 }
348
349 void LivestatusQuery::EndResultSet(std::ostream& fp) const
350 {
351         if (m_OutputFormat == "json" || m_OutputFormat == "python")
352                 fp << "]";
353 }
354
355 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
356 {
357         if (m_OutputFormat == "csv") {
358                 bool first = true;
359
360                 ObjectLock rlock(row);
361                 for (const Value& value : row) {
362                         if (first)
363                                 first = false;
364                         else
365                                 fp << m_Separators[1];
366
367                         if (value.IsObjectType<Array>())
368                                 PrintCsvArray(fp, value, 0);
369                         else
370                                 fp << value;
371                 }
372
373                 fp << m_Separators[0];
374         } else if (m_OutputFormat == "json") {
375                 if (!first_row)
376                         fp << ", ";
377
378                 fp << JsonEncode(row);
379         } else if (m_OutputFormat == "python") {
380                 if (!first_row)
381                         fp << ", ";
382
383                 PrintPythonArray(fp, row);
384         }
385
386         first_row = false;
387 }
388
389 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
390 {
391         bool first = true;
392
393         ObjectLock olock(array);
394         for (const Value& value : array) {
395                 if (first)
396                         first = false;
397                 else
398                         fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
399
400                 if (value.IsObjectType<Array>())
401                         PrintCsvArray(fp, value, level + 1);
402                 else if (value.IsBoolean())
403                         fp << Convert::ToLong(value);
404                 else
405                         fp << value;
406         }
407 }
408
409 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
410 {
411         fp << "[ ";
412
413         bool first = true;
414
415         for (const Value& value : rs) {
416                 if (first)
417                         first = false;
418                 else
419                         fp << ", ";
420
421                 if (value.IsObjectType<Array>())
422                         PrintPythonArray(fp, value);
423                 else if (value.IsNumber())
424                         fp << value;
425                 else
426                         fp << QuoteStringPython(value);
427         }
428
429         fp << " ]";
430 }
431
432 String LivestatusQuery::QuoteStringPython(const String& str) {
433         String result = str;
434         boost::algorithm::replace_all(result, "\"", "\\\"");
435         return "r\"" + result + "\"";
436 }
437
438 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
439 {
440         Log(LogNotice, "LivestatusQuery")
441                 << "Table: " << m_Table;
442
443         Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
444
445         if (!table) {
446                 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
447
448                 return;
449         }
450
451         std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
452         std::vector<String> columns;
453
454         if (m_Columns.size() > 0)
455                 columns = m_Columns;
456         else
457                 columns = table->GetColumnNames();
458
459         std::ostringstream result;
460         bool first_row = true;
461         BeginResultSet(result);
462
463         if (m_Aggregators.empty()) {
464                 typedef std::pair<String, Column> ColumnPair;
465
466                 std::vector<ColumnPair> column_objs;
467                 column_objs.reserve(columns.size());
468
469                 for (const String& columnName : columns)
470                         column_objs.emplace_back(columnName, table->GetColumn(columnName));
471
472                 ArrayData header;
473
474                 for (const LivestatusRowValue& object : objects) {
475                         ArrayData row;
476
477                         row.reserve(column_objs.size());
478
479                         for (const ColumnPair& cv : column_objs) {
480                                 if (m_ColumnHeaders)
481                                         header.push_back(cv.first);
482
483                                 row.push_back(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
484                         }
485
486                         if (m_ColumnHeaders) {
487                                 AppendResultRow(result, new Array(std::move(header)), first_row);
488                                 m_ColumnHeaders = false;
489                         }
490
491                         AppendResultRow(result, new Array(std::move(row)), first_row);
492                 }
493         } else {
494                 std::map<std::vector<Value>, std::vector<AggregatorState *> > allStats;
495
496                 /* add aggregated stats */
497                 for (const LivestatusRowValue& object : objects) {
498                         std::vector<Value> statsKey;
499
500                         for (const String& columnName : m_Columns) {
501                                 Column column = table->GetColumn(columnName);
502                                 statsKey.emplace_back(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
503                         }
504
505                         auto it = allStats.find(statsKey);
506
507                         if (it == allStats.end()) {
508                                 std::vector<AggregatorState *> newStats(m_Aggregators.size(), nullptr);
509                                 it = allStats.insert(std::make_pair(statsKey, newStats)).first;
510                         }
511
512                         auto& stats = it->second;
513
514                         int index = 0;
515
516                         for (const Aggregator::Ptr& aggregator : m_Aggregators) {
517                                 aggregator->Apply(table, object.Row, &stats[index]);
518                                 index++;
519                         }
520                 }
521
522                 /* add column headers both for raw and aggregated data */
523                 if (m_ColumnHeaders) {
524                         ArrayData header;
525
526                         for (const String& columnName : m_Columns) {
527                                 header.push_back(columnName);
528                         }
529
530                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
531                                 header.push_back("stats_" + Convert::ToString(i));
532                         }
533
534                         AppendResultRow(result, new Array(std::move(header)), first_row);
535                 }
536
537                 for (const auto& kv : allStats) {
538                         ArrayData row;
539
540                         row.reserve(m_Columns.size() + m_Aggregators.size());
541
542                         for (const Value& keyPart : kv.first) {
543                                 row.push_back(keyPart);
544                         }
545
546                         auto& stats = kv.second;
547
548                         for (size_t i = 0; i < m_Aggregators.size(); i++)
549                                 row.push_back(m_Aggregators[i]->GetResultAndFreeState(stats[i]));
550
551                         AppendResultRow(result, new Array(std::move(row)), first_row);
552                 }
553
554                 /* add a bogus zero value if aggregated is empty*/
555                 if (allStats.empty()) {
556                         ArrayData row;
557
558                         row.reserve(m_Aggregators.size());
559
560                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
561                                 row.push_back(0);
562                         }
563
564                         AppendResultRow(result, new Array(std::move(row)), first_row);
565                 }
566         }
567
568         EndResultSet(result);
569
570         SendResponse(stream, LivestatusErrorOK, result.str());
571 }
572
573 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
574 {
575         {
576                 boost::mutex::scoped_lock lock(l_QueryMutex);
577
578                 l_ExternalCommands++;
579         }
580
581         Log(LogNotice, "LivestatusQuery")
582                 << "Executing command: " << m_Command;
583         ExternalCommandProcessor::Execute(m_Command);
584         SendResponse(stream, LivestatusErrorOK, "");
585 }
586
587 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
588 {
589         Log(LogDebug, "LivestatusQuery")
590                 << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
591         SendResponse(stream, m_ErrorCode, m_ErrorMessage);
592 }
593
594 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
595 {
596         if (m_ResponseHeader == "fixed16")
597                 PrintFixed16(stream, code, data);
598
599         if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
600                 try {
601                         stream->Write(data.CStr(), data.GetLength());
602                 } catch (const std::exception&) {
603                         Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
604                 }
605         }
606 }
607
608 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
609 {
610         ASSERT(code >= 100 && code <= 999);
611
612         String sCode = Convert::ToString(code);
613         String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
614
615         String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
616
617         try {
618                 stream->Write(header.CStr(), header.GetLength());
619         } catch (const std::exception&) {
620                 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
621         }
622 }
623
624 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
625 {
626         try {
627                 Log(LogNotice, "LivestatusQuery")
628                         << "Executing livestatus query: " << m_Verb;
629
630                 if (m_Verb == "GET")
631                         ExecuteGetHelper(stream);
632                 else if (m_Verb == "COMMAND")
633                         ExecuteCommandHelper(stream);
634                 else if (m_Verb == "ERROR")
635                         ExecuteErrorHelper(stream);
636                 else
637                         BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
638         } catch (const std::exception& ex) {
639                 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
640         }
641
642         if (!m_KeepAlive) {
643                 stream->Close();
644                 return false;
645         }
646
647         return true;
648 }