granicus.if.org Git - icinga2/blob - lib/livestatus/livestatusquery.cpp
Livestatus: Fix crash with empty stats columns
[icinga2] / lib / livestatus / livestatusquery.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/json.hpp"
41 #include "base/serializer.hpp"
42 #include "base/timer.hpp"
43 #include "base/initialize.hpp"
44 #include <boost/algorithm/string/classification.hpp>
45 #include <boost/algorithm/string/replace.hpp>
46 #include <boost/algorithm/string/split.hpp>
47 #include <boost/algorithm/string/join.hpp>
48
49 using namespace icinga;
50
51 static int l_ExternalCommands = 0;
52 static boost::mutex l_QueryMutex;
53
54 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
55         : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
56           m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
57 {
58         if (lines.size() == 0) {
59                 m_Verb = "ERROR";
60                 m_ErrorCode = LivestatusErrorQuery;
61                 m_ErrorMessage = "Empty Query. Aborting.";
62                 return;
63         }
64
65         String msg;
66         for (const String& line : lines) {
67                 msg += line + "\n";
68         }
69         Log(LogDebug, "LivestatusQuery", msg);
70
71         m_CompatLogPath = compat_log_path;
72
73         /* default separators */
74         m_Separators.push_back("\n");
75         m_Separators.push_back(";");
76         m_Separators.push_back(",");
77         m_Separators.push_back("|");
78
79         String line = lines[0];
80
81         size_t sp_index = line.FindFirstOf(" ");
82
83         if (sp_index == String::NPos)
84                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
85
86         String verb = line.SubStr(0, sp_index);
87         String target = line.SubStr(sp_index + 1);
88
89         m_Verb = verb;
90
91         if (m_Verb == "COMMAND") {
92                 m_KeepAlive = true;
93                 m_Command = target;
94         } else if (m_Verb == "GET") {
95                 m_Table = target;
96         } else {
97                 m_Verb = "ERROR";
98                 m_ErrorCode = LivestatusErrorQuery;
99                 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
100                 return;
101         }
102
103         std::deque<Filter::Ptr> filters, stats;
104         std::deque<Aggregator::Ptr> aggregators;
105
106         for (unsigned int i = 1; i < lines.size(); i++) {
107                 line = lines[i];
108
109                 size_t col_index = line.FindFirstOf(":");
110                 String header = line.SubStr(0, col_index);
111                 String params;
112
113                 //OutputFormat:json or OutputFormat: json
114                 if (line.GetLength() > col_index + 1)
115                         params = line.SubStr(col_index + 1).Trim();
116
117                 if (header == "ResponseHeader")
118                         m_ResponseHeader = params;
119                 else if (header == "OutputFormat")
120                         m_OutputFormat = params;
121                 else if (header == "KeepAlive")
122                         m_KeepAlive = (params == "on");
123                 else if (header == "Columns") {
124                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
125                         boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
126                 } else if (header == "Separators") {
127                         std::vector<String> separators;
128
129                         boost::algorithm::split(separators, params, boost::is_any_of(" "));
130                         /* ugly ascii long to char conversion, but works */
131                         if (separators.size() > 0)
132                                 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
133                         if (separators.size() > 1)
134                                 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
135                         if (separators.size() > 2)
136                                 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
137                         if (separators.size() > 3)
138                                 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
139                 } else if (header == "ColumnHeaders")
140                         m_ColumnHeaders = (params == "on");
141                 else if (header == "Limit")
142                         m_Limit = Convert::ToLong(params);
143                 else if (header == "Filter") {
144                         Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
145
146                         if (!filter) {
147                                 m_Verb = "ERROR";
148                                 m_ErrorCode = LivestatusErrorQuery;
149                                 m_ErrorMessage = "Invalid filter specification: " + line;
150                                 return;
151                         }
152
153                         filters.push_back(filter);
154                 } else if (header == "Stats") {
155                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
156
157                         std::vector<String> tokens;
158                         boost::algorithm::split(tokens, params, boost::is_any_of(" "));
159
160                         if (tokens.size() < 2) {
161                                 m_Verb = "ERROR";
162                                 m_ErrorCode = LivestatusErrorQuery;
163                                 m_ErrorMessage = "Missing aggregator column name: " + line;
164                                 return;
165                         }
166
167                         String aggregate_arg = tokens[0];
168                         String aggregate_attr = tokens[1];
169
170                         Aggregator::Ptr aggregator;
171                         Filter::Ptr filter;
172
173                         if (aggregate_arg == "sum") {
174                                 aggregator = new SumAggregator(aggregate_attr);
175                         } else if (aggregate_arg == "min") {
176                                 aggregator = new MinAggregator(aggregate_attr);
177                         } else if (aggregate_arg == "max") {
178                                 aggregator = new MaxAggregator(aggregate_attr);
179                         } else if (aggregate_arg == "avg") {
180                                 aggregator = new AvgAggregator(aggregate_attr);
181                         } else if (aggregate_arg == "std") {
182                                 aggregator = new StdAggregator(aggregate_attr);
183                         } else if (aggregate_arg == "suminv") {
184                                 aggregator = new InvSumAggregator(aggregate_attr);
185                         } else if (aggregate_arg == "avginv") {
186                                 aggregator = new InvAvgAggregator(aggregate_attr);
187                         } else {
188                                 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
189
190                                 if (!filter) {
191                                         m_Verb = "ERROR";
192                                         m_ErrorCode = LivestatusErrorQuery;
193                                         m_ErrorMessage = "Invalid filter specification: " + line;
194                                         return;
195                                 }
196
197                                 aggregator = new CountAggregator();
198                         }
199
200                         aggregator->SetFilter(filter);
201                         aggregators.push_back(aggregator);
202
203                         stats.push_back(filter);
204                 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
205                         std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
206
207                         unsigned int num = Convert::ToLong(params);
208                         CombinerFilter::Ptr filter;
209
210                         if (header == "Or" || header == "StatsOr") {
211                                 filter = new OrFilter();
212                                 Log(LogDebug, "LivestatusQuery")
213                                     << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
214                         } else {
215                                 filter = new AndFilter();
216                                 Log(LogDebug, "LivestatusQuery")
217                                     << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
218                         }
219
220                         if (num > deq.size()) {
221                                 m_Verb = "ERROR";
222                                 m_ErrorCode = 451;
223                                 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
224                                 return;
225                         }
226
227                         while (num > 0 && num--) {
228                                 filter->AddSubFilter(deq.back());
229                                 Log(LogDebug, "LivestatusQuery")
230                                     << "Add " << num << " filter.";
231                                 deq.pop_back();
232                                 if (&deq == &stats)
233                                         aggregators.pop_back();
234                         }
235
236                         deq.push_back(filter);
237                         if (&deq == &stats) {
238                                 Aggregator::Ptr aggregator = new CountAggregator();
239                                 aggregator->SetFilter(filter);
240                                 aggregators.push_back(aggregator);
241                         }
242                 } else if (header == "Negate" || header == "StatsNegate") {
243                         std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
244
245                         if (deq.empty()) {
246                                 m_Verb = "ERROR";
247                                 m_ErrorCode = 451;
248                                 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
249                                 return;
250                         }
251
252                         Filter::Ptr filter = deq.back();
253                         deq.pop_back();
254
255                         if (!filter) {
256                                 m_Verb = "ERROR";
257                                 m_ErrorCode = 451;
258                                 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
259                                 return;
260                         }
261
262                         deq.push_back(new NegateFilter(filter));
263
264                         if (deq == stats) {
265                                 Aggregator::Ptr aggregator = aggregators.back();
266                                 aggregator->SetFilter(filter);
267                         }
268                 }
269         }
270
271         /* Combine all top-level filters into a single filter. */
272         AndFilter::Ptr top_filter = new AndFilter();
273
274         for (const Filter::Ptr& filter : filters) {
275                 top_filter->AddSubFilter(filter);
276         }
277
278         m_Filter = top_filter;
279         m_Aggregators.swap(aggregators);
280 }
281
282 int LivestatusQuery::GetExternalCommands(void)
283 {
284         boost::mutex::scoped_lock lock(l_QueryMutex);
285
286         return l_ExternalCommands;
287 }
288
289 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
290 {
291         /*
292          * time >= 1382696656
293          * type = SERVICE FLAPPING ALERT
294          */
295         std::vector<String> tokens;
296         size_t sp_index;
297         String temp_buffer = params;
298
299         /* extract attr and op */
300         for (int i = 0; i < 2; i++) {
301                 sp_index = temp_buffer.FindFirstOf(" ");
302
303                 /* check if this is the last argument */
304                 if (sp_index == String::NPos) {
305                         /* 'attr op' or 'attr op val' is valid */
306                         if (i < 1)
307                                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
308
309                         break;
310                 }
311
312                 tokens.push_back(temp_buffer.SubStr(0, sp_index));
313                 temp_buffer = temp_buffer.SubStr(sp_index + 1);
314         }
315
316         /* add the rest as value */
317         tokens.push_back(temp_buffer);
318
319         if (tokens.size() == 2)
320                 tokens.push_back("");
321
322         if (tokens.size() < 3)
323                 return Filter::Ptr();
324
325         bool negate = false;
326         String attr = tokens[0];
327         String op = tokens[1];
328         String val = tokens[2];
329
330         if (op == "!=") {
331                 op = "=";
332                 negate = true;
333         } else if (op == "!~") {
334                 op = "~";
335                 negate = true;
336         } else if (op == "!=~") {
337                 op = "=~";
338                 negate = true;
339         } else if (op == "!~~") {
340                 op = "~~";
341                 negate = true;
342         }
343
344         Filter::Ptr filter = new AttributeFilter(attr, op, val);
345
346         if (negate)
347                 filter = new NegateFilter(filter);
348
349         /* pre-filter log time duration */
350         if (attr == "time") {
351                 if (op == "<" || op == "<=") {
352                         until = Convert::ToLong(val);
353                 } else if (op == ">" || op == ">=") {
354                         from = Convert::ToLong(val);
355                 }
356         }
357
358         Log(LogDebug, "LivestatusQuery")
359             << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
360
361         return filter;
362 }
363
364 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
365 {
366         if (m_OutputFormat == "json" || m_OutputFormat == "python")
367                 fp << "[";
368 }
369
370 void LivestatusQuery::EndResultSet(std::ostream& fp) const
371 {
372         if (m_OutputFormat == "json" || m_OutputFormat == "python")
373                 fp << "]";
374 }
375
376 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
377 {
378         if (m_OutputFormat == "csv") {
379                 bool first = true;
380
381                 ObjectLock rlock(row);
382                 for (const Value& value : row) {
383                         if (first)
384                                 first = false;
385                         else
386                                 fp << m_Separators[1];
387
388                         if (value.IsObjectType<Array>())
389                                 PrintCsvArray(fp, value, 0);
390                         else
391                                 fp << value;
392                 }
393
394                 fp << m_Separators[0];
395         } else if (m_OutputFormat == "json") {
396                 if (!first_row)
397                         fp << ", ";
398
399                 fp << JsonEncode(row);
400         } else if (m_OutputFormat == "python") {
401                 if (!first_row)
402                         fp << ", ";
403
404                 PrintPythonArray(fp, row);
405         }
406
407         first_row = false;
408 }
409
410 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
411 {
412         bool first = true;
413
414         ObjectLock olock(array);
415         for (const Value& value : array) {
416                 if (first)
417                         first = false;
418                 else
419                         fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
420
421                 if (value.IsObjectType<Array>())
422                         PrintCsvArray(fp, value, level + 1);
423                 else if (value.IsBoolean())
424                         fp << Convert::ToLong(value);
425                 else
426                         fp << value;
427         }
428 }
429
430 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
431 {
432         fp << "[ ";
433
434         bool first = true;
435
436         for (const Value& value : rs) {
437                 if (first)
438                         first = false;
439                 else
440                         fp << ", ";
441
442                 if (value.IsObjectType<Array>())
443                         PrintPythonArray(fp, value);
444                 else if (value.IsNumber())
445                         fp << value;
446                 else
447                         fp << QuoteStringPython(value);
448         }
449
450         fp << " ]";
451 }
452
453 String LivestatusQuery::QuoteStringPython(const String& str) {
454         String result = str;
455         boost::algorithm::replace_all(result, "\"", "\\\"");
456         return "r\"" + result + "\"";
457 }
458
459 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
460 {
461         Log(LogNotice, "LivestatusQuery")
462             << "Table: " << m_Table;
463
464         Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
465
466         if (!table) {
467                 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
468
469                 return;
470         }
471
472         std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
473         std::vector<String> columns;
474
475         if (m_Columns.size() > 0)
476                 columns = m_Columns;
477         else
478                 columns = table->GetColumnNames();
479
480         std::ostringstream result;
481         bool first_row = true;
482         BeginResultSet(result);
483
484         if (m_Aggregators.empty()) {
485                 Array::Ptr header = new Array();
486
487                 typedef std::pair<String, Column> ColumnPair;
488
489                 std::vector<ColumnPair> column_objs;
490                 column_objs.reserve(columns.size());
491
492                 for (const String& columnName : columns)
493                         column_objs.push_back(std::make_pair(columnName, table->GetColumn(columnName)));
494
495                 for (const LivestatusRowValue& object : objects) {
496                         Array::Ptr row = new Array();
497
498                         row->Reserve(column_objs.size());
499
500                         for (const ColumnPair& cv : column_objs) {
501                                 if (m_ColumnHeaders)
502                                         header->Add(cv.first);
503
504                                 row->Add(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
505                         }
506
507                         if (m_ColumnHeaders) {
508                                 AppendResultRow(result, header, first_row);
509                                 m_ColumnHeaders = false;
510                         }
511
512                         AppendResultRow(result, row, first_row);
513                 }
514         } else {
515                 std::map<std::vector<Value>, std::vector<AggregatorState *> > allStats;
516
517                 /* add aggregated stats */
518                 for (const LivestatusRowValue& object : objects) {
519                         std::vector<Value> statsKey;
520
521                         for (const String& columnName : m_Columns) {
522                                 Column column = table->GetColumn(columnName);
523                                 statsKey.push_back(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
524                         }
525
526                         auto it = allStats.find(statsKey);
527
528                         if (it == allStats.end()) {
529                                 std::vector<AggregatorState *> newStats(m_Aggregators.size(), NULL);
530                                 it = allStats.insert(std::make_pair(statsKey, newStats)).first;
531                         }
532
533                         auto& stats = it->second;
534
535                         int index = 0;
536
537                         for (const Aggregator::Ptr aggregator : m_Aggregators) {
538                                 aggregator->Apply(table, object.Row, &stats[index]);
539                                 index++;
540                         }
541                 }
542
543                 /* add column headers both for raw and aggregated data */
544                 if (m_ColumnHeaders) {
545                         Array::Ptr header = new Array();
546
547                         for (const String& columnName : m_Columns) {
548                                 header->Add(columnName);
549                         }
550
551                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
552                                 header->Add("stats_" + Convert::ToString(i));
553                         }
554
555                         AppendResultRow(result, header, first_row);
556                 }
557
558                 for (const auto& kv : allStats) {
559                         Array::Ptr row = new Array();
560
561                         row->Reserve(m_Columns.size() + m_Aggregators.size());
562
563                         for (const Value& keyPart : kv.first) {
564                                 row->Add(keyPart);
565                         }
566
567                         auto& stats = kv.second;
568
569                         for (size_t i = 0; i < m_Aggregators.size(); i++)
570                                 row->Add(m_Aggregators[i]->GetResultAndFreeState(stats[i]));
571
572                         AppendResultRow(result, row, first_row);
573                 }
574         }
575
576         EndResultSet(result);
577
578         SendResponse(stream, LivestatusErrorOK, result.str());
579 }
580
581 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
582 {
583         {
584                 boost::mutex::scoped_lock lock(l_QueryMutex);
585
586                 l_ExternalCommands++;
587         }
588
589         Log(LogNotice, "LivestatusQuery")
590             << "Executing command: " << m_Command;
591         ExternalCommandProcessor::Execute(m_Command);
592         SendResponse(stream, LivestatusErrorOK, "");
593 }
594
595 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
596 {
597         Log(LogDebug, "LivestatusQuery")
598             << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
599         SendResponse(stream, m_ErrorCode, m_ErrorMessage);
600 }
601
602 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
603 {
604         if (m_ResponseHeader == "fixed16")
605                 PrintFixed16(stream, code, data);
606
607         if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
608                 try {
609                         stream->Write(data.CStr(), data.GetLength());
610                 } catch (const std::exception&) {
611                         Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
612                 }
613         }
614 }
615
616 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
617 {
618         ASSERT(code >= 100 && code <= 999);
619
620         String sCode = Convert::ToString(code);
621         String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
622
623         String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
624
625         try {
626                 stream->Write(header.CStr(), header.GetLength());
627         } catch (const std::exception&) {
628                 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
629         }
630 }
631
632 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
633 {
634         try {
635                 Log(LogNotice, "LivestatusQuery")
636                     << "Executing livestatus query: " << m_Verb;
637
638                 if (m_Verb == "GET")
639                         ExecuteGetHelper(stream);
640                 else if (m_Verb == "COMMAND")
641                         ExecuteCommandHelper(stream);
642                 else if (m_Verb == "ERROR")
643                         ExecuteErrorHelper(stream);
644                 else
645                         BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
646         } catch (const std::exception& ex) {
647                 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
648         }
649
650         if (!m_KeepAlive) {
651                 stream->Close();
652                 return false;
653         }
654
655         return true;
656 }