1 /******************************************************************************
3 * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "livestatus/livestatusquery.hpp"
21 #include "livestatus/countaggregator.hpp"
22 #include "livestatus/sumaggregator.hpp"
23 #include "livestatus/minaggregator.hpp"
24 #include "livestatus/maxaggregator.hpp"
25 #include "livestatus/avgaggregator.hpp"
26 #include "livestatus/stdaggregator.hpp"
27 #include "livestatus/invsumaggregator.hpp"
28 #include "livestatus/invavgaggregator.hpp"
29 #include "livestatus/attributefilter.hpp"
30 #include "livestatus/negatefilter.hpp"
31 #include "livestatus/orfilter.hpp"
32 #include "livestatus/andfilter.hpp"
33 #include "icinga/externalcommandprocessor.hpp"
34 #include "config/configcompiler.hpp"
35 #include "base/debug.hpp"
36 #include "base/convert.hpp"
37 #include "base/objectlock.hpp"
38 #include "base/logger.hpp"
39 #include "base/exception.hpp"
40 #include "base/utility.hpp"
41 #include "base/json.hpp"
42 #include "base/serializer.hpp"
43 #include "base/timer.hpp"
44 #include "base/initialize.hpp"
45 #include <boost/algorithm/string/classification.hpp>
46 #include <boost/foreach.hpp>
47 #include <boost/algorithm/string/replace.hpp>
48 #include <boost/algorithm/string/split.hpp>
49 #include <boost/algorithm/string/join.hpp>
51 using namespace icinga;
53 static int l_ExternalCommands = 0;
54 static boost::mutex l_QueryMutex;
55 static std::map<String, LivestatusScriptFrame> l_LivestatusScriptFrames;
56 static Timer::Ptr l_FrameCleanupTimer;
57 static boost::mutex l_LivestatusScriptMutex;
59 static void ScriptFrameCleanupHandler(void)
61 boost::mutex::scoped_lock lock(l_LivestatusScriptMutex);
63 std::vector<String> cleanup_keys;
65 typedef std::pair<String, LivestatusScriptFrame> KVPair;
67 BOOST_FOREACH(const KVPair& kv, l_LivestatusScriptFrames) {
68 if (kv.second.Seen < Utility::GetTime() - 1800)
69 cleanup_keys.push_back(kv.first);
72 BOOST_FOREACH(const String& key, cleanup_keys)
73 l_LivestatusScriptFrames.erase(key);
76 static void InitScriptFrameCleanup(void)
78 l_FrameCleanupTimer = new Timer();
79 l_FrameCleanupTimer->OnTimerExpired.connect(boost::bind(ScriptFrameCleanupHandler));
80 l_FrameCleanupTimer->SetInterval(30);
81 l_FrameCleanupTimer->Start();
84 INITIALIZE_ONCE(InitScriptFrameCleanup);
86 LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
87 : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
88 m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
90 if (lines.size() == 0) {
92 m_ErrorCode = LivestatusErrorQuery;
93 m_ErrorMessage = "Empty Query. Aborting.";
98 BOOST_FOREACH(const String& line, lines) {
101 Log(LogDebug, "LivestatusQuery", msg);
103 m_CompatLogPath = compat_log_path;
105 /* default separators */
106 m_Separators.push_back("\n");
107 m_Separators.push_back(";");
108 m_Separators.push_back(",");
109 m_Separators.push_back("|");
111 String line = lines[0];
113 size_t sp_index = line.FindFirstOf(" ");
115 if (sp_index == String::NPos)
116 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
118 String verb = line.SubStr(0, sp_index);
119 String target = line.SubStr(sp_index + 1);
123 if (m_Verb == "COMMAND") {
126 } else if (m_Verb == "SCRIPT") {
129 for (unsigned int i = 1; i < lines.size(); i++) {
132 m_Command += lines[i];
136 } else if (m_Verb == "GET") {
140 m_ErrorCode = LivestatusErrorQuery;
141 m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
145 std::deque<Filter::Ptr> filters, stats;
146 std::deque<Aggregator::Ptr> aggregators;
148 for (unsigned int i = 1; i < lines.size(); i++) {
151 size_t col_index = line.FindFirstOf(":");
152 String header = line.SubStr(0, col_index);
155 //OutputFormat:json or OutputFormat: json
156 if (line.GetLength() > col_index + 1)
157 params = line.SubStr(col_index + 1);
161 if (header == "ResponseHeader")
162 m_ResponseHeader = params;
163 else if (header == "OutputFormat")
164 m_OutputFormat = params;
165 else if (header == "KeepAlive")
166 m_KeepAlive = (params == "on");
167 else if (header == "Columns") {
168 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
169 boost::algorithm::split(m_Columns, params, boost::is_any_of(" "));
170 } else if (header == "Separators") {
171 std::vector<String> separators;
173 boost::algorithm::split(separators, params, boost::is_any_of(" "));
174 /* ugly ascii long to char conversion, but works */
175 if (separators.size() > 0)
176 m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
177 if (separators.size() > 1)
178 m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
179 if (separators.size() > 2)
180 m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
181 if (separators.size() > 3)
182 m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
183 } else if (header == "ColumnHeaders")
184 m_ColumnHeaders = (params == "on");
185 else if (header == "Limit")
186 m_Limit = Convert::ToLong(params);
187 else if (header == "Filter") {
188 Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
192 m_ErrorCode = LivestatusErrorQuery;
193 m_ErrorMessage = "Invalid filter specification: " + line;
197 filters.push_back(filter);
198 } else if (header == "Stats") {
199 m_ColumnHeaders = false; // Might be explicitly re-enabled later on
201 std::vector<String> tokens;
202 boost::algorithm::split(tokens, params, boost::is_any_of(" "));
204 if (tokens.size() < 2) {
206 m_ErrorCode = LivestatusErrorQuery;
207 m_ErrorMessage = "Missing aggregator column name: " + line;
211 String aggregate_arg = tokens[0];
212 String aggregate_attr = tokens[1];
214 Aggregator::Ptr aggregator;
217 if (aggregate_arg == "sum") {
218 aggregator = new SumAggregator(aggregate_attr);
219 } else if (aggregate_arg == "min") {
220 aggregator = new MinAggregator(aggregate_attr);
221 } else if (aggregate_arg == "max") {
222 aggregator = new MaxAggregator(aggregate_attr);
223 } else if (aggregate_arg == "avg") {
224 aggregator = new AvgAggregator(aggregate_attr);
225 } else if (aggregate_arg == "std") {
226 aggregator = new StdAggregator(aggregate_attr);
227 } else if (aggregate_arg == "suminv") {
228 aggregator = new InvSumAggregator(aggregate_attr);
229 } else if (aggregate_arg == "avginv") {
230 aggregator = new InvAvgAggregator(aggregate_attr);
232 filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
236 m_ErrorCode = LivestatusErrorQuery;
237 m_ErrorMessage = "Invalid filter specification: " + line;
241 aggregator = new CountAggregator();
244 aggregator->SetFilter(filter);
245 aggregators.push_back(aggregator);
247 stats.push_back(filter);
248 } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
249 std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
251 unsigned int num = Convert::ToLong(params);
252 CombinerFilter::Ptr filter;
254 if (header == "Or" || header == "StatsOr") {
255 filter = new OrFilter();
256 Log(LogDebug, "LivestatusQuery")
257 << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
259 filter = new AndFilter();
260 Log(LogDebug, "LivestatusQuery")
261 << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
264 if (num > deq.size()) {
267 m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
271 while (num > 0 && num--) {
272 filter->AddSubFilter(deq.back());
273 Log(LogDebug, "LivestatusQuery")
274 << "Add " << num << " filter.";
277 aggregators.pop_back();
280 deq.push_back(filter);
281 if (&deq == &stats) {
282 Aggregator::Ptr aggregator = new CountAggregator();
283 aggregator->SetFilter(filter);
284 aggregators.push_back(aggregator);
286 } else if (header == "Negate" || header == "StatsNegate") {
287 std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
292 m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
296 Filter::Ptr filter = deq.back();
302 m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
306 deq.push_back(new NegateFilter(filter));
309 Aggregator::Ptr aggregator = aggregators.back();
310 aggregator->SetFilter(filter);
315 /* Combine all top-level filters into a single filter. */
316 AndFilter::Ptr top_filter = new AndFilter();
318 BOOST_FOREACH(const Filter::Ptr& filter, filters) {
319 top_filter->AddSubFilter(filter);
322 m_Filter = top_filter;
323 m_Aggregators.swap(aggregators);
326 int LivestatusQuery::GetExternalCommands(void)
328 boost::mutex::scoped_lock lock(l_QueryMutex);
330 return l_ExternalCommands;
333 Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
337 * type = SERVICE FLAPPING ALERT
339 std::vector<String> tokens;
341 String temp_buffer = params;
343 /* extract attr and op */
344 for (int i = 0; i < 2; i++) {
345 sp_index = temp_buffer.FindFirstOf(" ");
347 /* check if this is the last argument */
348 if (sp_index == String::NPos) {
349 /* 'attr op' or 'attr op val' is valid */
351 BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
356 tokens.push_back(temp_buffer.SubStr(0, sp_index));
357 temp_buffer = temp_buffer.SubStr(sp_index + 1);
360 /* add the rest as value */
361 tokens.push_back(temp_buffer);
363 if (tokens.size() == 2)
364 tokens.push_back("");
366 if (tokens.size() < 3)
367 return Filter::Ptr();
370 String attr = tokens[0];
371 String op = tokens[1];
372 String val = tokens[2];
377 } else if (op == "!~") {
380 } else if (op == "!=~") {
383 } else if (op == "!~~") {
388 Filter::Ptr filter = new AttributeFilter(attr, op, val);
391 filter = new NegateFilter(filter);
393 /* pre-filter log time duration */
394 if (attr == "time") {
395 if (op == "<" || op == "<=") {
396 until = Convert::ToLong(val);
397 } else if (op == ">" || op == ">=") {
398 from = Convert::ToLong(val);
402 Log(LogDebug, "LivestatusQuery")
403 << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
408 void LivestatusQuery::PrintResultSet(std::ostream& fp, const Array::Ptr& rs) const
410 if (m_OutputFormat == "csv") {
411 ObjectLock olock(rs);
413 BOOST_FOREACH(const Array::Ptr& row, rs) {
416 ObjectLock rlock(row);
417 BOOST_FOREACH(const Value& value, row) {
421 fp << m_Separators[1];
423 if (value.IsObjectType<Array>())
424 PrintCsvArray(fp, value, 0);
429 fp << m_Separators[0];
431 } else if (m_OutputFormat == "json") {
432 fp << JsonEncode(rs);
433 } else if (m_OutputFormat == "python") {
434 PrintPythonArray(fp, rs);
438 void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
442 ObjectLock olock(array);
443 BOOST_FOREACH(const Value& value, array) {
447 fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
449 if (value.IsObjectType<Array>())
450 PrintCsvArray(fp, value, level + 1);
451 else if (value.IsBoolean())
452 fp << Convert::ToLong(value);
458 void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
464 BOOST_FOREACH(const Value& value, rs) {
470 if (value.IsObjectType<Array>())
471 PrintPythonArray(fp, value);
472 else if (value.IsNumber())
475 fp << QuoteStringPython(value);
481 String LivestatusQuery::QuoteStringPython(const String& str) {
483 boost::algorithm::replace_all(result, "\"", "\\\"");
484 return "r\"" + result + "\"";
487 void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
489 Log(LogNotice, "LivestatusQuery")
490 << "Table: " << m_Table;
492 Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
495 SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
500 std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
501 std::vector<String> columns;
503 if (m_Columns.size() > 0)
506 columns = table->GetColumnNames();
508 Array::Ptr rs = new Array();
510 if (m_Aggregators.empty()) {
511 Array::Ptr header = new Array();
513 typedef std::pair<String, Column> ColumnPair;
515 std::vector<ColumnPair> column_objs;
516 column_objs.reserve(columns.size());
518 BOOST_FOREACH(const String& columnName, columns)
519 column_objs.push_back(std::make_pair(columnName, table->GetColumn(columnName)));
521 rs->Reserve(1 + objects.size());
523 BOOST_FOREACH(const LivestatusRowValue& object, objects) {
524 Array::Ptr row = new Array();
526 row->Reserve(column_objs.size());
528 BOOST_FOREACH(const ColumnPair& cv, column_objs) {
530 header->Add(cv.first);
532 row->Add(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
535 if (m_ColumnHeaders) {
537 m_ColumnHeaders = false;
543 std::vector<double> stats(m_Aggregators.size(), 0);
546 /* add aggregated stats */
547 BOOST_FOREACH(const Aggregator::Ptr aggregator, m_Aggregators) {
548 BOOST_FOREACH(const LivestatusRowValue& object, objects) {
549 aggregator->Apply(table, object.Row);
552 stats[index] = aggregator->GetResult();
556 /* add column headers both for raw and aggregated data */
557 if (m_ColumnHeaders) {
558 Array::Ptr header = new Array();
560 BOOST_FOREACH(const String& columnName, m_Columns) {
561 header->Add(columnName);
564 for (size_t i = 1; i <= m_Aggregators.size(); i++) {
565 header->Add("stats_" + Convert::ToString(i));
571 Array::Ptr row = new Array();
573 row->Reserve(m_Columns.size() + m_Aggregators.size());
576 * add selected columns next to stats
577 * may not be accurate for grouping!
579 if (objects.size() > 0 && m_Columns.size() > 0) {
580 BOOST_FOREACH(const String& columnName, m_Columns) {
581 Column column = table->GetColumn(columnName);
583 LivestatusRowValue object = objects[0]; //first object wins
585 row->Add(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
589 for (size_t i = 0; i < m_Aggregators.size(); i++)
595 std::ostringstream result;
596 PrintResultSet(result, rs);
598 SendResponse(stream, LivestatusErrorOK, result.str());
601 void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
604 boost::mutex::scoped_lock lock(l_QueryMutex);
606 l_ExternalCommands++;
609 Log(LogNotice, "LivestatusQuery")
610 << "Executing command: " << m_Command;
611 ExternalCommandProcessor::Execute(m_Command);
612 SendResponse(stream, LivestatusErrorOK, "");
615 void LivestatusQuery::ExecuteScriptHelper(const Stream::Ptr& stream)
617 Log(LogInformation, "LivestatusQuery")
618 << "Executing expression: " << m_Command;
620 m_ResponseHeader = "fixed16";
622 LivestatusScriptFrame& lsf = l_LivestatusScriptFrames[m_Session];
623 lsf.Seen = Utility::GetTime();
626 lsf.Locals = new Dictionary();
628 String fileName = "<" + Convert::ToString(lsf.NextLine) + ">";
631 lsf.Lines[fileName] = m_Command;
633 Expression *expr = NULL;
636 expr = ConfigCompiler::CompileText(fileName, m_Command);
638 frame.Locals = lsf.Locals;
639 frame.Self = lsf.Locals;
640 result = expr->Evaluate(frame);
641 } catch (const ScriptError& ex) {
644 DebugInfo di = ex.GetDebugInfo();
646 std::ostringstream msgbuf;
648 msgbuf << di.Path << ": " << lsf.Lines[di.Path] << "\n"
649 << String(di.Path.GetLength() + 2, ' ')
650 << String(di.FirstColumn, ' ') << String(di.LastColumn - di.FirstColumn + 1, '^') << "\n"
651 << ex.what() << "\n";
653 SendResponse(stream, LivestatusErrorQuery, msgbuf.str());
660 SendResponse(stream, LivestatusErrorOK, JsonEncode(Serialize(result, FAEphemeral | FAState | FAConfig), true));
663 void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
665 Log(LogDebug, "LivestatusQuery")
666 << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
667 SendResponse(stream, m_ErrorCode, m_ErrorMessage);
670 void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
672 if (m_ResponseHeader == "fixed16")
673 PrintFixed16(stream, code, data);
675 if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
677 stream->Write(data.CStr(), data.GetLength());
678 } catch (const std::exception&) {
679 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
684 void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
686 ASSERT(code >= 100 && code <= 999);
688 String sCode = Convert::ToString(code);
689 String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
691 String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
694 stream->Write(header.CStr(), header.GetLength());
695 } catch (const std::exception&) {
696 Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
700 bool LivestatusQuery::Execute(const Stream::Ptr& stream)
703 Log(LogNotice, "LivestatusQuery")
704 << "Executing livestatus query: " << m_Verb;
707 ExecuteGetHelper(stream);
708 else if (m_Verb == "COMMAND")
709 ExecuteCommandHelper(stream);
710 else if (m_Verb == "SCRIPT")
711 ExecuteScriptHelper(stream);
712 else if (m_Verb == "ERROR")
713 ExecuteErrorHelper(stream);
715 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
716 } catch (const std::exception& ex) {
717 SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));