#include "remote/eventqueue.hpp"
#include "remote/filterutility.hpp"
+#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
+#include <boost/asio/spawn.hpp>
using namespace icinga;
}
}
+Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
+{
+ for (;;) {
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ auto it = m_Events.find(client);
+ ASSERT(it != m_Events.end());
+
+ if (!it->second.empty()) {
+ Dictionary::Ptr result = *it->second.begin();
+ it->second.pop_front();
+ return result;
+ }
+ }
+
+ IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
+ }
+}
+
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
#include "remote/httphandler.hpp"
#include "base/object.hpp"
#include "config/expression.hpp"
+#include <boost/asio/spawn.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <set>
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
+ Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
#include "config/configcompiler.hpp"
#include "config/expression.hpp"
#include "base/defer.hpp"
+#include "base/io-engine.hpp"
#include "base/objectlock.hpp"
#include "base/json.hpp"
#include <boost/asio/buffer.hpp>
response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
- http::async_write(stream, response, yc);
- stream.async_flush(yc);
+ {
+ IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+
+ http::async_write(stream, response, yc);
+ stream.async_flush(yc);
+ }
asio::const_buffer newLine ("\n", 1);
for (;;) {
- Dictionary::Ptr result = queue->WaitForEvent(&request);
-
- if (!result)
- continue;
-
- String body = JsonEncode(result);
+ String body = JsonEncode(queue->WaitForEvent(&request, yc));
boost::algorithm::replace_all(body, "\n", "");
asio::const_buffer payload (body.CStr(), body.GetLength());
+ IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+
stream.async_write_some(payload, yc);
stream.async_write_some(newLine, yc);
stream.async_flush(yc);