/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+#include "config/configcompiler.hpp"
#include "remote/eventqueue.hpp"
#include "remote/filterutility.hpp"
#include "base/io-engine.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/system/error_code.hpp>
+#include <utility>
using namespace icinga;
{
return Singleton<EventQueueRegistry>::GetInstance();
}
+
+std::mutex EventsInbox::m_FiltersMutex;
+std::map<String, EventsInbox::Filter> EventsInbox::m_Filters;
+
+EventsRouter EventsRouter::m_Instance;
+
+EventsInbox::EventsInbox(String filter, const String& filterSource)
+ : m_Timer(IoEngine::Get().GetIoService())
+{
+ std::unique_lock<std::mutex> lock (m_FiltersMutex);
+ m_Filter = m_Filters.find(filter);
+
+ if (m_Filter == m_Filters.end()) {
+ lock.unlock();
+
+ auto expr (ConfigCompiler::CompileText(filterSource, filter));
+
+ lock.lock();
+
+ m_Filter = m_Filters.find(filter);
+
+ if (m_Filter == m_Filters.end()) {
+ m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
+ } else {
+ ++m_Filter->second.Refs;
+ }
+ } else {
+ ++m_Filter->second.Refs;
+ }
+}
+
+EventsInbox::~EventsInbox()
+{
+ std::unique_lock<std::mutex> lock (m_FiltersMutex);
+
+ if (!--m_Filter->second.Refs) {
+ m_Filters.erase(m_Filter);
+ }
+}
+
+const std::shared_ptr<Expression>& EventsInbox::GetFilter()
+{
+ return m_Filter->second.Expr;
+}
+
+void EventsInbox::Push(Dictionary::Ptr event)
+{
+ std::unique_lock<std::mutex> lock (m_Mutex);
+
+ m_Queue.emplace(std::move(event));
+ m_Timer.expires_at(boost::posix_time::neg_infin);
+}
+
+Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
+{
+ std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
+
+ m_Timer.expires_at(boost::posix_time::neg_infin);
+
+ {
+ boost::system::error_code ec;
+
+ while (!lock.try_lock()) {
+ m_Timer.async_wait(yc[ec]);
+ }
+ }
+
+ if (m_Queue.empty()) {
+ m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
+ lock.unlock();
+
+ {
+ boost::system::error_code ec;
+ m_Timer.async_wait(yc[ec]);
+
+ while (!lock.try_lock()) {
+ m_Timer.async_wait(yc[ec]);
+ }
+ }
+
+ if (m_Queue.empty()) {
+ return nullptr;
+ }
+ }
+
+ auto event (std::move(m_Queue.front()));
+ m_Queue.pop();
+ return std::move(event);
+}
+
+EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
+ : m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
+{
+ EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
+}
+
+EventsSubscriber::~EventsSubscriber()
+{
+ EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
+}
+
+EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
+ : m_Inboxes(std::move(inboxes))
+{
+}
+
+EventsFilter::operator bool()
+{
+ return !m_Inboxes.empty();
+}
+
+void EventsFilter::Push(Dictionary::Ptr event)
+{
+ for (auto& perFilter : m_Inboxes) {
+ ScriptFrame frame(true);
+ frame.Sandboxed = true;
+
+ try {
+ if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
+ continue;
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "EventQueue")
+ << "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
+ continue;
+ }
+
+ for (auto& inbox : perFilter.second) {
+ inbox->Push(event);
+ }
+ }
+}
+
+EventsRouter& EventsRouter::GetInstance()
+{
+ return m_Instance;
+}
+
+void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
+{
+ const auto& filter (inbox->GetFilter());
+ std::unique_lock<std::mutex> lock (m_Mutex);
+
+ for (auto type : types) {
+ auto perType (m_Subscribers.find(type));
+
+ if (perType == m_Subscribers.end()) {
+ perType = m_Subscribers.emplace(type, decltype(perType->second)()).first;
+ }
+
+ auto perFilter (perType->second.find(filter));
+
+ if (perFilter == perType->second.end()) {
+ perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first;
+ }
+
+ perFilter->second.emplace(inbox);
+ }
+}
+
+void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
+{
+ const auto& filter (inbox->GetFilter());
+ std::unique_lock<std::mutex> lock (m_Mutex);
+
+ for (auto type : types) {
+ auto perType (m_Subscribers.find(type));
+
+ if (perType != m_Subscribers.end()) {
+ auto perFilter (perType->second.find(filter));
+
+ if (perFilter != perType->second.end()) {
+ perFilter->second.erase(inbox);
+
+ if (perFilter->second.empty()) {
+ perType->second.erase(perFilter);
+ }
+ }
+
+ if (perType->second.empty()) {
+ m_Subscribers.erase(perType);
+ }
+ }
+ }
+}
+
+EventsFilter EventsRouter::GetInboxes(EventType type)
+{
+ std::unique_lock<std::mutex> lock (m_Mutex);
+
+ auto perType (m_Subscribers.find(type));
+
+ if (perType == m_Subscribers.end()) {
+ return EventsFilter({});
+ }
+
+ return EventsFilter(perType->second);
+}
#include "remote/httphandler.hpp"
#include "base/object.hpp"
#include "config/expression.hpp"
+#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
#include <set>
#include <map>
#include <deque>
+#include <queue>
namespace icinga
{
static EventQueueRegistry *GetInstance();
};
+enum class EventType : uint_fast8_t
+{
+ AcknowledgementCleared,
+ AcknowledgementSet,
+ CheckResult,
+ CommentAdded,
+ CommentRemoved,
+ DowntimeAdded,
+ DowntimeRemoved,
+ DowntimeStarted,
+ DowntimeTriggered,
+ Flapping,
+ Notification,
+ StateChange
+};
+
+class EventsInbox : public Object
+{
+public:
+ DECLARE_PTR_TYPEDEFS(EventsInbox);
+
+ EventsInbox(String filter, const String& filterSource);
+ EventsInbox(const EventsInbox&) = delete;
+ EventsInbox(EventsInbox&&) = delete;
+ EventsInbox& operator=(const EventsInbox&) = delete;
+ EventsInbox& operator=(EventsInbox&&) = delete;
+ ~EventsInbox();
+
+ const std::shared_ptr<Expression>& GetFilter();
+
+ void Push(Dictionary::Ptr event);
+ Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
+
+private:
+ struct Filter
+ {
+ std::size_t Refs;
+ std::shared_ptr<Expression> Expr;
+ };
+
+ static std::mutex m_FiltersMutex;
+ static std::map<String, Filter> m_Filters;
+
+ std::mutex m_Mutex;
+ decltype(m_Filters.begin()) m_Filter;
+ std::queue<Dictionary::Ptr> m_Queue;
+ boost::asio::deadline_timer m_Timer;
+};
+
+class EventsSubscriber
+{
+public:
+ EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
+ EventsSubscriber(const EventsSubscriber&) = delete;
+ EventsSubscriber(EventsSubscriber&&) = delete;
+ EventsSubscriber& operator=(const EventsSubscriber&) = delete;
+ EventsSubscriber& operator=(EventsSubscriber&&) = delete;
+ ~EventsSubscriber();
+
+private:
+ std::set<EventType> m_Types;
+ EventsInbox::Ptr m_Inbox;
+};
+
+class EventsFilter
+{
+public:
+ EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes);
+
+ operator bool();
+
+ void Push(Dictionary::Ptr event);
+
+private:
+ std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> m_Inboxes;
+};
+
+class EventsRouter
+{
+public:
+ static EventsRouter& GetInstance();
+
+ void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+ void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
+ EventsFilter GetInboxes(EventType type);
+
+private:
+ static EventsRouter m_Instance;
+
+ EventsRouter() = default;
+ EventsRouter(const EventsRouter&) = delete;
+ EventsRouter(EventsRouter&&) = delete;
+ EventsRouter& operator=(const EventsRouter&) = delete;
+ EventsRouter& operator=(EventsRouter&&) = delete;
+ ~EventsRouter() = default;
+
+ std::mutex m_Mutex;
+ std::map<EventType, std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>>> m_Subscribers;
+};
+
}
#endif /* EVENTQUEUE_H */