From 768899460191fd783215f5237b093941fb9829a2 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 5 Apr 2019 17:02:42 +0200 Subject: [PATCH] Implement new event queue for ASIO consumers --- lib/remote/eventqueue.cpp | 203 ++++++++++++++++++++++++++++++++++++++ lib/remote/eventqueue.hpp | 105 ++++++++++++++++++++ 2 files changed, 308 insertions(+) diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp index 1c273db51..1865a39fa 100644 --- a/lib/remote/eventqueue.cpp +++ b/lib/remote/eventqueue.cpp @@ -1,5 +1,6 @@ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ +#include "config/configcompiler.hpp" #include "remote/eventqueue.hpp" #include "remote/filterutility.hpp" #include "base/io-engine.hpp" @@ -7,6 +8,10 @@ #include "base/logger.hpp" #include "base/utility.hpp" #include +#include +#include +#include +#include using namespace icinga; @@ -168,3 +173,201 @@ EventQueueRegistry *EventQueueRegistry::GetInstance() { return Singleton::GetInstance(); } + +std::mutex EventsInbox::m_FiltersMutex; +std::map EventsInbox::m_Filters; + +EventsRouter EventsRouter::m_Instance; + +EventsInbox::EventsInbox(String filter, const String& filterSource) + : m_Timer(IoEngine::Get().GetIoService()) +{ + std::unique_lock lock (m_FiltersMutex); + m_Filter = m_Filters.find(filter); + + if (m_Filter == m_Filters.end()) { + lock.unlock(); + + auto expr (ConfigCompiler::CompileText(filterSource, filter)); + + lock.lock(); + + m_Filter = m_Filters.find(filter); + + if (m_Filter == m_Filters.end()) { + m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr(expr.release())}).first; + } else { + ++m_Filter->second.Refs; + } + } else { + ++m_Filter->second.Refs; + } +} + +EventsInbox::~EventsInbox() +{ + std::unique_lock lock (m_FiltersMutex); + + if (!--m_Filter->second.Refs) { + m_Filters.erase(m_Filter); + } +} + +const std::shared_ptr& EventsInbox::GetFilter() +{ + return m_Filter->second.Expr; +} + +void EventsInbox::Push(Dictionary::Ptr event) +{ + std::unique_lock lock (m_Mutex); + + m_Queue.emplace(std::move(event)); + m_Timer.expires_at(boost::posix_time::neg_infin); +} + +Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout) +{ + std::unique_lock lock (m_Mutex, std::defer_lock); + + m_Timer.expires_at(boost::posix_time::neg_infin); + + { + boost::system::error_code ec; + + while (!lock.try_lock()) { + m_Timer.async_wait(yc[ec]); + } + } + + if (m_Queue.empty()) { + m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0))); + lock.unlock(); + + { + boost::system::error_code ec; + m_Timer.async_wait(yc[ec]); + + while (!lock.try_lock()) { + m_Timer.async_wait(yc[ec]); + } + } + + if (m_Queue.empty()) { + return nullptr; + } + } + + auto event (std::move(m_Queue.front())); + m_Queue.pop(); + return std::move(event); +} + +EventsSubscriber::EventsSubscriber(std::set types, String filter, const String& filterSource) + : m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource)) +{ + EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox); +} + +EventsSubscriber::~EventsSubscriber() +{ + EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox); +} + +EventsFilter::EventsFilter(std::map, std::set> inboxes) + : m_Inboxes(std::move(inboxes)) +{ +} + +EventsFilter::operator bool() +{ + return !m_Inboxes.empty(); +} + +void EventsFilter::Push(Dictionary::Ptr event) +{ + for (auto& perFilter : m_Inboxes) { + ScriptFrame frame(true); + frame.Sandboxed = true; + + try { + if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) { + continue; + } + } catch (const std::exception& ex) { + Log(LogWarning, "EventQueue") + << "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex); + continue; + } + + for (auto& inbox : perFilter.second) { + inbox->Push(event); + } + } +} + +EventsRouter& EventsRouter::GetInstance() +{ + return m_Instance; +} + +void EventsRouter::Subscribe(const std::set& types, const EventsInbox::Ptr& inbox) +{ + const auto& filter (inbox->GetFilter()); + std::unique_lock lock (m_Mutex); + + for (auto type : types) { + auto perType (m_Subscribers.find(type)); + + if (perType == m_Subscribers.end()) { + perType = m_Subscribers.emplace(type, decltype(perType->second)()).first; + } + + auto perFilter (perType->second.find(filter)); + + if (perFilter == perType->second.end()) { + perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first; + } + + perFilter->second.emplace(inbox); + } +} + +void EventsRouter::Unsubscribe(const std::set& types, const EventsInbox::Ptr& inbox) +{ + const auto& filter (inbox->GetFilter()); + std::unique_lock lock (m_Mutex); + + for (auto type : types) { + auto perType (m_Subscribers.find(type)); + + if (perType != m_Subscribers.end()) { + auto perFilter (perType->second.find(filter)); + + if (perFilter != perType->second.end()) { + perFilter->second.erase(inbox); + + if (perFilter->second.empty()) { + perType->second.erase(perFilter); + } + } + + if (perType->second.empty()) { + m_Subscribers.erase(perType); + } + } + } +} + +EventsFilter EventsRouter::GetInboxes(EventType type) +{ + std::unique_lock lock (m_Mutex); + + auto perType (m_Subscribers.find(type)); + + if (perType == m_Subscribers.end()) { + return EventsFilter({}); + } + + return EventsFilter(perType->second); +} diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp index 1a53baabb..426f9e9a5 100644 --- a/lib/remote/eventqueue.hpp +++ b/lib/remote/eventqueue.hpp @@ -6,12 +6,17 @@ #include "remote/httphandler.hpp" #include "base/object.hpp" #include "config/expression.hpp" +#include #include #include #include +#include +#include +#include #include #include #include +#include namespace icinga { @@ -64,6 +69,106 @@ public: static EventQueueRegistry *GetInstance(); }; +enum class EventType : uint_fast8_t +{ + AcknowledgementCleared, + AcknowledgementSet, + CheckResult, + CommentAdded, + CommentRemoved, + DowntimeAdded, + DowntimeRemoved, + DowntimeStarted, + DowntimeTriggered, + Flapping, + Notification, + StateChange +}; + +class EventsInbox : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(EventsInbox); + + EventsInbox(String filter, const String& filterSource); + EventsInbox(const EventsInbox&) = delete; + EventsInbox(EventsInbox&&) = delete; + EventsInbox& operator=(const EventsInbox&) = delete; + EventsInbox& operator=(EventsInbox&&) = delete; + ~EventsInbox(); + + const std::shared_ptr& GetFilter(); + + void Push(Dictionary::Ptr event); + Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5); + +private: + struct Filter + { + std::size_t Refs; + std::shared_ptr Expr; + }; + + static std::mutex m_FiltersMutex; + static std::map m_Filters; + + std::mutex m_Mutex; + decltype(m_Filters.begin()) m_Filter; + std::queue m_Queue; + boost::asio::deadline_timer m_Timer; +}; + +class EventsSubscriber +{ +public: + EventsSubscriber(std::set types, String filter, const String& filterSource); + EventsSubscriber(const EventsSubscriber&) = delete; + EventsSubscriber(EventsSubscriber&&) = delete; + EventsSubscriber& operator=(const EventsSubscriber&) = delete; + EventsSubscriber& operator=(EventsSubscriber&&) = delete; + ~EventsSubscriber(); + +private: + std::set m_Types; + EventsInbox::Ptr m_Inbox; +}; + +class EventsFilter +{ +public: + EventsFilter(std::map, std::set> inboxes); + + operator bool(); + + void Push(Dictionary::Ptr event); + +private: + std::map, std::set> m_Inboxes; +}; + +class EventsRouter +{ +public: + static EventsRouter& GetInstance(); + + void Subscribe(const std::set& types, const EventsInbox::Ptr& inbox); + void Unsubscribe(const std::set& types, const EventsInbox::Ptr& inbox); + EventsFilter GetInboxes(EventType type); + +private: + static EventsRouter m_Instance; + + EventsRouter() = default; + EventsRouter(const EventsRouter&) = delete; + EventsRouter(EventsRouter&&) = delete; + EventsRouter& operator=(const EventsRouter&) = delete; + EventsRouter& operator=(EventsRouter&&) = delete; + ~EventsRouter() = default; + + std::mutex m_Mutex; + std::map, std::set>> m_Subscribers; +}; + } #endif /* EVENTQUEUE_H */ -- 2.40.0