1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "livestatus/livestatusquery.hpp"
4 #include "livestatus/countaggregator.hpp"
5 #include "livestatus/sumaggregator.hpp"
6 #include "livestatus/minaggregator.hpp"
7 #include "livestatus/maxaggregator.hpp"
8 #include "livestatus/avgaggregator.hpp"
9 #include "livestatus/stdaggregator.hpp"
10 #include "livestatus/invsumaggregator.hpp"
11 #include "livestatus/invavgaggregator.hpp"
12 #include "livestatus/attributefilter.hpp"
13 #include "livestatus/negatefilter.hpp"
14 #include "livestatus/orfilter.hpp"
15 #include "livestatus/andfilter.hpp"
16 #include "icinga/externalcommandprocessor.hpp"
17 #include "base/debug.hpp"
18 #include "base/convert.hpp"
19 #include "base/objectlock.hpp"
20 #include "base/logger.hpp"
21 #include "base/exception.hpp"
22 #include "base/utility.hpp"
23 #include "base/json.hpp"
24 #include "base/serializer.hpp"
25 #include "base/timer.hpp"
26 #include "base/initialize.hpp"
27 #include <boost/algorithm/string/replace.hpp>
28 #include <boost/algorithm/string/join.hpp>
30 using namespace icinga;
32 static int l_ExternalCommands = 0;
33 static boost::mutex l_QueryMutex;
35 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
36 : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
37 m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
39 if (lines.size() == 0) {
41 m_ErrorCode = LivestatusErrorQuery;
42 m_ErrorMessage = "Empty Query. Aborting.";
47 for (const String& line : lines) {
50 Log(LogDebug, "LivestatusQuery", msg);
52 m_CompatLogPath = compat_log_path;
54 /* default separators */
55 m_Separators.emplace_back("\n");
56 m_Separators.emplace_back(";");
57 m_Separators.emplace_back(",");
58 m_Separators.emplace_back("|");
60 String line = lines[0];
62 size_t sp_index = line.FindFirstOf(" ");
64 if (sp_index == String::NPos)
65 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
67 String verb = line.SubStr(0, sp_index);
68 String target = line.SubStr(sp_index + 1);
72 if (m_Verb == "COMMAND") {
75 } else if (m_Verb == "GET") {
79 m_ErrorCode = LivestatusErrorQuery;
80 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
84 std::deque<Filter::Ptr> filters, stats;
85 std::deque<Aggregator::Ptr> aggregators;
87 for (unsigned int i = 1; i < lines.size(); i++) {
90 size_t col_index = line.FindFirstOf(":");
91 String header = line.SubStr(0, col_index);
94 //OutputFormat:json or OutputFormat: json
95 if (line.GetLength() > col_index + 1)
96 params = line.SubStr(col_index + 1).Trim();
98 if (header == "ResponseHeader")
99 m_ResponseHeader = params;
100 else if (header == "OutputFormat")
101 m_OutputFormat = params;
102 else if (header == "KeepAlive")
103 m_KeepAlive = (params == "on");
104 else if (header == "Columns") {
105 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
106 m_Columns = params.Split(" ");
107 } else if (header == "Separators") {
108 std::vector<String> separators = params.Split(" ");
110 /* ugly ascii long to char conversion, but works */
111 if (separators.size() > 0)
112 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
113 if (separators.size() > 1)
114 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
115 if (separators.size() > 2)
116 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
117 if (separators.size() > 3)
118 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
119 } else if (header == "ColumnHeaders")
120 m_ColumnHeaders = (params == "on");
121 else if (header == "Limit")
122 m_Limit = Convert::ToLong(params);
123 else if (header == "Filter") {
124 Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
128 m_ErrorCode = LivestatusErrorQuery;
129 m_ErrorMessage = "Invalid filter specification: " + line;
133 filters.push_back(filter);
134 } else if (header == "Stats") {
135 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
137 std::vector<String> tokens = params.Split(" ");
139 if (tokens.size() < 2) {
141 m_ErrorCode = LivestatusErrorQuery;
142 m_ErrorMessage = "Missing aggregator column name: " + line;
146 String aggregate_arg = tokens[0];
147 String aggregate_attr = tokens[1];
149 Aggregator::Ptr aggregator;
152 if (aggregate_arg == "sum") {
153 aggregator = new SumAggregator(aggregate_attr);
154 } else if (aggregate_arg == "min") {
155 aggregator = new MinAggregator(aggregate_attr);
156 } else if (aggregate_arg == "max") {
157 aggregator = new MaxAggregator(aggregate_attr);
158 } else if (aggregate_arg == "avg") {
159 aggregator = new AvgAggregator(aggregate_attr);
160 } else if (aggregate_arg == "std") {
161 aggregator = new StdAggregator(aggregate_attr);
162 } else if (aggregate_arg == "suminv") {
163 aggregator = new InvSumAggregator(aggregate_attr);
164 } else if (aggregate_arg == "avginv") {
165 aggregator = new InvAvgAggregator(aggregate_attr);
167 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
171 m_ErrorCode = LivestatusErrorQuery;
172 m_ErrorMessage = "Invalid filter specification: " + line;
176 aggregator = new CountAggregator();
179 aggregator->SetFilter(filter);
180 aggregators.push_back(aggregator);
182 stats.push_back(filter);
183 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
184 std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
186 unsigned int num = Convert::ToLong(params);
187 CombinerFilter::Ptr filter;
189 if (header == "Or" || header == "StatsOr") {
190 filter = new OrFilter();
191 Log(LogDebug, "LivestatusQuery")
192 << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
194 filter = new AndFilter();
195 Log(LogDebug, "LivestatusQuery")
196 << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
199 if (num > deq.size()) {
202 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
206 while (num > 0 && num--) {
207 filter->AddSubFilter(deq.back());
208 Log(LogDebug, "LivestatusQuery")
209 << "Add " << num << " filter.";
212 aggregators.pop_back();
215 deq.emplace_back(filter);
216 if (&deq == &stats) {
217 Aggregator::Ptr aggregator = new CountAggregator();
218 aggregator->SetFilter(filter);
219 aggregators.push_back(aggregator);
221 } else if (header == "Negate" || header == "StatsNegate") {
222 std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
227 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
231 Filter::Ptr filter = deq.back();
237 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
241 deq.push_back(new NegateFilter(filter));
244 Aggregator::Ptr aggregator = aggregators.back();
245 aggregator->SetFilter(filter);
250 /* Combine all top-level filters into a single filter. */
251 AndFilter::Ptr top_filter = new AndFilter();
253 for (const Filter::Ptr& filter : filters) {
254 top_filter->AddSubFilter(filter);
257 m_Filter = top_filter;
258 m_Aggregators.swap(aggregators);
261 int LivestatusQuery::GetExternalCommands()
263 boost::mutex::scoped_lock lock(l_QueryMutex);
265 return l_ExternalCommands;
268 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
272 * type = SERVICE FLAPPING ALERT
274 std::vector<String> tokens;
276 String temp_buffer = params;
278 /* extract attr and op */
279 for (int i = 0; i < 2; i++) {
280 sp_index = temp_buffer.FindFirstOf(" ");
282 /* check if this is the last argument */
283 if (sp_index == String::NPos) {
284 /* 'attr op' or 'attr op val' is valid */
286 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
291 tokens.emplace_back(temp_buffer.SubStr(0, sp_index));
292 temp_buffer = temp_buffer.SubStr(sp_index + 1);
295 /* add the rest as value */
296 tokens.emplace_back(std::move(temp_buffer));
298 if (tokens.size() == 2)
299 tokens.emplace_back("");
301 if (tokens.size() < 3)
305 String attr = tokens[0];
306 String op = tokens[1];
307 String val = tokens[2];
312 } else if (op == "!~") {
315 } else if (op == "!=~") {
318 } else if (op == "!~~") {
323 Filter::Ptr filter = new AttributeFilter(attr, op, val);
326 filter = new NegateFilter(filter);
328 /* pre-filter log time duration */
329 if (attr == "time") {
330 if (op == "<" || op == "<=") {
331 until = Convert::ToLong(val);
332 } else if (op == ">" || op == ">=") {
333 from = Convert::ToLong(val);
337 Log(LogDebug, "LivestatusQuery")
338 << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
343 void LivestatusQuery::BeginResultSet(std::ostream& fp) const
345 if (m_OutputFormat == "json" || m_OutputFormat == "python")
349 void LivestatusQuery::EndResultSet(std::ostream& fp) const
351 if (m_OutputFormat == "json" || m_OutputFormat == "python")
355 void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
357 if (m_OutputFormat == "csv") {
360 ObjectLock rlock(row);
361 for (const Value& value : row) {
365 fp << m_Separators[1];
367 if (value.IsObjectType<Array>())
368 PrintCsvArray(fp, value, 0);
373 fp << m_Separators[0];
374 } else if (m_OutputFormat == "json") {
378 fp << JsonEncode(row);
379 } else if (m_OutputFormat == "python") {
383 PrintPythonArray(fp, row);
389 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
393 ObjectLock olock(array);
394 for (const Value& value : array) {
398 fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
400 if (value.IsObjectType<Array>())
401 PrintCsvArray(fp, value, level + 1);
402 else if (value.IsBoolean())
403 fp << Convert::ToLong(value);
409 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
415 for (const Value& value : rs) {
421 if (value.IsObjectType<Array>())
422 PrintPythonArray(fp, value);
423 else if (value.IsNumber())
426 fp << QuoteStringPython(value);
432 String LivestatusQuery::QuoteStringPython(const String& str) {
434 boost::algorithm::replace_all(result, "\"", "\\\"");
435 return "r\"" + result + "\"";
438 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
440 Log(LogNotice, "LivestatusQuery")
441 << "Table: " << m_Table;
443 Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
446 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
451 std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
452 std::vector<String> columns;
454 if (m_Columns.size() > 0)
457 columns = table->GetColumnNames();
459 std::ostringstream result;
460 bool first_row = true;
461 BeginResultSet(result);
463 if (m_Aggregators.empty()) {
464 typedef std::pair<String, Column> ColumnPair;
466 std::vector<ColumnPair> column_objs;
467 column_objs.reserve(columns.size());
469 for (const String& columnName : columns)
470 column_objs.emplace_back(columnName, table->GetColumn(columnName));
474 for (const LivestatusRowValue& object : objects) {
477 row.reserve(column_objs.size());
479 for (const ColumnPair& cv : column_objs) {
481 header.push_back(cv.first);
483 row.push_back(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
486 if (m_ColumnHeaders) {
487 AppendResultRow(result, new Array(std::move(header)), first_row);
488 m_ColumnHeaders = false;
491 AppendResultRow(result, new Array(std::move(row)), first_row);
494 std::map<std::vector<Value>, std::vector<AggregatorState *> > allStats;
496 /* add aggregated stats */
497 for (const LivestatusRowValue& object : objects) {
498 std::vector<Value> statsKey;
500 for (const String& columnName : m_Columns) {
501 Column column = table->GetColumn(columnName);
502 statsKey.emplace_back(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
505 auto it = allStats.find(statsKey);
507 if (it == allStats.end()) {
508 std::vector<AggregatorState *> newStats(m_Aggregators.size(), nullptr);
509 it = allStats.insert(std::make_pair(statsKey, newStats)).first;
512 auto& stats = it->second;
516 for (const Aggregator::Ptr& aggregator : m_Aggregators) {
517 aggregator->Apply(table, object.Row, &stats[index]);
522 /* add column headers both for raw and aggregated data */
523 if (m_ColumnHeaders) {
526 for (const String& columnName : m_Columns) {
527 header.push_back(columnName);
530 for (size_t i = 1; i <= m_Aggregators.size(); i++) {
531 header.push_back("stats_" + Convert::ToString(i));
534 AppendResultRow(result, new Array(std::move(header)), first_row);
537 for (const auto& kv : allStats) {
540 row.reserve(m_Columns.size() + m_Aggregators.size());
542 for (const Value& keyPart : kv.first) {
543 row.push_back(keyPart);
546 auto& stats = kv.second;
548 for (size_t i = 0; i < m_Aggregators.size(); i++)
549 row.push_back(m_Aggregators[i]->GetResultAndFreeState(stats[i]));
551 AppendResultRow(result, new Array(std::move(row)), first_row);
554 /* add a bogus zero value if aggregated is empty*/
555 if (allStats.empty()) {
558 row.reserve(m_Aggregators.size());
560 for (size_t i = 1; i <= m_Aggregators.size(); i++) {
564 AppendResultRow(result, new Array(std::move(row)), first_row);
568 EndResultSet(result);
570 SendResponse(stream, LivestatusErrorOK, result.str());
573 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
576 boost::mutex::scoped_lock lock(l_QueryMutex);
578 l_ExternalCommands++;
581 Log(LogNotice, "LivestatusQuery")
582 << "Executing command: " << m_Command;
583 ExternalCommandProcessor::Execute(m_Command);
584 SendResponse(stream, LivestatusErrorOK, "");
587 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
589 Log(LogDebug, "LivestatusQuery")
590 << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
591 SendResponse(stream, m_ErrorCode, m_ErrorMessage);
594 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
596 if (m_ResponseHeader == "fixed16")
597 PrintFixed16(stream, code, data);
599 if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
601 stream->Write(data.CStr(), data.GetLength());
602 } catch (const std::exception&) {
603 Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
608 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
610 ASSERT(code >= 100 && code <= 999);
612 String sCode = Convert::ToString(code);
613 String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
615 String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
618 stream->Write(header.CStr(), header.GetLength());
619 } catch (const std::exception&) {
620 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
624 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
627 Log(LogNotice, "LivestatusQuery")
628 << "Executing livestatus query: " << m_Verb;
631 ExecuteGetHelper(stream);
632 else if (m_Verb == "COMMAND")
633 ExecuteCommandHelper(stream);
634 else if (m_Verb == "ERROR")
635 ExecuteErrorHelper(stream);
637 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
638 } catch (const std::exception& ex) {
639 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));