]> granicus.if.org Git - icinga2/commitdiff
Implement new event queue for ASIO consumers
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Fri, 5 Apr 2019 15:02:42 +0000 (17:02 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Thu, 25 Apr 2019 13:56:38 +0000 (15:56 +0200)
lib/remote/eventqueue.cpp
lib/remote/eventqueue.hpp

index 1c273db5193353b722ae93d19a7877b6d6aa5c6c..1865a39fae44e22cd98494a095806b78886fd9cf 100644 (file)
@@ -1,5 +1,6 @@
 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
 
+#include "config/configcompiler.hpp"
 #include "remote/eventqueue.hpp"
 #include "remote/filterutility.hpp"
 #include "base/io-engine.hpp"
@@ -7,6 +8,10 @@
 #include "base/logger.hpp"
 #include "base/utility.hpp"
 #include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/system/error_code.hpp>
+#include <utility>
 
 using namespace icinga;
 
@@ -168,3 +173,201 @@ EventQueueRegistry *EventQueueRegistry::GetInstance()
 {
        return Singleton<EventQueueRegistry>::GetInstance();
 }
+
+std::mutex EventsInbox::m_FiltersMutex;
+std::map<String, EventsInbox::Filter> EventsInbox::m_Filters;
+
+EventsRouter EventsRouter::m_Instance;
+
+EventsInbox::EventsInbox(String filter, const String& filterSource)
+       : m_Timer(IoEngine::Get().GetIoService())
+{
+       std::unique_lock<std::mutex> lock (m_FiltersMutex);
+       m_Filter = m_Filters.find(filter);
+
+       if (m_Filter == m_Filters.end()) {
+               lock.unlock();
+
+               auto expr (ConfigCompiler::CompileText(filterSource, filter));
+
+               lock.lock();
+
+               m_Filter = m_Filters.find(filter);
+
+               if (m_Filter == m_Filters.end()) {
+                       m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
+               } else {
+                       ++m_Filter->second.Refs;
+               }
+       } else {
+               ++m_Filter->second.Refs;
+       }
+}
+
+EventsInbox::~EventsInbox()
+{
+       std::unique_lock<std::mutex> lock (m_FiltersMutex);
+
+       if (!--m_Filter->second.Refs) {
+               m_Filters.erase(m_Filter);
+       }
+}
+
+const std::shared_ptr<Expression>& EventsInbox::GetFilter()
+{
+       return m_Filter->second.Expr;
+}
+
+void EventsInbox::Push(Dictionary::Ptr event)
+{
+       std::unique_lock<std::mutex> lock (m_Mutex);
+
+       m_Queue.emplace(std::move(event));
+       m_Timer.expires_at(boost::posix_time::neg_infin);
+}
+
+Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
+{
+       std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
+
+       m_Timer.expires_at(boost::posix_time::neg_infin);
+
+       {
+               boost::system::error_code ec;
+
+               while (!lock.try_lock()) {
+                       m_Timer.async_wait(yc[ec]);
+               }
+       }
+
+       if (m_Queue.empty()) {
+               m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
+               lock.unlock();
+
+               {
+                       boost::system::error_code ec;
+                       m_Timer.async_wait(yc[ec]);
+
+                       while (!lock.try_lock()) {
+                               m_Timer.async_wait(yc[ec]);
+                       }
+               }
+
+               if (m_Queue.empty()) {
+                       return nullptr;
+               }
+       }
+
+       auto event (std::move(m_Queue.front()));
+       m_Queue.pop();
+       return std::move(event);
+}
+
+EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
+       : m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
+{
+       EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
+}
+
+EventsSubscriber::~EventsSubscriber()
+{
+       EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
+}
+
+EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
+       : m_Inboxes(std::move(inboxes))
+{
+}
+
+EventsFilter::operator bool()
+{
+       return !m_Inboxes.empty();
+}
+
+void EventsFilter::Push(Dictionary::Ptr event)
+{
+       for (auto& perFilter : m_Inboxes) {
+               ScriptFrame frame(true);
+               frame.Sandboxed = true;
+
+               try {
+                       if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
+                               continue;
+                       }
+               } catch (const std::exception& ex) {
+                       Log(LogWarning, "EventQueue")
+                               << "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
+                       continue;
+               }
+
+               for (auto& inbox : perFilter.second) {
+                       inbox->Push(event);
+               }
+       }
+}
+
+EventsRouter& EventsRouter::GetInstance()
+{
+       return m_Instance;
+}
+
+void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
+{
+       const auto& filter (inbox->GetFilter());
+       std::unique_lock<std::mutex> lock (m_Mutex);
+
+       for (auto type : types) {
+               auto perType (m_Subscribers.find(type));
+
+               if (perType == m_Subscribers.end()) {
+                       perType = m_Subscribers.emplace(type, decltype(perType->second)()).first;
+               }
+
+               auto perFilter (perType->second.find(filter));
+
+               if (perFilter == perType->second.end()) {
+                       perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first;
+               }
+
+               perFilter->second.emplace(inbox);
+       }
+}
+
+void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
+{
+       const auto& filter (inbox->GetFilter());
+       std::unique_lock<std::mutex> lock (m_Mutex);
+
+       for (auto type : types) {
+               auto perType (m_Subscribers.find(type));
+
+               if (perType != m_Subscribers.end()) {
+                       auto perFilter (perType->second.find(filter));
+
+                       if (perFilter != perType->second.end()) {
+                               perFilter->second.erase(inbox);
+
+                               if (perFilter->second.empty()) {
+                                       perType->second.erase(perFilter);
+                               }
+                       }
+
+                       if (perType->second.empty()) {
+                               m_Subscribers.erase(perType);
+                       }
+               }
+       }
+}
+
+EventsFilter EventsRouter::GetInboxes(EventType type)
+{
+       std::unique_lock<std::mutex> lock (m_Mutex);
+
+       auto perType (m_Subscribers.find(type));
+
+       if (perType == m_Subscribers.end()) {
+               return EventsFilter({});
+       }
+
+       return EventsFilter(perType->second);
+}
index 1a53baabb26d151c14b8c865c8e5914d748127b8..426f9e9a5c1eb4a0e8aa5b701bddff30cd3c0b73 100644 (file)
@@ -6,12 +6,17 @@
 #include "remote/httphandler.hpp"
 #include "base/object.hpp"
 #include "config/expression.hpp"
+#include <boost/asio/deadline_timer.hpp>
 #include <boost/asio/spawn.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
 #include <set>
 #include <map>
 #include <deque>
+#include <queue>
 
 namespace icinga
 {
@@ -64,6 +69,106 @@ public:
        static EventQueueRegistry *GetInstance();
 };
 
+enum class EventType : uint_fast8_t
+{
+       AcknowledgementCleared,
+       AcknowledgementSet,
+       CheckResult,
+       CommentAdded,
+       CommentRemoved,
+       DowntimeAdded,
+       DowntimeRemoved,
+       DowntimeStarted,
+       DowntimeTriggered,
+       Flapping,
+       Notification,
+       StateChange
+};
+
+class EventsInbox : public Object
+{
+public:
+       DECLARE_PTR_TYPEDEFS(EventsInbox);
+
+       EventsInbox(String filter, const String& filterSource);
+       EventsInbox(const EventsInbox&) = delete;
+       EventsInbox(EventsInbox&&) = delete;
+       EventsInbox& operator=(const EventsInbox&) = delete;
+       EventsInbox& operator=(EventsInbox&&) = delete;
+       ~EventsInbox();
+
+       const std::shared_ptr<Expression>& GetFilter();
+
+       void Push(Dictionary::Ptr event);
+       Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
+
+private:
+       struct Filter
+       {
+               std::size_t Refs;
+               std::shared_ptr<Expression> Expr;
+       };
+
+       static std::mutex m_FiltersMutex;
+       static std::map<String, Filter> m_Filters;
+
+       std::mutex m_Mutex;
+       decltype(m_Filters.begin()) m_Filter;
+       std::queue<Dictionary::Ptr> m_Queue;
+       boost::asio::deadline_timer m_Timer;
+};
+
+class EventsSubscriber
+{
+public:
+       EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
+       EventsSubscriber(const EventsSubscriber&) = delete;
+       EventsSubscriber(EventsSubscriber&&) = delete;
+       EventsSubscriber& operator=(const EventsSubscriber&) = delete;
+       EventsSubscriber& operator=(EventsSubscriber&&) = delete;
+       ~EventsSubscriber();
+
+private:
+       std::set<EventType> m_Types;
+       EventsInbox::Ptr m_Inbox;
+};
+
+class EventsFilter
+{
+public:
+       EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes);
+
+       operator bool();
+
+       void Push(Dictionary::Ptr event);
+
+private:
+       std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> m_Inboxes;
+};
+
+class EventsRouter
+{
+public:
+       static EventsRouter& GetInstance();
+
+       void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+       void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+       EventsFilter GetInboxes(EventType type);
+
+private:
+       static EventsRouter m_Instance;
+
+       EventsRouter() = default;
+       EventsRouter(const EventsRouter&) = delete;
+       EventsRouter(EventsRouter&&) = delete;
+       EventsRouter& operator=(const EventsRouter&) = delete;
+       EventsRouter& operator=(EventsRouter&&) = delete;
+       ~EventsRouter() = default;
+
+       std::mutex m_Mutex;
+       std::map<EventType, std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>>> m_Subscribers;
+};
+
 }
 
 #endif /* EVENTQUEUE_H */