1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "icinga/clusterevents.hpp"
4 #include "icinga/icingaapplication.hpp"
5 #include "remote/apilistener.hpp"
6 #include "base/configuration.hpp"
7 #include "base/serializer.hpp"
8 #include "base/exception.hpp"
9 #include <boost/thread/once.hpp>
12 using namespace icinga;
// Definitions of the static ClusterEvents members that back the remote check queue.
// Mutex taken by RemoteCheckThreadProc() and EnqueueCheck() before they touch the queue state.
14 boost::mutex ClusterEvents::m_Mutex;
// Pending check callbacks: appended by EnqueueCheck(), consumed from the front by RemoteCheckThreadProc().
15 std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
// Set to true by EnqueueCheck() when it starts the scheduler thread and reset to false
// at the end of RemoteCheckThreadProc(). No explicit initializer: as an object with
// static storage duration it is zero-initialized (false).
16 bool ClusterEvents::m_CheckSchedulerRunning;
// Number of callbacks dequeued since the last LogRemoteCheckQueueInformation() run (reset there).
17 int ClusterEvents::m_ChecksExecutedDuringInterval;
// Number of requests rejected because the queue was full since the last
// LogRemoteCheckQueueInformation() run (reset there).
18 int ClusterEvents::m_ChecksDroppedDuringInterval;
// Timer created once in EnqueueCheck(); its OnTimerExpired signal is connected to
// LogRemoteCheckQueueInformation().
19 Timer::Ptr ClusterEvents::m_LogTimer;
// Thread procedure of the remote check scheduler: removes queued callbacks from
// m_CheckRequestQueue, counts them in m_ChecksExecutedDuringInterval and finally
// clears m_CheckSchedulerRunning.
// NOTE(review): the embedded line numbers in this excerpt skip values (27 -> 30,
// 30 -> 34, 39 -> 43, ...), so some lines of this function (braces, the loop header,
// the action for an empty queue, any unlock()/lock() calls and presumably the
// invocation of `callback`) are not shown here - confirm against the full file.
21 void ClusterEvents::RemoteCheckThreadProc()
// Give this thread the name "Remote Check Scheduler".
23 Utility::SetThreadName("Remote Check Scheduler");
// Read the configured limit once; it is passed to AquirePendingCheckSlot() below.
25 int maxConcurrentChecks = IcingaApplication::GetInstance()->GetMaxConcurrentChecks();
// Acquire m_Mutex before touching the shared queue state.
27 boost::mutex::scoped_lock lock(m_Mutex);
// Empty-queue check; the statement it guards is not visible in this excerpt.
30 if (m_CheckRequestQueue.empty())
// Presumably waits until fewer than maxConcurrentChecks checks are pending - TODO confirm
// against Checkable::AquirePendingCheckSlot().
34 Checkable::AquirePendingCheckSlot(maxConcurrentChecks);
// Copy the oldest callback out of the queue, remove it, and count it for the rate log.
37 auto callback = m_CheckRequestQueue.front();
38 m_CheckRequestQueue.pop_front();
39 m_ChecksExecutedDuringInterval++;
// Presumably gives back the slot taken by AquirePendingCheckSlot() - TODO confirm.
43 Checkable::DecreasePendingChecks();
// Mark the scheduler as stopped so that EnqueueCheck() starts a new thread
// the next time it sees this flag cleared.
48 m_CheckSchedulerRunning = false;
// Queues a remote check request for later execution by the scheduler thread.
//
// @param origin Message origin; bound into the queued callback and later passed to
//               ExecuteCheckFromQueue().
// @param params Request parameters; bound into the queued callback and later passed to
//               ExecuteCheckFromQueue().
//
// NOTE(review): the embedded line numbers in this excerpt skip values (58 -> 62,
// 65 -> 69, 72 -> 74, ...), so some lines (braces, the end of the call_once lambda,
// whatever follows the drop counter, and how the thread `t` is detached or joined)
// are not shown here - confirm against the full file.
51 void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
// One-time setup guarded by a boost::once_flag: the logging timer is created only
// on the first call.
53 static boost::once_flag once = BOOST_ONCE_INIT;
55 boost::call_once(once, []() {
56 m_LogTimer = new Timer();
// Interval of 10; LogRemoteCheckQueueInformation() divides by 10 for its "/s" rate,
// i.e. it treats this as 10 seconds.
57 m_LogTimer->SetInterval(10);
58 m_LogTimer->OnTimerExpired.connect(std::bind(ClusterEvents::LogRemoteCheckQueueInformation));
// Acquire m_Mutex before inspecting or modifying the queue and the running flag.
62 boost::mutex::scoped_lock lock(m_Mutex);
// Hard cap of 25000 queued requests: once reached, the request is counted as dropped.
// NOTE(review): the rest of this branch (presumably an early return) is not visible here.
64 if (m_CheckRequestQueue.size() >= 25000) {
65 m_ChecksDroppedDuringInterval++;
// std::bind stores copies of origin and params inside the queued callable.
69 m_CheckRequestQueue.push_back(std::bind(ClusterEvents::ExecuteCheckFromQueue, origin, params));
// Start the scheduler thread if none is currently marked as running, and record that
// one is now running.
71 if (!m_CheckSchedulerRunning) {
72 std::thread t(ClusterEvents::RemoteCheckThreadProc);
74 m_CheckSchedulerRunning = true;
// Processes one queued 'execute command' request: validates the sender, checks that the
// local ApiListener accepts commands, builds a temporary Host object from the request,
// verifies that the requested command exists, and then runs either the remote check or
// the event handler.
//
// @param origin Message origin; FromClient and FromZone are used to validate the sender.
// @param params Request dictionary; the keys "host", "service", "command",
//               "command_type" and "macros" are read.
//
// NOTE(review): the embedded line numbers in this excerpt skip values (84 -> 88,
// 88 -> 91, 115 -> 120, 162 -> 164, ...), so some lines (closing braces, the statements
// that end each rejection branch, the listener null check and the `try {` opening the
// catch block) are not shown here - confirm against the full file.
78 void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {
// Endpoint of the client that sent the request; results are sent back to it.
80 Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint();
// Reject when the client has no endpoint, or when a source zone is given and the
// local zone is not a child of it.
82 if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) {
83 Log(LogNotice, "ClusterEvents")
84 << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed).";
88 ApiListener::Ptr listener = ApiListener::GetInstance();
// Logged when no listener instance is available (the guarding condition is not
// visible in this excerpt).
91 Log(LogCritical, "ApiListener", "No instance available.");
// The listener is configured not to accept commands: log a warning and report that
// back to the sender as a check result.
95 if (!listener->GetAcceptCommands()) {
96 Log(LogWarning, "ApiListener")
97 << "Ignoring command. '" << listener->GetName() << "' does not accept commands.";
// Temporary Host named after params["host"] with active checks disabled; it is only
// used to build the check result message below.
99 Host::Ptr host = new Host();
100 Dictionary::Ptr attrs = new Dictionary();
102 attrs->Set("__name", params->Get("host"));
103 attrs->Set("type", "Host");
104 attrs->Set("enable_active_checks", false);
106 Deserialize(host, attrs, false, FAConfig);
// Carry the service name (if any) along as the "agent_service_name" extension.
108 if (params->Contains("service"))
109 host->SetExtension("agent_service_name", params->Get("service"));
// Check result with state ServiceUnknown and an explanatory output, sent to the
// requesting endpoint.
111 CheckResult::Ptr cr = new CheckResult();
112 cr->SetState(ServiceUnknown);
113 cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.");
114 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
115 listener->SyncSendMessage(sourceEndpoint, message);
120 /* use a virtual host object for executing the command */
121 Host::Ptr host = new Host();
122 Dictionary::Ptr attrs = new Dictionary();
124 attrs->Set("__name", params->Get("host"));
125 attrs->Set("type", "Host");
127 Deserialize(host, attrs, false, FAConfig);
// Carry the service name (if any) along as the "agent_service_name" extension.
129 if (params->Contains("service"))
130 host->SetExtension("agent_service_name", params->Get("service"));
// Name of the command to run and its kind ("check_command" or "event_command").
132 String command = params->Get("command");
133 String command_type = params->Get("command_type");
// Unknown check command: answer the sender with a ServiceUnknown check result.
135 if (command_type == "check_command") {
136 if (!CheckCommand::GetByName(command)) {
137 CheckResult::Ptr cr = new CheckResult();
138 cr->SetState(ServiceUnknown);
139 cr->SetOutput("Check command '" + command + "' does not exist.");
140 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
141 listener->SyncSendMessage(sourceEndpoint, message);
// Unknown event command: only a warning is logged, no message is sent in the visible lines.
144 } else if (command_type == "event_command") {
145 if (!EventCommand::GetByName(command)) {
146 Log(LogWarning, "ClusterEvents")
147 << "Event command '" << command << "' does not exist.";
// Store the command name under the attribute named by command_type, set the
// command endpoint to the sender, and deserialize again to apply both to the host.
153 attrs->Set(command_type, params->Get("command"));
154 attrs->Set("command_endpoint", sourceEndpoint->GetName());
156 Deserialize(host, attrs, false, FAConfig);
// Flag the host with the "agent_check" extension.
158 host->SetExtension("agent_check", true);
// Macros supplied with the request; passed to the execution calls below.
160 Dictionary::Ptr macros = params->Get("macros");
162 if (command_type == "check_command") {
// Run the check; a std::exception is converted into a check result (see below).
164 host->ExecuteRemoteCheck(macros);
165 } catch (const std::exception& ex) {
// Report the failure to the sender as a ServiceUnknown result whose output contains
// the diagnostic information of the exception.
166 CheckResult::Ptr cr = new CheckResult();
167 cr->SetState(ServiceUnknown);
169 String output = "Exception occurred while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
170 cr->SetOutput(output);
// All schedule and execution timestamps are set to the current time.
172 double now = Utility::GetTime();
173 cr->SetScheduleStart(now);
174 cr->SetScheduleEnd(now);
175 cr->SetExecutionStart(now);
176 cr->SetExecutionEnd(now);
178 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
179 listener->SyncSendMessage(sourceEndpoint, message);
// Also log the same text locally at critical level.
181 Log(LogCritical, "checker", output);
// Event commands are run through the host's event handler with the supplied macros.
183 } else if (command_type == "event_command") {
184 host->ExecuteEventHandler(macros, true);
// Returns the number of callbacks currently held in m_CheckRequestQueue, converted to int.
// NOTE(review): no m_Mutex lock is visible here, while RemoteCheckThreadProc() and
// EnqueueCheck() lock before touching the queue - confirm whether an unsynchronized
// read is intended.
188 int ClusterEvents::GetCheckRequestQueueSize()
190 return m_CheckRequestQueue.size();
// Timer callback (connected to m_LogTimer in EnqueueCheck()): logs how many checks were
// dropped and how many were executed since the previous run, then resets both counters.
// NOTE(review): the embedded line numbers in this excerpt skip values (198 -> 201,
// 201 -> 204, ...), so some lines (closing braces and the statement guarded by the
// "== 0" check, presumably an early return) are not shown here. No m_Mutex lock is
// visible around the counter and queue accesses either - confirm against the full file.
193 void ClusterEvents::LogRemoteCheckQueueInformation() {
// Report dropped requests at critical level and restart the drop counter.
194 if (m_ChecksDroppedDuringInterval > 0) {
195 Log(LogCritical, "ClusterEvents")
196 << "Remote check queue ran out of slots. "
197 << m_ChecksDroppedDuringInterval << " checks dropped.";
198 m_ChecksDroppedDuringInterval = 0;
// Nothing was executed in this interval; the guarded statement is not visible here.
201 if (m_ChecksExecutedDuringInterval == 0)
// Rates are extrapolated from one interval: /10 gives the per-second rate (integer
// division, so fewer than 10 executed checks print as 0/s), *6 the per-minute rate,
// and *6*5 / *6*15 the 5- and 15-minute figures.
204 Log(LogInformation, "RemoteCheckQueue")
205 << "items: " << m_CheckRequestQueue.size()
206 << ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
207 << "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
208 << m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
209 << m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
// Restart the executed counter for the next interval.
211 m_ChecksExecutedDuringInterval = 0;