]> granicus.if.org Git - icinga2/blob - lib/livestatus/livestatusquery.cpp
Replace cJSON with YAJL
[icinga2] / lib / livestatus / livestatusquery.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/json.hpp"
41 #include <boost/algorithm/string/classification.hpp>
42 #include <boost/foreach.hpp>
43 #include <boost/algorithm/string/replace.hpp>
44 #include <boost/algorithm/string/split.hpp>
45
46 using namespace icinga;
47
48 static int l_ExternalCommands = 0;
49 static boost::mutex l_QueryMutex;
50
51 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
52         : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true),
53           m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
54 {
55         if (lines.size() == 0) {
56                 m_Verb = "ERROR";
57                 m_ErrorCode = LivestatusErrorQuery;
58                 m_ErrorMessage = "Empty Query. Aborting.";
59                 return;
60         }
61
62         String msg;
63         BOOST_FOREACH(const String& line, lines) {
64                 msg += line + "\n";
65         }
66         Log(LogDebug, "LivestatusQuery", msg);
67
68         m_CompatLogPath = compat_log_path;
69
70         /* default separators */
71         m_Separators.push_back("\n");
72         m_Separators.push_back(";");
73         m_Separators.push_back(",");
74         m_Separators.push_back("|");
75
76         String line = lines[0];
77
78         size_t sp_index = line.FindFirstOf(" ");
79
80         if (sp_index == String::NPos)
81                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
82
83         String verb = line.SubStr(0, sp_index);
84         String target = line.SubStr(sp_index + 1);
85
86         m_Verb = verb;
87
88         if (m_Verb == "COMMAND") {
89                 m_KeepAlive = true;
90                 m_Command = target;
91         } else if (m_Verb == "GET") {
92                 m_Table = target;
93         } else {
94                 m_Verb = "ERROR";
95                 m_ErrorCode = LivestatusErrorQuery;
96                 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
97                 return;
98         }
99
100         std::deque<Filter::Ptr> filters, stats;
101         std::deque<Aggregator::Ptr> aggregators;
102
103         for (unsigned int i = 1; i < lines.size(); i++) {
104                 line = lines[i];
105
106                 size_t col_index = line.FindFirstOf(":");
107                 String header = line.SubStr(0, col_index);
108                 String params;
109
110                 //OutputFormat:json or OutputFormat: json
111                 if (line.GetLength() > col_index + 1)
112                         params = line.SubStr(col_index + 1);
113
114                 params.Trim();
115
116                 if (header == "ResponseHeader")
117                         m_ResponseHeader = params;
118                 else if (header == "OutputFormat")
119                         m_OutputFormat = params;
120                 else if (header == "KeepAlive")
121                         m_KeepAlive = (params == "on");
122                 else if (header == "Columns") {
123                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
124                         boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
125                 } else if (header == "Separators") {
126                         std::vector<String> separators;
127
128                         boost::algorithm::split(separators, params, boost::is_any_of(" "));
129                         /* ugly ascii long to char conversion, but works */
130                         if (separators.size() > 0)
131                                 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
132                         if (separators.size() > 1)
133                                 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
134                         if (separators.size() > 2)
135                                 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
136                         if (separators.size() > 3)
137                                 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
138                 } else if (header == "ColumnHeaders")
139                         m_ColumnHeaders = (params == "on");
140                 else if (header == "Filter") {
141                         Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
142
143                         if (!filter) {
144                                 m_Verb = "ERROR";
145                                 m_ErrorCode = LivestatusErrorQuery;
146                                 m_ErrorMessage = "Invalid filter specification: " + line;
147                                 return;
148                         }
149
150                         filters.push_back(filter);
151                 } else if (header == "Stats") {
152                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
153
154                         std::vector<String> tokens;
155                         boost::algorithm::split(tokens, params, boost::is_any_of(" "));
156
157                         if (tokens.size() < 2) {
158                                 m_Verb = "ERROR";
159                                 m_ErrorCode = LivestatusErrorQuery;
160                                 m_ErrorMessage = "Missing aggregator column name: " + line;
161                                 return;
162                         }
163
164                         String aggregate_arg = tokens[0];
165                         String aggregate_attr = tokens[1];
166
167                         Aggregator::Ptr aggregator;
168                         Filter::Ptr filter;
169
170                         if (aggregate_arg == "sum") {
171                                 aggregator = make_shared<SumAggregator>(aggregate_attr);
172                         } else if (aggregate_arg == "min") {
173                                 aggregator = make_shared<MinAggregator>(aggregate_attr);
174                         } else if (aggregate_arg == "max") {
175                                 aggregator = make_shared<MaxAggregator>(aggregate_attr);
176                         } else if (aggregate_arg == "avg") {
177                                 aggregator = make_shared<AvgAggregator>(aggregate_attr);
178                         } else if (aggregate_arg == "std") {
179                                 aggregator = make_shared<StdAggregator>(aggregate_attr);
180                         } else if (aggregate_arg == "suminv") {
181                                 aggregator = make_shared<InvSumAggregator>(aggregate_attr);
182                         } else if (aggregate_arg == "avginv") {
183                                 aggregator = make_shared<InvAvgAggregator>(aggregate_attr);
184                         } else {
185                                 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
186
187                                 if (!filter) {
188                                         m_Verb = "ERROR";
189                                         m_ErrorCode = LivestatusErrorQuery;
190                                         m_ErrorMessage = "Invalid filter specification: " + line;
191                                         return;
192                                 }
193
194                                 aggregator = make_shared<CountAggregator>();
195                         }
196
197                         aggregator->SetFilter(filter);
198                         aggregators.push_back(aggregator);
199
200                         stats.push_back(filter);
201                 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
202                         std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
203
204                         unsigned int num = Convert::ToLong(params);
205                         CombinerFilter::Ptr filter;
206
207                         if (header == "Or" || header == "StatsOr") {
208                                 filter = make_shared<OrFilter>();
209                                 Log(LogDebug, "LivestatusQuery")
210                                     << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
211                         } else {
212                                 filter = make_shared<AndFilter>();
213                                 Log(LogDebug, "LivestatusQuery")
214                                     << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
215                         }
216
217                         if (num > deq.size()) {
218                                 m_Verb = "ERROR";
219                                 m_ErrorCode = 451;
220                                 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
221                                 return;
222                         }
223
224                         while (num > 0 && num--) {
225                                 filter->AddSubFilter(deq.back());
226                                 Log(LogDebug, "LivestatusQuery")
227                                     << "Add " << num << " filter.";
228                                 deq.pop_back();
229                                 if (&deq == &stats)
230                                         aggregators.pop_back();
231                         }
232
233                         deq.push_back(filter);
234                         if (&deq == &stats) {
235                                 Aggregator::Ptr aggregator = make_shared<CountAggregator>();
236                                 aggregator->SetFilter(filter);
237                                 aggregators.push_back(aggregator);
238                         }
239                 } else if (header == "Negate" || header == "StatsNegate") {
240                         std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
241
242                         if (deq.empty()) {
243                                 m_Verb = "ERROR";
244                                 m_ErrorCode = 451;
245                                 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
246                                 return;
247                         }
248
249                         Filter::Ptr filter = deq.back();
250                         deq.pop_back();
251
252                         if (!filter) {
253                                 m_Verb = "ERROR";
254                                 m_ErrorCode = 451;
255                                 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
256                                 return;
257                         }
258
259                         deq.push_back(make_shared<NegateFilter>(filter));
260
261                         if (deq == stats) {
262                                 Aggregator::Ptr aggregator = aggregators.back();
263                                 aggregator->SetFilter(filter);
264                         }
265                 }
266         }
267
268         /* Combine all top-level filters into a single filter. */
269         AndFilter::Ptr top_filter = make_shared<AndFilter>();
270
271         BOOST_FOREACH(const Filter::Ptr& filter, filters) {
272                 top_filter->AddSubFilter(filter);
273         }
274
275         m_Filter = top_filter;
276         m_Aggregators.swap(aggregators);
277 }
278
279 int LivestatusQuery::GetExternalCommands(void)
280 {
281         boost::mutex::scoped_lock lock(l_QueryMutex);
282
283         return l_ExternalCommands;
284 }
285
286 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
287 {
288         /*
289          * time >= 1382696656
290          * type = SERVICE FLAPPING ALERT
291          */
292         std::vector<String> tokens;
293         size_t sp_index;
294         String temp_buffer = params;
295
296         /* extract attr and op */
297         for (int i = 0; i < 2; i++) {
298                 sp_index = temp_buffer.FindFirstOf(" ");
299
300                 /* check if this is the last argument */
301                 if (sp_index == String::NPos) {
302                         /* 'attr op' or 'attr op val' is valid */
303                         if (i < 1)
304                                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
305
306                         break;
307                 }
308
309                 tokens.push_back(temp_buffer.SubStr(0, sp_index));
310                 temp_buffer = temp_buffer.SubStr(sp_index + 1);
311         }
312
313         /* add the rest as value */
314         tokens.push_back(temp_buffer);
315
316         if (tokens.size() == 2)
317                 tokens.push_back("");
318
319         if (tokens.size() < 3)
320                 return Filter::Ptr();
321
322         bool negate = false;
323         String attr = tokens[0];
324         String op = tokens[1];
325         String val = tokens[2];
326
327         if (op == "!=") {
328                 op = "=";
329                 negate = true;
330         } else if (op == "!~") {
331                 op = "~";
332                 negate = true;
333         } else if (op == "!=~") {
334                 op = "=~";
335                 negate = true;
336         } else if (op == "!~~") {
337                 op = "~~";
338                 negate = true;
339         }
340
341         Filter::Ptr filter = make_shared<AttributeFilter>(attr, op, val);
342
343         if (negate)
344                 filter = make_shared<NegateFilter>(filter);
345
346         /* pre-filter log time duration */
347         if (attr == "time") {
348                 if (op == "<" || op == "<=") {
349                         until = Convert::ToLong(val);
350                 } else if (op == ">" || op == ">=") {
351                         from = Convert::ToLong(val);
352                 }
353         }
354
355         Log(LogDebug, "LivestatusQuery")
356             << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
357
358         return filter;
359 }
360
361 void LivestatusQuery::PrintResultSet(std::ostream& fp, const Array::Ptr& rs) const
362 {
363         if (m_OutputFormat == "csv") {
364                 ObjectLock olock(rs);
365
366                 BOOST_FOREACH(const Array::Ptr& row, rs) {
367                         bool first = true;
368
369                         ObjectLock rlock(row);
370                         BOOST_FOREACH(const Value& value, row) {
371                                 if (first)
372                                         first = false;
373                                 else
374                                         fp << m_Separators[1];
375
376                                 if (value.IsObjectType<Array>())
377                                         PrintCsvArray(fp, value, 0);
378                                 else
379                                         fp << value;
380                         }
381
382                         fp << m_Separators[0];
383                 }
384         } else if (m_OutputFormat == "json") {
385                 fp << JsonEncode(rs);
386         } else if (m_OutputFormat == "python") {
387                 PrintPythonArray(fp, rs);
388         }
389 }
390
391 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
392 {
393         bool first = true;
394
395         ObjectLock olock(array);
396         BOOST_FOREACH(const Value& value, array) {
397                 if (first)
398                         first = false;
399                 else
400                         fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
401
402                 if (value.IsObjectType<Array>())
403                         PrintCsvArray(fp, value, level + 1);
404                 else
405                         fp << value;
406         }
407 }
408
409 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
410 {
411         fp << "[ ";
412
413         bool first = true;
414
415         BOOST_FOREACH(const Value& value, rs) {
416                 if (first)
417                         first = false;
418                 else
419                         fp << ", ";
420
421                 if (value.IsObjectType<Array>())
422                         PrintPythonArray(fp, value);
423                 else if (value.IsNumber())
424                         fp << value;
425                 else
426                         fp << QuoteStringPython(value);
427         }
428
429         fp << " ]";
430 }
431
432 String LivestatusQuery::QuoteStringPython(const String& str) {
433         String result = str;
434         boost::algorithm::replace_all(result, "\"", "\\\"");
435         return "r\"" + result + "\"";
436 }
437
438 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
439 {
440         Log(LogInformation, "LivestatusQuery")
441             << "Table: " << m_Table;
442
443         Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
444
445         if (!table) {
446                 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
447
448                 return;
449         }
450
451         std::vector<Value> objects = table->FilterRows(m_Filter);
452         std::vector<String> columns;
453
454         if (m_Columns.size() > 0)
455                 columns = m_Columns;
456         else
457                 columns = table->GetColumnNames();
458
459         Array::Ptr rs = make_shared<Array>();
460
461         if (m_Aggregators.empty()) {
462                 Array::Ptr header = make_shared<Array>();
463
464                 BOOST_FOREACH(const Value& object, objects) {
465                         Array::Ptr row = make_shared<Array>();
466
467                         BOOST_FOREACH(const String& columnName, columns) {
468                                 Column column = table->GetColumn(columnName);
469
470                                 if (m_ColumnHeaders)
471                                         header->Add(columnName);
472
473                                 row->Add(column.ExtractValue(object));
474                         }
475
476                         if (m_ColumnHeaders) {
477                                 rs->Add(header);
478                                 m_ColumnHeaders = false;
479                         }
480
481                         rs->Add(row);
482                 }
483         } else {
484                 std::vector<double> stats(m_Aggregators.size(), 0);
485                 int index = 0;
486
487                 /* add aggregated stats */
488                 BOOST_FOREACH(const Aggregator::Ptr aggregator, m_Aggregators) {
489                         BOOST_FOREACH(const Value& object, objects) {
490                                 aggregator->Apply(table, object);
491                         }
492
493                         stats[index] = aggregator->GetResult();
494                         index++;
495                 }
496
497                 /* add column headers both for raw and aggregated data */
498                 if (m_ColumnHeaders) {
499                         Array::Ptr header = make_shared<Array>();
500
501                         BOOST_FOREACH(const String& columnName, m_Columns) {
502                                 header->Add(columnName);
503                         }
504
505                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
506                                 header->Add("stats_" + Convert::ToString(i));
507                         }
508
509                         rs->Add(header);
510                 }
511
512                 Array::Ptr row = make_shared<Array>();
513
514                 /*
515                  * add selected columns next to stats
516                  * may not be accurate for grouping!
517                  */
518                 if (objects.size() > 0 && m_Columns.size() > 0) {
519                         BOOST_FOREACH(const String& columnName, m_Columns) {
520                                 Column column = table->GetColumn(columnName);
521
522                                 row->Add(column.ExtractValue(objects[0])); // first object wins
523                         }
524                 }
525
526                 for (size_t i = 0; i < m_Aggregators.size(); i++)
527                         row->Add(stats[i]);
528
529                 rs->Add(row);
530         }
531
532         std::ostringstream result;
533         PrintResultSet(result, rs);
534
535         SendResponse(stream, LivestatusErrorOK, result.str());
536 }
537
538 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
539 {
540         {
541                 boost::mutex::scoped_lock lock(l_QueryMutex);
542
543                 l_ExternalCommands++;
544         }
545
546         Log(LogInformation, "LivestatusQuery")
547             << "Executing command: " << m_Command;
548         ExternalCommandProcessor::Execute(m_Command);
549         SendResponse(stream, LivestatusErrorOK, "");
550 }
551
552 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
553 {
554         Log(LogDebug, "LivestatusQuery")
555             << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
556         SendResponse(stream, m_ErrorCode, m_ErrorMessage);
557 }
558
559 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
560 {
561         if (m_ResponseHeader == "fixed16")
562                 PrintFixed16(stream, code, data);
563
564         if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
565                 try {
566                         stream->Write(data.CStr(), data.GetLength());
567                 } catch (const std::exception&) {
568                         Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
569                 }
570         }
571 }
572
573 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
574 {
575         ASSERT(code >= 100 && code <= 999);
576
577         String sCode = Convert::ToString(code);
578         String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
579
580         String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
581
582         try {
583                 stream->Write(header.CStr(), header.GetLength());
584         } catch (const std::exception&) {
585                 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
586         }
587 }
588
589 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
590 {
591         try {
592                 Log(LogInformation, "LivestatusQuery")
593                     << "Executing livestatus query: " << m_Verb;
594
595                 if (m_Verb == "GET")
596                         ExecuteGetHelper(stream);
597                 else if (m_Verb == "COMMAND")
598                         ExecuteCommandHelper(stream);
599                 else if (m_Verb == "ERROR")
600                         ExecuteErrorHelper(stream);
601                 else
602                         BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
603         } catch (const std::exception& ex) {
604                 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
605         }
606
607         if (!m_KeepAlive) {
608                 stream->Close();
609                 return false;
610         }
611
612         return true;
613 }