1 /******************************************************************************
3 * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/json.hpp"
41 #include "base/serializer.hpp"
42 #include "base/timer.hpp"
43 #include "base/initialize.hpp"
44 #include <boost/algorithm/string/classification.hpp>
45 #include <boost/algorithm/string/replace.hpp>
46 #include <boost/algorithm/string/split.hpp>
47 #include <boost/algorithm/string/join.hpp>
49 using namespace icinga;
51 static int l_ExternalCommands = 0;
52 static boost::mutex l_QueryMutex;
54 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
55 : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
56 m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
58 if (lines.size() == 0) {
60 m_ErrorCode = LivestatusErrorQuery;
61 m_ErrorMessage = "Empty Query. Aborting.";
66 for (const String& line : lines) {
69 Log(LogDebug, "LivestatusQuery", msg);
71 m_CompatLogPath = compat_log_path;
73 /* default separators */
74 m_Separators.push_back("\n");
75 m_Separators.push_back(";");
76 m_Separators.push_back(",");
77 m_Separators.push_back("|");
79 String line = lines[0];
81 size_t sp_index = line.FindFirstOf(" ");
83 if (sp_index == String::NPos)
84 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
86 String verb = line.SubStr(0, sp_index);
87 String target = line.SubStr(sp_index + 1);
91 if (m_Verb == "COMMAND") {
94 } else if (m_Verb == "GET") {
98 m_ErrorCode = LivestatusErrorQuery;
99 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
103 std::deque<Filter::Ptr> filters, stats;
104 std::deque<Aggregator::Ptr> aggregators;
106 for (unsigned int i = 1; i < lines.size(); i++) {
109 size_t col_index = line.FindFirstOf(":");
110 String header = line.SubStr(0, col_index);
113 //OutputFormat:json or OutputFormat: json
114 if (line.GetLength() > col_index + 1)
115 params = line.SubStr(col_index + 1).Trim();
117 if (header == "ResponseHeader")
118 m_ResponseHeader = params;
119 else if (header == "OutputFormat")
120 m_OutputFormat = params;
121 else if (header == "KeepAlive")
122 m_KeepAlive = (params == "on");
123 else if (header == "Columns") {
124 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
125 boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
126 } else if (header == "Separators") {
127 std::vector<String> separators;
129 boost::algorithm::split(separators, params, boost::is_any_of(" "));
130 /* ugly ascii long to char conversion, but works */
131 if (separators.size() > 0)
132 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
133 if (separators.size() > 1)
134 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
135 if (separators.size() > 2)
136 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
137 if (separators.size() > 3)
138 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
139 } else if (header == "ColumnHeaders")
140 m_ColumnHeaders = (params == "on");
141 else if (header == "Limit")
142 m_Limit = Convert::ToLong(params);
143 else if (header == "Filter") {
144 Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
148 m_ErrorCode = LivestatusErrorQuery;
149 m_ErrorMessage = "Invalid filter specification: " + line;
153 filters.push_back(filter);
154 } else if (header == "Stats") {
155 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
157 std::vector<String> tokens;
158 boost::algorithm::split(tokens, params, boost::is_any_of(" "));
160 if (tokens.size() < 2) {
162 m_ErrorCode = LivestatusErrorQuery;
163 m_ErrorMessage = "Missing aggregator column name: " + line;
167 String aggregate_arg = tokens[0];
168 String aggregate_attr = tokens[1];
170 Aggregator::Ptr aggregator;
173 if (aggregate_arg == "sum") {
174 aggregator = new SumAggregator(aggregate_attr);
175 } else if (aggregate_arg == "min") {
176 aggregator = new MinAggregator(aggregate_attr);
177 } else if (aggregate_arg == "max") {
178 aggregator = new MaxAggregator(aggregate_attr);
179 } else if (aggregate_arg == "avg") {
180 aggregator = new AvgAggregator(aggregate_attr);
181 } else if (aggregate_arg == "std") {
182 aggregator = new StdAggregator(aggregate_attr);
183 } else if (aggregate_arg == "suminv") {
184 aggregator = new InvSumAggregator(aggregate_attr);
185 } else if (aggregate_arg == "avginv") {
186 aggregator = new InvAvgAggregator(aggregate_attr);
188 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
192 m_ErrorCode = LivestatusErrorQuery;
193 m_ErrorMessage = "Invalid filter specification: " + line;
197 aggregator = new CountAggregator();
200 aggregator->SetFilter(filter);
201 aggregators.push_back(aggregator);
203 stats.push_back(filter);
204 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
205 std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
207 unsigned int num = Convert::ToLong(params);
208 CombinerFilter::Ptr filter;
210 if (header == "Or" || header == "StatsOr") {
211 filter = new OrFilter();
212 Log(LogDebug, "LivestatusQuery")
213 << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
215 filter = new AndFilter();
216 Log(LogDebug, "LivestatusQuery")
217 << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
220 if (num > deq.size()) {
223 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
227 while (num > 0 && num--) {
228 filter->AddSubFilter(deq.back());
229 Log(LogDebug, "LivestatusQuery")
230 << "Add " << num << " filter.";
233 aggregators.pop_back();
236 deq.push_back(filter);
237 if (&deq == &stats) {
238 Aggregator::Ptr aggregator = new CountAggregator();
239 aggregator->SetFilter(filter);
240 aggregators.push_back(aggregator);
242 } else if (header == "Negate" || header == "StatsNegate") {
243 std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
248 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
252 Filter::Ptr filter = deq.back();
258 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
262 deq.push_back(new NegateFilter(filter));
265 Aggregator::Ptr aggregator = aggregators.back();
266 aggregator->SetFilter(filter);
271 /* Combine all top-level filters into a single filter. */
272 AndFilter::Ptr top_filter = new AndFilter();
274 for (const Filter::Ptr& filter : filters) {
275 top_filter->AddSubFilter(filter);
278 m_Filter = top_filter;
279 m_Aggregators.swap(aggregators);
282 int LivestatusQuery::GetExternalCommands(void)
284 boost::mutex::scoped_lock lock(l_QueryMutex);
286 return l_ExternalCommands;
289 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
293 * type = SERVICE FLAPPING ALERT
295 std::vector<String> tokens;
297 String temp_buffer = params;
299 /* extract attr and op */
300 for (int i = 0; i < 2; i++) {
301 sp_index = temp_buffer.FindFirstOf(" ");
303 /* check if this is the last argument */
304 if (sp_index == String::NPos) {
305 /* 'attr op' or 'attr op val' is valid */
307 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
312 tokens.push_back(temp_buffer.SubStr(0, sp_index));
313 temp_buffer = temp_buffer.SubStr(sp_index + 1);
316 /* add the rest as value */
317 tokens.push_back(temp_buffer);
319 if (tokens.size() == 2)
320 tokens.push_back("");
322 if (tokens.size() < 3)
323 return Filter::Ptr();
326 String attr = tokens[0];
327 String op = tokens[1];
328 String val = tokens[2];
333 } else if (op == "!~") {
336 } else if (op == "!=~") {
339 } else if (op == "!~~") {
344 Filter::Ptr filter = new AttributeFilter(attr, op, val);
347 filter = new NegateFilter(filter);
349 /* pre-filter log time duration */
350 if (attr == "time") {
351 if (op == "<" || op == "<=") {
352 until = Convert::ToLong(val);
353 } else if (op == ">" || op == ">=") {
354 from = Convert::ToLong(val);
358 Log(LogDebug, "LivestatusQuery")
359 << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
364 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
366 if (m_OutputFormat == "json" || m_OutputFormat == "python")
370 void LivestatusQuery::EndResultSet(std::ostream& fp) const
372 if (m_OutputFormat == "json" || m_OutputFormat == "python")
376 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
378 if (m_OutputFormat == "csv") {
381 ObjectLock rlock(row);
382 for (const Value& value : row) {
386 fp << m_Separators[1];
388 if (value.IsObjectType<Array>())
389 PrintCsvArray(fp, value, 0);
394 fp << m_Separators[0];
395 } else if (m_OutputFormat == "json") {
399 fp << JsonEncode(row);
400 } else if (m_OutputFormat == "python") {
404 PrintPythonArray(fp, row);
410 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
414 ObjectLock olock(array);
415 for (const Value& value : array) {
419 fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
421 if (value.IsObjectType<Array>())
422 PrintCsvArray(fp, value, level + 1);
423 else if (value.IsBoolean())
424 fp << Convert::ToLong(value);
430 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
436 for (const Value& value : rs) {
442 if (value.IsObjectType<Array>())
443 PrintPythonArray(fp, value);
444 else if (value.IsNumber())
447 fp << QuoteStringPython(value);
453 String LivestatusQuery::QuoteStringPython(const String& str) {
455 boost::algorithm::replace_all(result, "\"", "\\\"");
456 return "r\"" + result + "\"";
459 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
461 Log(LogNotice, "LivestatusQuery")
462 << "Table: " << m_Table;
464 Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
467 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
472 std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
473 std::vector<String> columns;
475 if (m_Columns.size() > 0)
478 columns = table->GetColumnNames();
480 std::ostringstream result;
481 bool first_row = true;
482 BeginResultSet(result);
484 if (m_Aggregators.empty()) {
485 Array::Ptr header = new Array();
487 typedef std::pair<String, Column> ColumnPair;
489 std::vector<ColumnPair> column_objs;
490 column_objs.reserve(columns.size());
492 for (const String& columnName : columns)
493 column_objs.push_back(std::make_pair(columnName, table->GetColumn(columnName)));
495 for (const LivestatusRowValue& object : objects) {
496 Array::Ptr row = new Array();
498 row->Reserve(column_objs.size());
500 for (const ColumnPair& cv : column_objs) {
502 header->Add(cv.first);
504 row->Add(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
507 if (m_ColumnHeaders) {
508 AppendResultRow(result, header, first_row);
509 m_ColumnHeaders = false;
512 AppendResultRow(result, row, first_row);
515 std::vector<double> stats(m_Aggregators.size(), 0);
518 /* add aggregated stats */
519 for (const Aggregator::Ptr aggregator : m_Aggregators) {
520 for (const LivestatusRowValue& object : objects) {
521 aggregator->Apply(table, object.Row);
524 stats[index] = aggregator->GetResult();
528 /* add column headers both for raw and aggregated data */
529 if (m_ColumnHeaders) {
530 Array::Ptr header = new Array();
532 for (const String& columnName : m_Columns) {
533 header->Add(columnName);
536 for (size_t i = 1; i <= m_Aggregators.size(); i++) {
537 header->Add("stats_" + Convert::ToString(i));
540 AppendResultRow(result, header, first_row);
543 Array::Ptr row = new Array();
545 row->Reserve(m_Columns.size() + m_Aggregators.size());
548 * add selected columns next to stats
549 * may not be accurate for grouping!
551 if (objects.size() > 0 && m_Columns.size() > 0) {
552 for (const String& columnName : m_Columns) {
553 Column column = table->GetColumn(columnName);
555 LivestatusRowValue object = objects[0]; //first object wins
557 row->Add(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
561 for (size_t i = 0; i < m_Aggregators.size(); i++)
564 AppendResultRow(result, row, first_row);
567 EndResultSet(result);
569 SendResponse(stream, LivestatusErrorOK, result.str());
572 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
575 boost::mutex::scoped_lock lock(l_QueryMutex);
577 l_ExternalCommands++;
580 Log(LogNotice, "LivestatusQuery")
581 << "Executing command: " << m_Command;
582 ExternalCommandProcessor::Execute(m_Command);
583 SendResponse(stream, LivestatusErrorOK, "");
586 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
588 Log(LogDebug, "LivestatusQuery")
589 << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
590 SendResponse(stream, m_ErrorCode, m_ErrorMessage);
593 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
595 if (m_ResponseHeader == "fixed16")
596 PrintFixed16(stream, code, data);
598 if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
600 stream->Write(data.CStr(), data.GetLength());
601 } catch (const std::exception&) {
602 Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
607 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
609 ASSERT(code >= 100 && code <= 999);
611 String sCode = Convert::ToString(code);
612 String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
614 String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
617 stream->Write(header.CStr(), header.GetLength());
618 } catch (const std::exception&) {
619 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
623 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
626 Log(LogNotice, "LivestatusQuery")
627 << "Executing livestatus query: " << m_Verb;
630 ExecuteGetHelper(stream);
631 else if (m_Verb == "COMMAND")
632 ExecuteCommandHelper(stream);
633 else if (m_Verb == "ERROR")
634 ExecuteErrorHelper(stream);
636 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
637 } catch (const std::exception& ex) {
638 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));