]> granicus.if.org Git - icinga2/blob - lib/livestatus/livestatusquery.cpp
Remove logger_fwd.hpp
[icinga2] / lib / livestatus / livestatusquery.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/serializer.hpp"
41 #include <boost/algorithm/string/classification.hpp>
42 #include <boost/foreach.hpp>
43 #include <boost/algorithm/string/replace.hpp>
44 #include <boost/algorithm/string/split.hpp>
45
46 using namespace icinga;
47
/* Counter incremented by ExecuteCommandHelper() and returned by GetExternalCommands(). */
48 static int l_ExternalCommands = 0;
/* Held while l_ExternalCommands is read or incremented. */
49 static boost::mutex l_QueryMutex;
50
51 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
52         : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true),
53           m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
54 {
55         if (lines.size() == 0) {
56                 m_Verb = "ERROR";
57                 m_ErrorCode = LivestatusErrorQuery;
58                 m_ErrorMessage = "Empty Query. Aborting.";
59                 return;
60         }
61
62         String msg;
63         BOOST_FOREACH(const String& line, lines) {
64                 msg += line + "\n";
65         }
66         Log(LogDebug, "LivestatusQuery", msg);
67
68         m_CompatLogPath = compat_log_path;
69
70         /* default separators */
71         m_Separators.push_back("\n");
72         m_Separators.push_back(";");
73         m_Separators.push_back(",");
74         m_Separators.push_back("|");
75
76         String line = lines[0];
77
78         size_t sp_index = line.FindFirstOf(" ");
79
80         if (sp_index == String::NPos)
81                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
82
83         String verb = line.SubStr(0, sp_index);
84         String target = line.SubStr(sp_index + 1);
85
86         m_Verb = verb;
87
88         if (m_Verb == "COMMAND") {
89                 m_KeepAlive = true;
90                 m_Command = target;
91         } else if (m_Verb == "GET") {
92                 m_Table = target;
93         } else {
94                 m_Verb = "ERROR";
95                 m_ErrorCode = LivestatusErrorQuery;
96                 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
97                 return;
98         }
99
100         std::deque<Filter::Ptr> filters, stats;
101         std::deque<Aggregator::Ptr> aggregators;
102
103         for (unsigned int i = 1; i < lines.size(); i++) {
104                 line = lines[i];
105
106                 size_t col_index = line.FindFirstOf(":");
107                 String header = line.SubStr(0, col_index);
108                 String params;
109
110                 //OutputFormat:json or OutputFormat: json
111                 if (line.GetLength() > col_index + 1)
112                         params = line.SubStr(col_index + 1);
113
114                 params.Trim();
115
116                 if (header == "ResponseHeader")
117                         m_ResponseHeader = params;
118                 else if (header == "OutputFormat")
119                         m_OutputFormat = params;
120                 else if (header == "KeepAlive")
121                         m_KeepAlive = (params == "on");
122                 else if (header == "Columns") {
123                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
124                         boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
125                 } else if (header == "Separators") {
126                         std::vector<String> separators;
127
128                         boost::algorithm::split(separators, params, boost::is_any_of(" "));
129                         /* ugly ascii long to char conversion, but works */
130                         if (separators.size() > 0)
131                                 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
132                         if (separators.size() > 1)
133                                 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
134                         if (separators.size() > 2)
135                                 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
136                         if (separators.size() > 3)
137                                 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
138                 } else if (header == "ColumnHeaders")
139                         m_ColumnHeaders = (params == "on");
140                 else if (header == "Filter") {
141                         Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
142
143                         if (!filter) {
144                                 m_Verb = "ERROR";
145                                 m_ErrorCode = LivestatusErrorQuery;
146                                 m_ErrorMessage = "Invalid filter specification: " + line;
147                                 return;
148                         }
149
150                         filters.push_back(filter);
151                 } else if (header == "Stats") {
152                         m_ColumnHeaders = false; // Might be explicitly re-enabled later on
153
154                         std::vector<String> tokens;
155                         boost::algorithm::split(tokens, params, boost::is_any_of(" "));
156
157                         if (tokens.size() < 2) {
158                                 m_Verb = "ERROR";
159                                 m_ErrorCode = LivestatusErrorQuery;
160                                 m_ErrorMessage = "Missing aggregator column name: " + line;
161                                 return;
162                         }
163
164                         String aggregate_arg = tokens[0];
165                         String aggregate_attr = tokens[1];
166
167                         Aggregator::Ptr aggregator;
168                         Filter::Ptr filter;
169
170                         if (aggregate_arg == "sum") {
171                                 aggregator = make_shared<SumAggregator>(aggregate_attr);
172                         } else if (aggregate_arg == "min") {
173                                 aggregator = make_shared<MinAggregator>(aggregate_attr);
174                         } else if (aggregate_arg == "max") {
175                                 aggregator = make_shared<MaxAggregator>(aggregate_attr);
176                         } else if (aggregate_arg == "avg") {
177                                 aggregator = make_shared<AvgAggregator>(aggregate_attr);
178                         } else if (aggregate_arg == "std") {
179                                 aggregator = make_shared<StdAggregator>(aggregate_attr);
180                         } else if (aggregate_arg == "suminv") {
181                                 aggregator = make_shared<InvSumAggregator>(aggregate_attr);
182                         } else if (aggregate_arg == "avginv") {
183                                 aggregator = make_shared<InvAvgAggregator>(aggregate_attr);
184                         } else {
185                                 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
186
187                                 if (!filter) {
188                                         m_Verb = "ERROR";
189                                         m_ErrorCode = LivestatusErrorQuery;
190                                         m_ErrorMessage = "Invalid filter specification: " + line;
191                                         return;
192                                 }
193
194                                 aggregator = make_shared<CountAggregator>();
195                         }
196
197                         aggregator->SetFilter(filter);
198                         aggregators.push_back(aggregator);
199
200                         stats.push_back(filter);
201                 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
202                         std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
203
204                         unsigned int num = Convert::ToLong(params);
205                         CombinerFilter::Ptr filter;
206
207                         if (header == "Or" || header == "StatsOr") {
208                                 filter = make_shared<OrFilter>();
209                                 Log(LogDebug, "LivestatusQuery", "Add OR filter for " + params + " column(s). " + Convert::ToString(deq.size()) + " filters available.");
210                         } else {
211                                 filter = make_shared<AndFilter>();
212                                 Log(LogDebug, "LivestatusQuery", "Add AND filter for " + params + " column(s). " + Convert::ToString(deq.size()) + " filters available.");
213                         }
214
215                         if (num > deq.size()) {
216                                 m_Verb = "ERROR";
217                                 m_ErrorCode = 451;
218                                 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
219                                 return;
220                         }
221
222                         while (num > 0 && num--) {
223                                 filter->AddSubFilter(deq.back());
224                                 Log(LogDebug, "LivestatusQuery", "Add " +  Convert::ToString(num) + " filter.");
225                                 deq.pop_back();
226                                 if (&deq == &stats)
227                                         aggregators.pop_back();
228                         }
229
230                         deq.push_back(filter);
231                         if (&deq == &stats) {
232                                 Aggregator::Ptr aggregator = make_shared<CountAggregator>();
233                                 aggregator->SetFilter(filter);
234                                 aggregators.push_back(aggregator);
235                         }
236                 } else if (header == "Negate" || header == "StatsNegate") {
237                         std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
238
239                         if (deq.empty()) {
240                                 m_Verb = "ERROR";
241                                 m_ErrorCode = 451;
242                                 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
243                                 return;
244                         }
245
246                         Filter::Ptr filter = deq.back();
247                         deq.pop_back();
248
249                         if (!filter) {
250                                 m_Verb = "ERROR";
251                                 m_ErrorCode = 451;
252                                 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
253                                 return;
254                         }
255
256                         deq.push_back(make_shared<NegateFilter>(filter));
257
258                         if (deq == stats) {
259                                 Aggregator::Ptr aggregator = aggregators.back();
260                                 aggregator->SetFilter(filter);
261                         }
262                 }
263         }
264
265         /* Combine all top-level filters into a single filter. */
266         AndFilter::Ptr top_filter = make_shared<AndFilter>();
267
268         BOOST_FOREACH(const Filter::Ptr& filter, filters) {
269                 top_filter->AddSubFilter(filter);
270         }
271
272         m_Filter = top_filter;
273         m_Aggregators.swap(aggregators);
274 }
275
276 int LivestatusQuery::GetExternalCommands(void)
277 {
278         boost::mutex::scoped_lock lock(l_QueryMutex);
279
280         return l_ExternalCommands;
281 }
282
283 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
284 {
285         /*
286          * time >= 1382696656
287          * type = SERVICE FLAPPING ALERT
288          */
289         std::vector<String> tokens;
290         size_t sp_index;
291         String temp_buffer = params;
292
293         /* extract attr and op */
294         for (int i = 0; i < 2; i++) {
295                 sp_index = temp_buffer.FindFirstOf(" ");
296
297                 /* check if this is the last argument */
298                 if (sp_index == String::NPos) {
299                         /* 'attr op' or 'attr op val' is valid */
300                         if (i < 1)
301                                 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
302
303                         break;
304                 }
305
306                 tokens.push_back(temp_buffer.SubStr(0, sp_index));
307                 temp_buffer = temp_buffer.SubStr(sp_index + 1);
308         }
309
310         /* add the rest as value */
311         tokens.push_back(temp_buffer);
312
313         if (tokens.size() == 2)
314                 tokens.push_back("");
315
316         if (tokens.size() < 3)
317                 return Filter::Ptr();
318
319         bool negate = false;
320         String attr = tokens[0];
321         String op = tokens[1];
322         String val = tokens[2];
323
324         if (op == "!=") {
325                 op = "=";
326                 negate = true;
327         } else if (op == "!~") {
328                 op = "~";
329                 negate = true;
330         } else if (op == "!=~") {
331                 op = "=~";
332                 negate = true;
333         } else if (op == "!~~") {
334                 op = "~~";
335                 negate = true;
336         }
337
338         Filter::Ptr filter = make_shared<AttributeFilter>(attr, op, val);
339
340         if (negate)
341                 filter = make_shared<NegateFilter>(filter);
342
343         /* pre-filter log time duration */
344         if (attr == "time") {
345                 if (op == "<" || op == "<=") {
346                         until = Convert::ToLong(val);
347                 } else if (op == ">" || op == ">=") {
348                         from = Convert::ToLong(val);
349                 }
350         }
351
352         Log(LogDebug, "LivestatusQuery", "Parsed filter with attr: '" + attr + "' op: '" + op + "' val: '" + val + "'.");
353
354         return filter;
355 }
356
357 void LivestatusQuery::PrintResultSet(std::ostream& fp, const Array::Ptr& rs) const
358 {
359         if (m_OutputFormat == "csv") {
360                 ObjectLock olock(rs);
361
362                 BOOST_FOREACH(const Array::Ptr& row, rs) {
363                         bool first = true;
364
365                         ObjectLock rlock(row);
366                         BOOST_FOREACH(const Value& value, row) {
367                                 if (first)
368                                         first = false;
369                                 else
370                                         fp << m_Separators[1];
371
372                                 if (value.IsObjectType<Array>())
373                                         PrintCsvArray(fp, value, 0);
374                                 else
375                                         fp << value;
376                         }
377
378                         fp << m_Separators[0];
379                 }
380         } else if (m_OutputFormat == "json") {
381                 fp << JsonSerialize(rs);
382         } else if (m_OutputFormat == "python") {
383                 PrintPythonArray(fp, rs);
384         }
385 }
386
387 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
388 {
389         bool first = true;
390
391         ObjectLock olock(array);
392         BOOST_FOREACH(const Value& value, array) {
393                 if (first)
394                         first = false;
395                 else
396                         fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
397
398                 if (value.IsObjectType<Array>())
399                         PrintCsvArray(fp, value, level + 1);
400                 else
401                         fp << value;
402         }
403 }
404
405 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
406 {
407         fp << "[ ";
408
409         bool first = true;
410
411         BOOST_FOREACH(const Value& value, rs) {
412                 if (first)
413                         first = false;
414                 else
415                         fp << ", ";
416
417                 if (value.IsObjectType<Array>())
418                         PrintPythonArray(fp, value);
419                 else if (value.IsNumber())
420                         fp << value;
421                 else
422                         fp << QuoteStringPython(value);
423         }
424
425         fp << " ]";
426 }
427
428 String LivestatusQuery::QuoteStringPython(const String& str) {
429         String result = str;
430         boost::algorithm::replace_all(result, "\"", "\\\"");
431         return "r\"" + result + "\"";
432 }
433
434 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
435 {
436         Log(LogInformation, "LivestatusQuery", "Table: " + m_Table);
437
438         Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
439
440         if (!table) {
441                 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
442
443                 return;
444         }
445
446         std::vector<Value> objects = table->FilterRows(m_Filter);
447         std::vector<String> columns;
448
449         if (m_Columns.size() > 0)
450                 columns = m_Columns;
451         else
452                 columns = table->GetColumnNames();
453
454         Array::Ptr rs = make_shared<Array>();
455
456         if (m_Aggregators.empty()) {
457                 Array::Ptr header = make_shared<Array>();
458
459                 BOOST_FOREACH(const Value& object, objects) {
460                         Array::Ptr row = make_shared<Array>();
461
462                         BOOST_FOREACH(const String& columnName, columns) {
463                                 Column column = table->GetColumn(columnName);
464
465                                 if (m_ColumnHeaders)
466                                         header->Add(columnName);
467
468                                 row->Add(column.ExtractValue(object));
469                         }
470
471                         if (m_ColumnHeaders) {
472                                 rs->Add(header);
473                                 m_ColumnHeaders = false;
474                         }
475
476                         rs->Add(row);
477                 }
478         } else {
479                 std::vector<double> stats(m_Aggregators.size(), 0);
480                 int index = 0;
481
482                 /* add aggregated stats */
483                 BOOST_FOREACH(const Aggregator::Ptr aggregator, m_Aggregators) {
484                         BOOST_FOREACH(const Value& object, objects) {
485                                 aggregator->Apply(table, object);
486                         }
487
488                         stats[index] = aggregator->GetResult();
489                         index++;
490                 }
491
492                 /* add column headers both for raw and aggregated data */
493                 if (m_ColumnHeaders) {
494                         Array::Ptr header = make_shared<Array>();
495
496                         BOOST_FOREACH(const String& columnName, m_Columns) {
497                                 header->Add(columnName);
498                         }
499
500                         for (size_t i = 1; i <= m_Aggregators.size(); i++) {
501                                 header->Add("stats_" + Convert::ToString(i));
502                         }
503
504                         rs->Add(header);
505                 }
506
507                 Array::Ptr row = make_shared<Array>();
508
509                 /*
510                  * add selected columns next to stats
511                  * may not be accurate for grouping!
512                  */
513                 if (objects.size() > 0 && m_Columns.size() > 0) {
514                         BOOST_FOREACH(const String& columnName, m_Columns) {
515                                 Column column = table->GetColumn(columnName);
516
517                                 row->Add(column.ExtractValue(objects[0])); // first object wins
518                         }
519                 }
520
521                 for (size_t i = 0; i < m_Aggregators.size(); i++)
522                         row->Add(stats[i]);
523
524                 rs->Add(row);
525         }
526
527         std::ostringstream result;
528         PrintResultSet(result, rs);
529
530         SendResponse(stream, LivestatusErrorOK, result.str());
531 }
532
533 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
534 {
535         {
536                 boost::mutex::scoped_lock lock(l_QueryMutex);
537
538                 l_ExternalCommands++;
539         }
540
541         Log(LogInformation, "LivestatusQuery", "Executing command: " + m_Command);
542         ExternalCommandProcessor::Execute(m_Command);
543         SendResponse(stream, LivestatusErrorOK, "");
544 }
545
546 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
547 {
548         Log(LogDebug, "LivestatusQuery", "ERROR: Code: '" + Convert::ToString(m_ErrorCode) + "' Message: '" + m_ErrorMessage + "'.");
549         SendResponse(stream, m_ErrorCode, m_ErrorMessage);
550 }
551
552 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
553 {
554         if (m_ResponseHeader == "fixed16")
555                 PrintFixed16(stream, code, data);
556
557         if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
558                 try {
559                         stream->Write(data.CStr(), data.GetLength());
560                 } catch (const std::exception&) {
561                         Log(LogCritical, "LivestatusQuery", "Cannot write to tcp socket.");
562                 }
563         }
564 }
565
566 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
567 {
568         ASSERT(code >= 100 && code <= 999);
569
570         String sCode = Convert::ToString(code);
571         String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
572
573         String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
574
575         try {
576                 stream->Write(header.CStr(), header.GetLength());
577         } catch (const std::exception&) {
578                 Log(LogCritical, "LivestatusQuery", "Cannot write to tcp socket.");
579         }
580 }
581
582 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
583 {
584         try {
585                 Log(LogInformation, "LivestatusQuery", "Executing livestatus query: " + m_Verb);
586
587                 if (m_Verb == "GET")
588                         ExecuteGetHelper(stream);
589                 else if (m_Verb == "COMMAND")
590                         ExecuteCommandHelper(stream);
591                 else if (m_Verb == "ERROR")
592                         ExecuteErrorHelper(stream);
593                 else
594                         BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
595         } catch (const std::exception& ex) {
596                 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
597         }
598
599         if (!m_KeepAlive) {
600                 stream->Close();
601                 return false;
602         }
603
604         return true;
605 }