]> granicus.if.org Git - icinga2/commitdiff
/v1/events: use new event queue
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Fri, 5 Apr 2019 15:34:46 +0000 (17:34 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Thu, 25 Apr 2019 13:56:38 +0000 (15:56 +0200)
lib/remote/eventqueue.cpp
lib/remote/eventqueue.hpp
lib/remote/eventshandler.cpp

index 1865a39fae44e22cd98494a095806b78886fd9cf..83c7c6fc1b71224044347046228ad6b2489f3f87 100644 (file)
@@ -109,36 +109,6 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
        }
 }
 
-Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
-{
-       double deadline = -1.0;
-
-       for (;;) {
-               {
-                       boost::mutex::scoped_try_lock lock(m_Mutex);
-
-                       if (lock.owns_lock()) {
-                               auto it = m_Events.find(client);
-                               ASSERT(it != m_Events.end());
-
-                               if (it->second.empty()) {
-                                       if (deadline == -1.0) {
-                                               deadline = Utility::GetTime() + timeout;
-                                       } else if (Utility::GetTime() >= deadline) {
-                                               return nullptr;
-                                       }
-                               } else {
-                                       Dictionary::Ptr result = *it->second.begin();
-                                       it->second.pop_front();
-                                       return result;
-                               }
-                       }
-               }
-
-               IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
-       }
-}
-
 std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
 {
        EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
@@ -274,6 +244,11 @@ EventsSubscriber::~EventsSubscriber()
        EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
 }
 
+const EventsInbox::Ptr& EventsSubscriber::GetInbox()
+{
+       return m_Inbox;
+}
+
 EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
        : m_Inboxes(std::move(inboxes))
 {
index 426f9e9a5c1eb4a0e8aa5b701bddff30cd3c0b73..29f9314f73088f36f87f6b2343b0b2039d74c165 100644 (file)
@@ -37,7 +37,6 @@ public:
        void SetFilter(std::unique_ptr<Expression> filter);
 
        Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
-       Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
 
        static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
        static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
@@ -128,6 +127,8 @@ public:
        EventsSubscriber& operator=(EventsSubscriber&&) = delete;
        ~EventsSubscriber();
 
+       const EventsInbox::Ptr& GetInbox();
+
 private:
        std::set<EventType> m_Types;
        EventsInbox::Ptr m_Inbox;
index 98e8153543ae299043a8110d758188a06f0b00ec..38326e2a50707267060687c83ccad63c1dd57883 100644 (file)
 #include <boost/asio/buffer.hpp>
 #include <boost/asio/write.hpp>
 #include <boost/algorithm/string/replace.hpp>
+#include <map>
+#include <set>
 
 using namespace icinga;
 
 REGISTER_URLHANDLER("/v1/events", EventsHandler);
 
+const std::map<String, EventType> l_EventTypes ({
+       {"AcknowledgementCleared", EventType::AcknowledgementCleared},
+       {"AcknowledgementSet", EventType::AcknowledgementSet},
+       {"CheckResult", EventType::CheckResult},
+       {"CommentAdded", EventType::CommentAdded},
+       {"CommentRemoved", EventType::CommentRemoved},
+       {"DowntimeAdded", EventType::DowntimeAdded},
+       {"DowntimeRemoved", EventType::DowntimeRemoved},
+       {"DowntimeStarted", EventType::DowntimeStarted},
+       {"DowntimeTriggered", EventType::DowntimeTriggered},
+       {"Flapping", EventType::Flapping},
+       {"Notification", EventType::Notification},
+       {"StateChange", EventType::StateChange}
+});
+
+const String l_ApiQuery ("<API query>");
+
 bool EventsHandler::HandleRequest(
        AsioTlsStream& stream,
        const ApiUser::Ptr& user,
@@ -63,30 +82,20 @@ bool EventsHandler::HandleRequest(
                return true;
        }
 
-       String filter = HttpUtility::GetLastParameter(params, "filter");
-
-       std::unique_ptr<Expression> ufilter;
-
-       if (!filter.IsEmpty())
-               ufilter = ConfigCompiler::CompileText("<API query>", filter);
+       std::set<EventType> eventTypes;
 
-       /* create a new queue or update an existing one */
-       EventQueue::Ptr queue = EventQueue::GetByName(queueName);
+       {
+               ObjectLock olock(types);
+               for (const String& type : types) {
+                       auto typeId (l_EventTypes.find(type));
 
-       if (!queue) {
-               queue = new EventQueue(queueName);
-               EventQueue::Register(queueName, queue);
+                       if (typeId != l_EventTypes.end()) {
+                               eventTypes.emplace(typeId->second);
+                       }
+               }
        }
 
-       queue->SetTypes(types->ToSet<String>());
-       queue->SetFilter(std::move(ufilter));
-
-       queue->AddClient(&request);
-
-       Defer removeClient ([&queue, &request, &queueName]() {
-               queue->RemoveClient(&request);
-               EventQueue::UnregisterIfUnused(queueName, queue);
-       });
+       EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
 
        server.StartStreaming();
 
@@ -104,7 +113,7 @@ bool EventsHandler::HandleRequest(
        AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);
 
        for (;;) {
-               auto event (queue->WaitForEvent(&request, yc));
+               auto event (subscriber.GetInbox()->Shift(yc));
 
                if (event) {
                        String body = JsonEncode(event);