#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
+#include "base/utility.hpp"
#include <boost/asio/spawn.hpp>
using namespace icinga;
}
}
-Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
+Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
{
+ double deadline = -1.0;
+
for (;;) {
{
boost::mutex::scoped_try_lock lock(m_Mutex);
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
- if (!it->second.empty()) {
+ if (it->second.empty()) {
+ if (deadline == -1.0) {
+ deadline = Utility::GetTime() + timeout;
+ } else if (Utility::GetTime() >= deadline) {
+ return nullptr;
+ }
+ } else {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
- Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
+ Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
asio::const_buffer newLine ("\n", 1);
for (;;) {
- String body = JsonEncode(queue->WaitForEvent(&request, yc));
+ auto event (queue->WaitForEvent(&request, yc));
- boost::algorithm::replace_all(body, "\n", "");
+ if (event) {
+ String body = JsonEncode(event);
- asio::const_buffer payload (body.CStr(), body.GetLength());
+ boost::algorithm::replace_all(body, "\n", "");
- IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+ asio::const_buffer payload (body.CStr(), body.GetLength());
- asio::async_write(stream, payload, yc);
- asio::async_write(stream, newLine, yc);
- stream.async_flush(yc);
+ IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
+
+ asio::async_write(stream, payload, yc);
+ asio::async_write(stream, newLine, yc);
+ stream.async_flush(yc);
+ }
}
}