]> granicus.if.org Git - icinga2/blob - lib/livestatus/livestatusquery.cpp
Update copyright headers for 2016
[icinga2] / lib / livestatus / livestatusquery.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/json.hpp"
41 #include "base/serializer.hpp"
42 #include "base/timer.hpp"
43 #include "base/initialize.hpp"
44 #include <boost/algorithm/string/classification.hpp>
45 #include <boost/foreach.hpp>
46 #include <boost/algorithm/string/replace.hpp>
47 #include <boost/algorithm/string/split.hpp>
48 #include <boost/algorithm/string/join.hpp>
49
50 using namespace icinga;
51
52 static int l_ExternalCommands = 0;
53 static boost::mutex l_QueryMutex;
54
55 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
56         : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
57           m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
58 {
59         if (lines.size() == 0) {
60                 m_Verb = "ERROR";
61                 m_ErrorCode = LivestatusErrorQuery;
62                 m_ErrorMessage = "Empty Query. Aborting.";
63                 return;
64         }
65
66         String msg;
67         BOOST_FOREACH(const String& line, lines) {
68                 msg += line + "\n";
69         }
70         Log(LogDebug, "LivestatusQuery", msg);
71
72         m_CompatLogPath = compat_log_path;
73
74         /* default separators */
75         m_Separators.push_back("\n");
76         m_Separators.push_back(";");
77         m_Separators.push_back(",");
78         m_Separators.push_back("|");
79
80         String line = lines[0];
81
82         size_t sp_index = line.FindFirstOf(" ");
83
84         if (sp_index == String::NPos)
85                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
86
87         String verb = line.SubStr(0, sp_index);
88         String target = line.SubStr(sp_index + 1);
89
90         m_Verb = verb;
91
92         if (m_Verb == "COMMAND") {
93                 m_KeepAlive = true;
94                 m_Command = target;
95         } else if (m_Verb == "GET") {
96                 m_Table = target;
97         } else {
98                 m_Verb = "ERROR";
99                 m_ErrorCode = LivestatusErrorQuery;
100                 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
101                 return;
102         }
103
104         std::deque<Filter::Ptr> filters, stats;
105         std::deque<Aggregator::Ptr> aggregators;
106
107         for (unsigned int i = 1; i < lines.size(); i++) {
108                 line = lines[i];
109
110                 size_t col_index = line.FindFirstOf(":");
111                 String header = line.SubStr(0, col_index);
112                 String params;
113
114                 //OutputFormat:json or OutputFormat: json
115                 if (line.GetLength() > col_index + 1)
116                         params = line.SubStr(col_index + 1).Trim();
117
118                 if (header == "ResponseHeader")
119                         m_ResponseHeader = params;
120                 else if (header == "OutputFormat")
121                         m_OutputFormat = params;
122                 else if (header == "KeepAlive")
123                         m_KeepAlive = (params == "on");
124                 else if (header == "Columns") {
125                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
126                         boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
127                 } else if (header == "Separators") {
128                         std::vector<String> separators;
129
130                         boost::algorithm::split(separators, params, boost::is_any_of(" "));
131                         /* ugly ascii long to char conversion, but works */
132                         if (separators.size() > 0)
133                                 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
134                         if (separators.size() > 1)
135                                 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
136                         if (separators.size() > 2)
137                                 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
138                         if (separators.size() > 3)
139                                 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
140                 } else if (header == "ColumnHeaders")
141                         m_ColumnHeaders = (params == "on");
142                 else if (header == "Limit")
143                         m_Limit = Convert::ToLong(params);
144                 else if (header == "Filter") {
145                         Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
146
147                         if (!filter) {
148                                 m_Verb = "ERROR";
149                                 m_ErrorCode = LivestatusErrorQuery;
150                                 m_ErrorMessage = "Invalid filter specification: " + line;
151                                 return;
152                         }
153
154                         filters.push_back(filter);
155                 } else if (header == "Stats") {
156                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
157
158                         std::vector<String> tokens;
159                         boost::algorithm::split(tokens, params, boost::is_any_of(" "));
160
161                         if (tokens.size() < 2) {
162                                 m_Verb = "ERROR";
163                                 m_ErrorCode = LivestatusErrorQuery;
164                                 m_ErrorMessage = "Missing aggregator column name: " + line;
165                                 return;
166                         }
167
168                         String aggregate_arg = tokens[0];
169                         String aggregate_attr = tokens[1];
170
171                         Aggregator::Ptr aggregator;
172                         Filter::Ptr filter;
173
174                         if (aggregate_arg == "sum") {
175                                 aggregator = new SumAggregator(aggregate_attr);
176                         } else if (aggregate_arg == "min") {
177                                 aggregator = new MinAggregator(aggregate_attr);
178                         } else if (aggregate_arg == "max") {
179                                 aggregator = new MaxAggregator(aggregate_attr);
180                         } else if (aggregate_arg == "avg") {
181                                 aggregator = new AvgAggregator(aggregate_attr);
182                         } else if (aggregate_arg == "std") {
183                                 aggregator = new StdAggregator(aggregate_attr);
184                         } else if (aggregate_arg == "suminv") {
185                                 aggregator = new InvSumAggregator(aggregate_attr);
186                         } else if (aggregate_arg == "avginv") {
187                                 aggregator = new InvAvgAggregator(aggregate_attr);
188                         } else {
189                                 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
190
191                                 if (!filter) {
192                                         m_Verb = "ERROR";
193                                         m_ErrorCode = LivestatusErrorQuery;
194                                         m_ErrorMessage = "Invalid filter specification: " + line;
195                                         return;
196                                 }
197
198                                 aggregator = new CountAggregator();
199                         }
200
201                         aggregator->SetFilter(filter);
202                         aggregators.push_back(aggregator);
203
204                         stats.push_back(filter);
205                 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
206                         std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
207
208                         unsigned int num = Convert::ToLong(params);
209                         CombinerFilter::Ptr filter;
210
211                         if (header == "Or" || header == "StatsOr") {
212                                 filter = new OrFilter();
213                                 Log(LogDebug, "LivestatusQuery")
214                                     << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
215                         } else {
216                                 filter = new AndFilter();
217                                 Log(LogDebug, "LivestatusQuery")
218                                     << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
219                         }
220
221                         if (num > deq.size()) {
222                                 m_Verb = "ERROR";
223                                 m_ErrorCode = 451;
224                                 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
225                                 return;
226                         }
227
228                         while (num > 0 && num--) {
229                                 filter->AddSubFilter(deq.back());
230                                 Log(LogDebug, "LivestatusQuery")
231                                     << "Add " << num << " filter.";
232                                 deq.pop_back();
233                                 if (&deq == &stats)
234                                         aggregators.pop_back();
235                         }
236
237                         deq.push_back(filter);
238                         if (&deq == &stats) {
239                                 Aggregator::Ptr aggregator = new CountAggregator();
240                                 aggregator->SetFilter(filter);
241                                 aggregators.push_back(aggregator);
242                         }
243                 } else if (header == "Negate" || header == "StatsNegate") {
244                         std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
245
246                         if (deq.empty()) {
247                                 m_Verb = "ERROR";
248                                 m_ErrorCode = 451;
249                                 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
250                                 return;
251                         }
252
253                         Filter::Ptr filter = deq.back();
254                         deq.pop_back();
255
256                         if (!filter) {
257                                 m_Verb = "ERROR";
258                                 m_ErrorCode = 451;
259                                 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
260                                 return;
261                         }
262
263                         deq.push_back(new NegateFilter(filter));
264
265                         if (deq == stats) {
266                                 Aggregator::Ptr aggregator = aggregators.back();
267                                 aggregator->SetFilter(filter);
268                         }
269                 }
270         }
271
272         /* Combine all top-level filters into a single filter. */
273         AndFilter::Ptr top_filter = new AndFilter();
274
275         BOOST_FOREACH(const Filter::Ptr& filter, filters) {
276                 top_filter->AddSubFilter(filter);
277         }
278
279         m_Filter = top_filter;
280         m_Aggregators.swap(aggregators);
281 }
282
283 int LivestatusQuery::GetExternalCommands(void)
284 {
285         boost::mutex::scoped_lock lock(l_QueryMutex);
286
287         return l_ExternalCommands;
288 }
289
290 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
291 {
292         /*
293          * time >= 1382696656
294          * type = SERVICE FLAPPING ALERT
295          */
296         std::vector<String> tokens;
297         size_t sp_index;
298         String temp_buffer = params;
299
300         /* extract attr and op */
301         for (int i = 0; i < 2; i++) {
302                 sp_index = temp_buffer.FindFirstOf(" ");
303
304                 /* check if this is the last argument */
305                 if (sp_index == String::NPos) {
306                         /* 'attr op' or 'attr op val' is valid */
307                         if (i < 1)
308                                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
309
310                         break;
311                 }
312
313                 tokens.push_back(temp_buffer.SubStr(0, sp_index));
314                 temp_buffer = temp_buffer.SubStr(sp_index + 1);
315         }
316
317         /* add the rest as value */
318         tokens.push_back(temp_buffer);
319
320         if (tokens.size() == 2)
321                 tokens.push_back("");
322
323         if (tokens.size() < 3)
324                 return Filter::Ptr();
325
326         bool negate = false;
327         String attr = tokens[0];
328         String op = tokens[1];
329         String val = tokens[2];
330
331         if (op == "!=") {
332                 op = "=";
333                 negate = true;
334         } else if (op == "!~") {
335                 op = "~";
336                 negate = true;
337         } else if (op == "!=~") {
338                 op = "=~";
339                 negate = true;
340         } else if (op == "!~~") {
341                 op = "~~";
342                 negate = true;
343         }
344
345         Filter::Ptr filter = new AttributeFilter(attr, op, val);
346
347         if (negate)
348                 filter = new NegateFilter(filter);
349
350         /* pre-filter log time duration */
351         if (attr == "time") {
352                 if (op == "<" || op == "<=") {
353                         until = Convert::ToLong(val);
354                 } else if (op == ">" || op == ">=") {
355                         from = Convert::ToLong(val);
356                 }
357         }
358
359         Log(LogDebug, "LivestatusQuery")
360             << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
361
362         return filter;
363 }
364
365 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
366 {
367         if (m_OutputFormat == "json" || m_OutputFormat == "python")
368                 fp << "[";
369 }
370
371 void LivestatusQuery::EndResultSet(std::ostream& fp) const
372 {
373         if (m_OutputFormat == "json" || m_OutputFormat == "python")
374                 fp << "]";
375 }
376
377 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
378 {
379         if (m_OutputFormat == "csv") {
380                 bool first = true;
381
382                 ObjectLock rlock(row);
383                 BOOST_FOREACH(const Value& value, row) {
384                         if (first)
385                                 first = false;
386                         else
387                                 fp << m_Separators[1];
388
389                         if (value.IsObjectType<Array>())
390                                 PrintCsvArray(fp, value, 0);
391                         else
392                                 fp << value;
393                 }
394
395                 fp << m_Separators[0];
396         } else if (m_OutputFormat == "json") {
397                 if (!first_row)
398                         fp << ", ";
399
400                 fp << JsonEncode(row);
401         } else if (m_OutputFormat == "python") {
402                 if (!first_row)
403                         fp << ", ";
404
405                 PrintPythonArray(fp, row);
406         }
407
408         first_row = false;
409 }
410
411 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
412 {
413         bool first = true;
414
415         ObjectLock olock(array);
416         BOOST_FOREACH(const Value& value, array) {
417                 if (first)
418                         first = false;
419                 else
420                         fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
421
422                 if (value.IsObjectType<Array>())
423                         PrintCsvArray(fp, value, level + 1);
424                 else if (value.IsBoolean())
425                         fp << Convert::ToLong(value);
426                 else
427                         fp << value;
428         }
429 }
430
431 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
432 {
433         fp << "[ ";
434
435         bool first = true;
436
437         BOOST_FOREACH(const Value& value, rs) {
438                 if (first)
439                         first = false;
440                 else
441                         fp << ", ";
442
443                 if (value.IsObjectType<Array>())
444                         PrintPythonArray(fp, value);
445                 else if (value.IsNumber())
446                         fp << value;
447                 else
448                         fp << QuoteStringPython(value);
449         }
450
451         fp << " ]";
452 }
453
454 String LivestatusQuery::QuoteStringPython(const String& str) {
455         String result = str;
456         boost::algorithm::replace_all(result, "\"", "\\\"");
457         return "r\"" + result + "\"";
458 }
459
460 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
461 {
462         Log(LogNotice, "LivestatusQuery")
463             << "Table: " << m_Table;
464
465         Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
466
467         if (!table) {
468                 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
469
470                 return;
471         }
472
473         std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
474         std::vector<String> columns;
475
476         if (m_Columns.size() > 0)
477                 columns = m_Columns;
478         else
479                 columns = table->GetColumnNames();
480
481         std::ostringstream result;
482         bool first_row = true;
483         BeginResultSet(result);
484
485         if (m_Aggregators.empty()) {
486                 Array::Ptr header = new Array();
487
488                 typedef std::pair<String, Column> ColumnPair;
489
490                 std::vector<ColumnPair> column_objs;
491                 column_objs.reserve(columns.size());
492
493                 BOOST_FOREACH(const String& columnName, columns)
494                         column_objs.push_back(std::make_pair(columnName, table->GetColumn(columnName)));
495
496                 BOOST_FOREACH(const LivestatusRowValue& object, objects) {
497                         Array::Ptr row = new Array();
498
499                         row->Reserve(column_objs.size());
500
501                         BOOST_FOREACH(const ColumnPair& cv, column_objs) {
502                                 if (m_ColumnHeaders)
503                                         header->Add(cv.first);
504
505                                 row->Add(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
506                         }
507
508                         if (m_ColumnHeaders) {
509                                 AppendResultRow(result, header, first_row);
510                                 m_ColumnHeaders = false;
511                         }
512
513                         AppendResultRow(result, row, first_row);
514                 }
515         } else {
516                 std::vector<double> stats(m_Aggregators.size(), 0);
517                 int index = 0;
518
519                 /* add aggregated stats */
520                 BOOST_FOREACH(const Aggregator::Ptr aggregator, m_Aggregators) {
521                         BOOST_FOREACH(const LivestatusRowValue& object, objects) {
522                                 aggregator->Apply(table, object.Row);
523                         }
524
525                         stats[index] = aggregator->GetResult();
526                         index++;
527                 }
528
529                 /* add column headers both for raw and aggregated data */
530                 if (m_ColumnHeaders) {
531                         Array::Ptr header = new Array();
532
533                         BOOST_FOREACH(const String& columnName, m_Columns) {
534                                 header->Add(columnName);
535                         }
536
537                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
538                                 header->Add("stats_" + Convert::ToString(i));
539                         }
540
541                         AppendResultRow(result, header, first_row);
542                 }
543
544                 Array::Ptr row = new Array();
545
546                 row->Reserve(m_Columns.size() + m_Aggregators.size());
547
548                 /*
549                  * add selected columns next to stats
550                  * may not be accurate for grouping!
551                  */
552                 if (objects.size() > 0 && m_Columns.size() > 0) {
553                         BOOST_FOREACH(const String& columnName, m_Columns) {
554                                 Column column = table->GetColumn(columnName);
555
556                                 LivestatusRowValue object = objects[0]; //first object wins
557
558                                 row->Add(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
559                         }
560                 }
561
562                 for (size_t i = 0; i < m_Aggregators.size(); i++)
563                         row->Add(stats[i]);
564
565                 AppendResultRow(result, row, first_row);
566         }
567
568         EndResultSet(result);
569
570         SendResponse(stream, LivestatusErrorOK, result.str());
571 }
572
573 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
574 {
575         {
576                 boost::mutex::scoped_lock lock(l_QueryMutex);
577
578                 l_ExternalCommands++;
579         }
580
581         Log(LogNotice, "LivestatusQuery")
582             << "Executing command: " << m_Command;
583         ExternalCommandProcessor::Execute(m_Command);
584         SendResponse(stream, LivestatusErrorOK, "");
585 }
586
587 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
588 {
589         Log(LogDebug, "LivestatusQuery")
590             << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
591         SendResponse(stream, m_ErrorCode, m_ErrorMessage);
592 }
593
594 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
595 {
596         if (m_ResponseHeader == "fixed16")
597                 PrintFixed16(stream, code, data);
598
599         if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
600                 try {
601                         stream->Write(data.CStr(), data.GetLength());
602                 } catch (const std::exception&) {
603                         Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
604                 }
605         }
606 }
607
608 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
609 {
610         ASSERT(code >= 100 && code <= 999);
611
612         String sCode = Convert::ToString(code);
613         String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
614
615         String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
616
617         try {
618                 stream->Write(header.CStr(), header.GetLength());
619         } catch (const std::exception&) {
620                 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
621         }
622 }
623
624 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
625 {
626         try {
627                 Log(LogNotice, "LivestatusQuery")
628                     << "Executing livestatus query: " << m_Verb;
629
630                 if (m_Verb == "GET")
631                         ExecuteGetHelper(stream);
632                 else if (m_Verb == "COMMAND")
633                         ExecuteCommandHelper(stream);
634                 else if (m_Verb == "ERROR")
635                         ExecuteErrorHelper(stream);
636                 else
637                         BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
638         } catch (const std::exception& ex) {
639                 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
640         }
641
642         if (!m_KeepAlive) {
643                 stream->Close();
644                 return false;
645         }
646
647         return true;
648 }