From: Noah Hilverling Date: Tue, 16 Jan 2018 09:40:08 +0000 (+0100) Subject: Implement concurrent checks limit for remote checks X-Git-Tag: v2.9.0~173^2~4 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e28277175b6e392860c43ec9b06e9cae8c317313;p=icinga2 Implement concurrent checks limit for remote checks fixes #4841 --- diff --git a/lib/icinga/CMakeLists.txt b/lib/icinga/CMakeLists.txt index 26d7d7b72..6f9f14d7e 100644 --- a/lib/icinga/CMakeLists.txt +++ b/lib/icinga/CMakeLists.txt @@ -49,7 +49,7 @@ set(icinga_SOURCES checkcommand.cpp checkcommand.hpp checkcommand-ti.hpp checkresult.cpp checkresult.hpp checkresult-ti.hpp cib.cpp cib.hpp - clusterevents.cpp clusterevents.hpp + clusterevents.cpp clusterevents.hpp clusterevents-check.cpp command.cpp command.hpp command-ti.hpp comment.cpp comment.hpp comment-ti.hpp compatutility.cpp compatutility.hpp diff --git a/lib/icinga/checkable-check.cpp b/lib/icinga/checkable-check.cpp index 5d658d3b0..8bf1fed74 100644 --- a/lib/icinga/checkable-check.cpp +++ b/lib/icinga/checkable-check.cpp @@ -42,6 +42,7 @@ boost::signals2::signal Checkable::OnNextCheckUpda boost::mutex Checkable::m_StatsMutex; int Checkable::m_PendingChecks = 0; +boost::condition_variable Checkable::m_PendingChecksCV; CheckCommand::Ptr Checkable::GetCheckCommand() const { @@ -544,6 +545,7 @@ void Checkable::DecreasePendingChecks() { boost::mutex::scoped_lock lock(m_StatsMutex); m_PendingChecks--; + m_PendingChecksCV.notify_one(); } int Checkable::GetPendingChecks() @@ -551,3 +553,12 @@ int Checkable::GetPendingChecks() boost::mutex::scoped_lock lock(m_StatsMutex); return m_PendingChecks; } + +void Checkable::AquirePendingCheckSlot(int maxPendingChecks) +{ + boost::mutex::scoped_lock lock(m_StatsMutex); + while (m_PendingChecks >= maxPendingChecks) + m_PendingChecksCV.wait(lock); + + m_PendingChecks++; +} diff --git a/lib/icinga/checkable.hpp b/lib/icinga/checkable.hpp index 24582389d..a71487e54 100644 --- a/lib/icinga/checkable.hpp +++ b/lib/icinga/checkable.hpp @@ -197,6 +197,7 @@ public: static void IncreasePendingChecks(); static void DecreasePendingChecks(); static int GetPendingChecks(); + static void AquirePendingCheckSlot(int maxPendingChecks); static Object::Ptr GetPrototype(); @@ -211,6 +212,7 @@ private: static boost::mutex m_StatsMutex; static int m_PendingChecks; + static boost::condition_variable m_PendingChecksCV; /* Downtimes */ std::set m_Downtimes; diff --git a/lib/icinga/clusterevents-check.cpp b/lib/icinga/clusterevents-check.cpp new file mode 100644 index 000000000..41e2be237 --- /dev/null +++ b/lib/icinga/clusterevents-check.cpp @@ -0,0 +1,186 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "icinga/clusterevents.hpp" +#include "remote/apilistener.hpp" +#include "base/serializer.hpp" +#include "base/exception.hpp" +#include + +using namespace icinga; + +boost::mutex ClusterEvents::m_Mutex; +std::deque> ClusterEvents::m_CheckRequestQueue; +bool ClusterEvents::m_CheckSchedulerRunning; + +void ClusterEvents::RemoteCheckThreadProc() +{ + Utility::SetThreadName("Remote Check Scheduler"); + + boost::mutex::scoped_lock lock(m_Mutex); + + for(;;) { + if (m_CheckRequestQueue.empty()) + break; + + lock.unlock(); + Checkable::AquirePendingCheckSlot(Application::GetMaxConcurrentChecks()); + lock.lock(); + + auto callback = m_CheckRequestQueue.front(); + m_CheckRequestQueue.pop_front(); + lock.unlock(); + + callback(); + Checkable::DecreasePendingChecks(); + + lock.lock(); + } + + m_CheckSchedulerRunning = false; +} + +void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + if (m_CheckRequestQueue.size() >= 25000) { + Log(LogCritical, "ClusterEvents", "Remote check queue ran out of slots. Discarding remote check request."); + return; + } + + m_CheckRequestQueue.push_back(std::bind(ClusterEvents::ExecuteCheckFromQueue, origin, params)); + + if (!m_CheckSchedulerRunning) { + std::thread t(ClusterEvents::RemoteCheckThreadProc); + t.detach(); + m_CheckSchedulerRunning = true; + } +} + +void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) { + + Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint(); + + if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return; + } + + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) { + Log(LogCritical, "ApiListener", "No instance available."); + return; + } + + if (!listener->GetAcceptCommands()) { + Log(LogWarning, "ApiListener") + << "Ignoring command. '" << listener->GetName() << "' does not accept commands."; + + Host::Ptr host = new Host(); + Dictionary::Ptr attrs = new Dictionary(); + + attrs->Set("__name", params->Get("host")); + attrs->Set("type", "Host"); + attrs->Set("enable_active_checks", false); + + Deserialize(host, attrs, false, FAConfig); + + if (params->Contains("service")) + host->SetExtension("agent_service_name", params->Get("service")); + + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands."); + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + + return; + } + + /* use a virtual host object for executing the command */ + Host::Ptr host = new Host(); + Dictionary::Ptr attrs = new Dictionary(); + + attrs->Set("__name", params->Get("host")); + attrs->Set("type", "Host"); + + Deserialize(host, attrs, false, FAConfig); + + if (params->Contains("service")) + host->SetExtension("agent_service_name", params->Get("service")); + + String command = params->Get("command"); + String command_type = params->Get("command_type"); + + if (command_type == "check_command") { + if (!CheckCommand::GetByName(command)) { + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + cr->SetOutput("Check command '" + command + "' does not exist."); + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + return; + } + } else if (command_type == "event_command") { + if (!EventCommand::GetByName(command)) { + Log(LogWarning, "ClusterEvents") + << "Event command '" << command << "' does not exist."; + return; + } + } else + return; + + attrs->Set(command_type, params->Get("command")); + attrs->Set("command_endpoint", sourceEndpoint->GetName()); + + Deserialize(host, attrs, false, FAConfig); + + host->SetExtension("agent_check", true); + + Dictionary::Ptr macros = params->Get("macros"); + + if (command_type == "check_command") { + try { + host->ExecuteRemoteCheck(macros); + } catch (const std::exception& ex) { + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + + String output = "Exception occured while checking '" + host->GetName() + "': " + DiagnosticInformation(ex); + cr->SetOutput(output); + + double now = Utility::GetTime(); + cr->SetScheduleStart(now); + cr->SetScheduleEnd(now); + cr->SetExecutionStart(now); + cr->SetExecutionEnd(now); + + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + + Log(LogCritical, "checker", output); + } + } else if (command_type == "event_command") { + host->ExecuteEventHandler(macros, true); + } +} + diff --git a/lib/icinga/clusterevents.cpp b/lib/icinga/clusterevents.cpp index 013bae284..9ae894db1 100644 --- a/lib/icinga/clusterevents.cpp +++ b/lib/icinga/clusterevents.cpp @@ -578,112 +578,7 @@ Value ClusterEvents::AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& Value ClusterEvents::ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) { - Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint(); - - if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) { - Log(LogNotice, "ClusterEvents") - << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) { - Log(LogCritical, "ApiListener", "No instance available."); - return Empty; - } - - if (!listener->GetAcceptCommands()) { - Log(LogWarning, "ApiListener") - << "Ignoring command. '" << listener->GetName() << "' does not accept commands."; - - Host::Ptr host = new Host(); - Dictionary::Ptr attrs = new Dictionary(); - - attrs->Set("__name", params->Get("host")); - attrs->Set("type", "Host"); - attrs->Set("enable_active_checks", false); - - Deserialize(host, attrs, false, FAConfig); - - if (params->Contains("service")) - host->SetExtension("agent_service_name", params->Get("service")); - - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands."); - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - - return Empty; - } - - /* use a virtual host object for executing the command */ - Host::Ptr host = new Host(); - Dictionary::Ptr attrs = new Dictionary(); - - attrs->Set("__name", params->Get("host")); - attrs->Set("type", "Host"); - - Deserialize(host, attrs, false, FAConfig); - - if (params->Contains("service")) - host->SetExtension("agent_service_name", params->Get("service")); - - String command = params->Get("command"); - String command_type = params->Get("command_type"); - - if (command_type == "check_command") { - if (!CheckCommand::GetByName(command)) { - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - cr->SetOutput("Check command '" + command + "' does not exist."); - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - return Empty; - } - } else if (command_type == "event_command") { - if (!EventCommand::GetByName(command)) { - Log(LogWarning, "ClusterEvents") - << "Event command '" << command << "' does not exist."; - return Empty; - } - } else - return Empty; - - attrs->Set(command_type, params->Get("command")); - attrs->Set("command_endpoint", sourceEndpoint->GetName()); - - Deserialize(host, attrs, false, FAConfig); - - host->SetExtension("agent_check", true); - - Dictionary::Ptr macros = params->Get("macros"); - - if (command_type == "check_command") { - try { - host->ExecuteRemoteCheck(macros); - } catch (const std::exception& ex) { - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - - String output = "Exception occured while checking '" + host->GetName() + "': " + DiagnosticInformation(ex); - cr->SetOutput(output); - - double now = Utility::GetTime(); - cr->SetScheduleStart(now); - cr->SetScheduleEnd(now); - cr->SetExecutionStart(now); - cr->SetExecutionEnd(now); - - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - - Log(LogCritical, "checker", output); - } - } else if (command_type == "event_command") { - host->ExecuteEventHandler(macros, true); - } + EnqueueCheck(origin, params); return Empty; } diff --git a/lib/icinga/clusterevents.hpp b/lib/icinga/clusterevents.hpp index 2f474a0d0..d712b7fb8 100644 --- a/lib/icinga/clusterevents.hpp +++ b/lib/icinga/clusterevents.hpp @@ -74,6 +74,15 @@ public: static void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set& users, NotificationType notificationType, const CheckResult::Ptr& cr, const String& author, const String& commentText, const MessageOrigin::Ptr& origin); static Value NotificationSentToAllUsersAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + +private: + static boost::mutex m_Mutex; + static std::deque> m_CheckRequestQueue; + static bool m_CheckSchedulerRunning; + + static void RemoteCheckThreadProc(); + static void EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); }; }