1 /******************************************************************************
3 * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "base/debug.hpp"
35 #include "base/convert.hpp"
36 #include "base/objectlock.hpp"
37 #include "base/logger_fwd.hpp"
38 #include "base/exception.hpp"
39 #include "base/utility.hpp"
40 #include "base/serializer.hpp"
41 #include <boost/algorithm/string/classification.hpp>
42 #include <boost/foreach.hpp>
43 #include <boost/algorithm/string/replace.hpp>
44 #include <boost/algorithm/string/split.hpp>
46 using namespace icinga;
48 static int l_ExternalCommands = 0;
49 static boost::mutex l_QueryMutex;
51 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
52 : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true),
53 m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
55 if (lines.size() == 0) {
57 m_ErrorCode = LivestatusErrorQuery;
58 m_ErrorMessage = "Empty Query. Aborting.";
63 BOOST_FOREACH(const String& line, lines) {
66 Log(LogDebug, "LivestatusQuery", msg);
68 m_CompatLogPath = compat_log_path;
70 /* default separators */
71 m_Separators.push_back("\n");
72 m_Separators.push_back(";");
73 m_Separators.push_back(",");
74 m_Separators.push_back("|");
76 String line = lines[0];
78 size_t sp_index = line.FindFirstOf(" ");
80 if (sp_index == String::NPos)
81 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
83 String verb = line.SubStr(0, sp_index);
84 String target = line.SubStr(sp_index + 1);
88 if (m_Verb == "COMMAND") {
91 } else if (m_Verb == "GET") {
95 m_ErrorCode = LivestatusErrorQuery;
96 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
100 std::deque<Filter::Ptr> filters, stats;
101 std::deque<Aggregator::Ptr> aggregators;
103 for (unsigned int i = 1; i < lines.size(); i++) {
106 size_t col_index = line.FindFirstOf(":");
107 String header = line.SubStr(0, col_index);
110 //OutputFormat:json or OutputFormat: json
111 if (line.GetLength() > col_index + 1)
112 params = line.SubStr(col_index + 1);
116 if (header == "ResponseHeader")
117 m_ResponseHeader = params;
118 else if (header == "OutputFormat")
119 m_OutputFormat = params;
120 else if (header == "KeepAlive")
121 m_KeepAlive = (params == "on");
122 else if (header == "Columns") {
123 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
124 boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
125 } else if (header == "Separators") {
126 std::vector<String> separators;
128 boost::algorithm::split(separators, params, boost::is_any_of(" "));
129 /* ugly ascii long to char conversion, but works */
130 if (separators.size() > 0)
131 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
132 if (separators.size() > 1)
133 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
134 if (separators.size() > 2)
135 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
136 if (separators.size() > 3)
137 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
138 } else if (header == "ColumnHeaders")
139 m_ColumnHeaders = (params == "on");
140 else if (header == "Filter") {
141 Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
145 m_ErrorCode = LivestatusErrorQuery;
146 m_ErrorMessage = "Invalid filter specification: " + line;
150 filters.push_back(filter);
151 } else if (header == "Stats") {
152 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
154 std::vector<String> tokens;
155 boost::algorithm::split(tokens, params, boost::is_any_of(" "));
157 if (tokens.size() < 2) {
159 m_ErrorCode = LivestatusErrorQuery;
160 m_ErrorMessage = "Missing aggregator column name: " + line;
164 String aggregate_arg = tokens[0];
165 String aggregate_attr = tokens[1];
167 Aggregator::Ptr aggregator;
170 if (aggregate_arg == "sum") {
171 aggregator = make_shared<SumAggregator>(aggregate_attr);
172 } else if (aggregate_arg == "min") {
173 aggregator = make_shared<MinAggregator>(aggregate_attr);
174 } else if (aggregate_arg == "max") {
175 aggregator = make_shared<MaxAggregator>(aggregate_attr);
176 } else if (aggregate_arg == "avg") {
177 aggregator = make_shared<AvgAggregator>(aggregate_attr);
178 } else if (aggregate_arg == "std") {
179 aggregator = make_shared<StdAggregator>(aggregate_attr);
180 } else if (aggregate_arg == "suminv") {
181 aggregator = make_shared<InvSumAggregator>(aggregate_attr);
182 } else if (aggregate_arg == "avginv") {
183 aggregator = make_shared<InvAvgAggregator>(aggregate_attr);
185 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
189 m_ErrorCode = LivestatusErrorQuery;
190 m_ErrorMessage = "Invalid filter specification: " + line;
194 aggregator = make_shared<CountAggregator>();
197 aggregator->SetFilter(filter);
198 aggregators.push_back(aggregator);
200 stats.push_back(filter);
201 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
202 std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
204 unsigned int num = Convert::ToLong(params);
205 CombinerFilter::Ptr filter;
207 if (header == "Or" || header == "StatsOr") {
208 filter = make_shared<OrFilter>();
209 Log(LogDebug, "LivestatusQuery", "Add OR filter for " + params + " column(s). " + Convert::ToString(deq.size()) + " filters available.");
211 filter = make_shared<AndFilter>();
212 Log(LogDebug, "LivestatusQuery", "Add AND filter for " + params + " column(s). " + Convert::ToString(deq.size()) + " filters available.");
215 if (num > deq.size()) {
218 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
222 while (num > 0 && num--) {
223 filter->AddSubFilter(deq.back());
224 Log(LogDebug, "LivestatusQuery", "Add " + Convert::ToString(num) + " filter.");
227 aggregators.pop_back();
230 deq.push_back(filter);
231 if (&deq == &stats) {
232 Aggregator::Ptr aggregator = make_shared<CountAggregator>();
233 aggregator->SetFilter(filter);
234 aggregators.push_back(aggregator);
236 } else if (header == "Negate" || header == "StatsNegate") {
237 std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
242 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
246 Filter::Ptr filter = deq.back();
252 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
256 deq.push_back(make_shared<NegateFilter>(filter));
259 Aggregator::Ptr aggregator = aggregators.back();
260 aggregator->SetFilter(filter);
265 /* Combine all top-level filters into a single filter. */
266 AndFilter::Ptr top_filter = make_shared<AndFilter>();
268 BOOST_FOREACH(const Filter::Ptr& filter, filters) {
269 top_filter->AddSubFilter(filter);
272 m_Filter = top_filter;
273 m_Aggregators.swap(aggregators);
276 int LivestatusQuery::GetExternalCommands(void)
278 boost::mutex::scoped_lock lock(l_QueryMutex);
280 return l_ExternalCommands;
283 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
287 * type = SERVICE FLAPPING ALERT
289 std::vector<String> tokens;
291 String temp_buffer = params;
293 /* extract attr and op */
294 for (int i = 0; i < 2; i++) {
295 sp_index = temp_buffer.FindFirstOf(" ");
297 /* check if this is the last argument */
298 if (sp_index == String::NPos) {
299 /* 'attr op' or 'attr op val' is valid */
301 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
306 tokens.push_back(temp_buffer.SubStr(0, sp_index));
307 temp_buffer = temp_buffer.SubStr(sp_index + 1);
310 /* add the rest as value */
311 tokens.push_back(temp_buffer);
313 if (tokens.size() == 2)
314 tokens.push_back("");
316 if (tokens.size() < 3)
317 return Filter::Ptr();
320 String attr = tokens[0];
321 String op = tokens[1];
322 String val = tokens[2];
327 } else if (op == "!~") {
330 } else if (op == "!=~") {
333 } else if (op == "!~~") {
338 Filter::Ptr filter = make_shared<AttributeFilter>(attr, op, val);
341 filter = make_shared<NegateFilter>(filter);
343 /* pre-filter log time duration */
344 if (attr == "time") {
345 if (op == "<" || op == "<=") {
346 until = Convert::ToLong(val);
347 } else if (op == ">" || op == ">=") {
348 from = Convert::ToLong(val);
352 Log(LogDebug, "LivestatusQuery", "Parsed filter with attr: '" + attr + "' op: '" + op + "' val: '" + val + "'.");
357 void LivestatusQuery::PrintResultSet(std::ostream& fp, const Array::Ptr& rs) const
359 if (m_OutputFormat == "csv") {
360 ObjectLock olock(rs);
362 BOOST_FOREACH(const Array::Ptr& row, rs) {
365 ObjectLock rlock(row);
366 BOOST_FOREACH(const Value& value, row) {
370 fp << m_Separators[1];
372 if (value.IsObjectType<Array>())
373 PrintCsvArray(fp, value, 0);
378 fp << m_Separators[0];
380 } else if (m_OutputFormat == "json") {
381 fp << JsonSerialize(rs);
382 } else if (m_OutputFormat == "python") {
383 PrintPythonArray(fp, rs);
387 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
391 ObjectLock olock(array);
392 BOOST_FOREACH(const Value& value, array) {
396 fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
398 if (value.IsObjectType<Array>())
399 PrintCsvArray(fp, value, level + 1);
405 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
411 BOOST_FOREACH(const Value& value, rs) {
417 if (value.IsObjectType<Array>())
418 PrintPythonArray(fp, value);
419 else if (value.IsNumber())
422 fp << QuoteStringPython(value);
428 String LivestatusQuery::QuoteStringPython(const String& str) {
430 boost::algorithm::replace_all(result, "\"", "\\\"");
431 return "r\"" + result + "\"";
434 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
436 Log(LogInformation, "LivestatusQuery", "Table: " + m_Table);
438 Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
441 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
446 std::vector<Value> objects = table->FilterRows(m_Filter);
447 std::vector<String> columns;
449 if (m_Columns.size() > 0)
452 columns = table->GetColumnNames();
454 Array::Ptr rs = make_shared<Array>();
456 if (m_Aggregators.empty()) {
457 Array::Ptr header = make_shared<Array>();
459 BOOST_FOREACH(const Value& object, objects) {
460 Array::Ptr row = make_shared<Array>();
462 BOOST_FOREACH(const String& columnName, columns) {
463 Column column = table->GetColumn(columnName);
466 header->Add(columnName);
468 row->Add(column.ExtractValue(object));
471 if (m_ColumnHeaders) {
473 m_ColumnHeaders = false;
479 std::vector<double> stats(m_Aggregators.size(), 0);
482 /* add aggregated stats */
483 BOOST_FOREACH(const Aggregator::Ptr aggregator, m_Aggregators) {
484 BOOST_FOREACH(const Value& object, objects) {
485 aggregator->Apply(table, object);
488 stats[index] = aggregator->GetResult();
492 /* add column headers both for raw and aggregated data */
493 if (m_ColumnHeaders) {
494 Array::Ptr header = make_shared<Array>();
496 BOOST_FOREACH(const String& columnName, m_Columns) {
497 header->Add(columnName);
500 for (size_t i = 1; i <= m_Aggregators.size(); i++) {
501 header->Add("stats_" + Convert::ToString(i));
507 Array::Ptr row = make_shared<Array>();
510 * add selected columns next to stats
511 * may not be accurate for grouping!
513 if (objects.size() > 0 && m_Columns.size() > 0) {
514 BOOST_FOREACH(const String& columnName, m_Columns) {
515 Column column = table->GetColumn(columnName);
517 row->Add(column.ExtractValue(objects[0])); // first object wins
521 for (size_t i = 0; i < m_Aggregators.size(); i++)
527 std::ostringstream result;
528 PrintResultSet(result, rs);
530 SendResponse(stream, LivestatusErrorOK, result.str());
533 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
536 boost::mutex::scoped_lock lock(l_QueryMutex);
538 l_ExternalCommands++;
541 Log(LogInformation, "LivestatusQuery", "Executing command: " + m_Command);
542 ExternalCommandProcessor::Execute(m_Command);
543 SendResponse(stream, LivestatusErrorOK, "");
546 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
548 Log(LogDebug, "LivestatusQuery", "ERROR: Code: '" + Convert::ToString(m_ErrorCode) + "' Message: '" + m_ErrorMessage + "'.");
549 SendResponse(stream, m_ErrorCode, m_ErrorMessage);
552 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
554 if (m_ResponseHeader == "fixed16")
555 PrintFixed16(stream, code, data);
557 if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
559 stream->Write(data.CStr(), data.GetLength());
560 } catch (const std::exception&) {
561 Log(LogCritical, "LivestatusQuery", "Cannot write to tcp socket.");
566 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
568 ASSERT(code >= 100 && code <= 999);
570 String sCode = Convert::ToString(code);
571 String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
573 String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
576 stream->Write(header.CStr(), header.GetLength());
577 } catch (const std::exception&) {
578 Log(LogCritical, "LivestatusQuery", "Cannot write to tcp socket.");
582 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
585 Log(LogInformation, "LivestatusQuery", "Executing livestatus query: " + m_Verb);
588 ExecuteGetHelper(stream);
589 else if (m_Verb == "COMMAND")
590 ExecuteCommandHelper(stream);
591 else if (m_Verb == "ERROR")
592 ExecuteErrorHelper(stream);
594 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
595 } catch (const std::exception& ex) {
596 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));