]> granicus.if.org Git - icinga2/commitdiff
/v1/events: don't lock I/O thread
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Fri, 15 Feb 2019 11:32:22 +0000 (12:32 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 09:40:14 +0000 (11:40 +0200)
lib/remote/eventqueue.cpp
lib/remote/eventqueue.hpp
lib/remote/eventshandler.cpp

index 20c4af688ec1169d980c74baf42050ace7481e63..6defc211482f31e732958657eec58e62c5cf9f89 100644 (file)
@@ -2,8 +2,10 @@
 
 #include "remote/eventqueue.hpp"
 #include "remote/filterutility.hpp"
+#include "base/io-engine.hpp"
 #include "base/singleton.hpp"
 #include "base/logger.hpp"
+#include <boost/asio/spawn.hpp>
 
 using namespace icinga;
 
@@ -100,6 +102,26 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
        }
 }
 
+Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
+{
+       for (;;) {
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       auto it = m_Events.find(client);
+                       ASSERT(it != m_Events.end());
+
+                       if (!it->second.empty()) {
+                               Dictionary::Ptr result = *it->second.begin();
+                               it->second.pop_front();
+                               return result;
+                       }
+               }
+
+               IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
+       }
+}
+
 std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
 {
        EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
index 45d23af23bdd42f691ddf3744bbdbb79b75690d2..8f6a76c0b29c7df1b33de08e70afa0dc30c84d03 100644 (file)
@@ -6,6 +6,7 @@
 #include "remote/httphandler.hpp"
 #include "base/object.hpp"
 #include "config/expression.hpp"
+#include <boost/asio/spawn.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 #include <set>
@@ -31,6 +32,7 @@ public:
        void SetFilter(std::unique_ptr<Expression> filter);
 
        Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
+       Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
 
        static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
        static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
index 99843d481d5460217fb716a49d1f3565dec611d7..cf07d63056ecbbf2d07dd48c86043ff518a2e653 100644 (file)
@@ -6,6 +6,7 @@
 #include "config/configcompiler.hpp"
 #include "config/expression.hpp"
 #include "base/defer.hpp"
+#include "base/io-engine.hpp"
 #include "base/objectlock.hpp"
 #include "base/json.hpp"
 #include <boost/asio/buffer.hpp>
@@ -91,23 +92,24 @@ bool EventsHandler::HandleRequest(
        response.result(http::status::ok);
        response.set(http::field::content_type, "application/json");
 
-       http::async_write(stream, response, yc);
-       stream.async_flush(yc);
+       {
+               IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+
+               http::async_write(stream, response, yc);
+               stream.async_flush(yc);
+       }
 
        asio::const_buffer newLine ("\n", 1);
 
        for (;;) {
-               Dictionary::Ptr result = queue->WaitForEvent(&request);
-
-               if (!result)
-                       continue;
-
-               String body = JsonEncode(result);
+               String body = JsonEncode(queue->WaitForEvent(&request, yc));
 
                boost::algorithm::replace_all(body, "\n", "");
 
                asio::const_buffer payload (body.CStr(), body.GetLength());
 
+               IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+
                stream.async_write_some(payload, yc);
                stream.async_write_some(newLine, yc);
                stream.async_flush(yc);