granicus.if.org Git - icinga2/blob - lib/icinga/clusterevents-check.cpp
Fix spelling errors.
[icinga2] / lib / icinga / clusterevents-check.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "icinga/clusterevents.hpp"
4 #include "icinga/icingaapplication.hpp"
5 #include "remote/apilistener.hpp"
6 #include "base/configuration.hpp"
7 #include "base/serializer.hpp"
8 #include "base/exception.hpp"
9 #include <boost/thread/once.hpp>
10 #include <thread>
11
12 using namespace icinga;
13
14 boost::mutex ClusterEvents::m_Mutex;
15 std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
16 bool ClusterEvents::m_CheckSchedulerRunning;
17 int ClusterEvents::m_ChecksExecutedDuringInterval;
18 int ClusterEvents::m_ChecksDroppedDuringInterval;
19 Timer::Ptr ClusterEvents::m_LogTimer;
20
21 void ClusterEvents::RemoteCheckThreadProc()
22 {
23         Utility::SetThreadName("Remote Check Scheduler");
24
25         int maxConcurrentChecks = IcingaApplication::GetInstance()->GetMaxConcurrentChecks();
26
27         boost::mutex::scoped_lock lock(m_Mutex);
28
29         for(;;) {
30                 if (m_CheckRequestQueue.empty())
31                         break;
32
33                 lock.unlock();
34                 Checkable::AquirePendingCheckSlot(maxConcurrentChecks);
35                 lock.lock();
36
37                 auto callback = m_CheckRequestQueue.front();
38                 m_CheckRequestQueue.pop_front();
39                 m_ChecksExecutedDuringInterval++;
40                 lock.unlock();
41
42                 callback();
43                 Checkable::DecreasePendingChecks();
44
45                 lock.lock();
46         }
47
48         m_CheckSchedulerRunning = false;
49 }
50
51 void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
52 {
53         static boost::once_flag once = BOOST_ONCE_INIT;
54
55         boost::call_once(once, []() {
56                 m_LogTimer = new Timer();
57                 m_LogTimer->SetInterval(10);
58                 m_LogTimer->OnTimerExpired.connect(std::bind(ClusterEvents::LogRemoteCheckQueueInformation));
59                 m_LogTimer->Start();
60         });
61
62         boost::mutex::scoped_lock lock(m_Mutex);
63
64         if (m_CheckRequestQueue.size() >= 25000) {
65                 m_ChecksDroppedDuringInterval++;
66                 return;
67         }
68
69         m_CheckRequestQueue.push_back(std::bind(ClusterEvents::ExecuteCheckFromQueue, origin, params));
70
71         if (!m_CheckSchedulerRunning) {
72                 std::thread t(ClusterEvents::RemoteCheckThreadProc);
73                 t.detach();
74                 m_CheckSchedulerRunning = true;
75         }
76 }
77
78 void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {
79
80         Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint();
81
82         if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) {
83                 Log(LogNotice, "ClusterEvents")
84                                 << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed).";
85                 return;
86         }
87
88         ApiListener::Ptr listener = ApiListener::GetInstance();
89
90         if (!listener) {
91                 Log(LogCritical, "ApiListener", "No instance available.");
92                 return;
93         }
94
95         if (!listener->GetAcceptCommands()) {
96                 Log(LogWarning, "ApiListener")
97                                 << "Ignoring command. '" << listener->GetName() << "' does not accept commands.";
98
99                 Host::Ptr host = new Host();
100                 Dictionary::Ptr attrs = new Dictionary();
101
102                 attrs->Set("__name", params->Get("host"));
103                 attrs->Set("type", "Host");
104                 attrs->Set("enable_active_checks", false);
105
106                 Deserialize(host, attrs, false, FAConfig);
107
108                 if (params->Contains("service"))
109                         host->SetExtension("agent_service_name", params->Get("service"));
110
111                 CheckResult::Ptr cr = new CheckResult();
112                 cr->SetState(ServiceUnknown);
113                 cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.");
114                 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
115                 listener->SyncSendMessage(sourceEndpoint, message);
116
117                 return;
118         }
119
120         /* use a virtual host object for executing the command */
121         Host::Ptr host = new Host();
122         Dictionary::Ptr attrs = new Dictionary();
123
124         attrs->Set("__name", params->Get("host"));
125         attrs->Set("type", "Host");
126
127         Deserialize(host, attrs, false, FAConfig);
128
129         if (params->Contains("service"))
130                 host->SetExtension("agent_service_name", params->Get("service"));
131
132         String command = params->Get("command");
133         String command_type = params->Get("command_type");
134
135         if (command_type == "check_command") {
136                 if (!CheckCommand::GetByName(command)) {
137                         CheckResult::Ptr cr = new CheckResult();
138                         cr->SetState(ServiceUnknown);
139                         cr->SetOutput("Check command '" + command + "' does not exist.");
140                         Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
141                         listener->SyncSendMessage(sourceEndpoint, message);
142                         return;
143                 }
144         } else if (command_type == "event_command") {
145                 if (!EventCommand::GetByName(command)) {
146                         Log(LogWarning, "ClusterEvents")
147                                         << "Event command '" << command << "' does not exist.";
148                         return;
149                 }
150         } else
151                 return;
152
153         attrs->Set(command_type, params->Get("command"));
154         attrs->Set("command_endpoint", sourceEndpoint->GetName());
155
156         Deserialize(host, attrs, false, FAConfig);
157
158         host->SetExtension("agent_check", true);
159
160         Dictionary::Ptr macros = params->Get("macros");
161
162         if (command_type == "check_command") {
163                 try {
164                         host->ExecuteRemoteCheck(macros);
165                 } catch (const std::exception& ex) {
166                         CheckResult::Ptr cr = new CheckResult();
167                         cr->SetState(ServiceUnknown);
168
169                         String output = "Exception occurred while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
170                         cr->SetOutput(output);
171
172                         double now = Utility::GetTime();
173                         cr->SetScheduleStart(now);
174                         cr->SetScheduleEnd(now);
175                         cr->SetExecutionStart(now);
176                         cr->SetExecutionEnd(now);
177
178                         Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
179                         listener->SyncSendMessage(sourceEndpoint, message);
180
181                         Log(LogCritical, "checker", output);
182                 }
183         } else if (command_type == "event_command") {
184                 host->ExecuteEventHandler(macros, true);
185         }
186 }
187
188 int ClusterEvents::GetCheckRequestQueueSize()
189 {
190         return m_CheckRequestQueue.size();
191 }
192
193 void ClusterEvents::LogRemoteCheckQueueInformation() {
194         if (m_ChecksDroppedDuringInterval > 0) {
195                 Log(LogCritical, "ClusterEvents")
196                         << "Remote check queue ran out of slots. "
197                         << m_ChecksDroppedDuringInterval << " checks dropped.";
198                 m_ChecksDroppedDuringInterval = 0;
199         }
200
201         if (m_ChecksExecutedDuringInterval == 0)
202                 return;
203
204         Log(LogInformation, "RemoteCheckQueue")
205                 << "items: " << m_CheckRequestQueue.size()
206                 << ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
207                 << "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
208                 << m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
209                 << m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
210
211         m_ChecksExecutedDuringInterval = 0;
212 }