}
}
-Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
-{
- double deadline = -1.0;
-
- for (;;) {
- {
- boost::mutex::scoped_try_lock lock(m_Mutex);
-
- if (lock.owns_lock()) {
- auto it = m_Events.find(client);
- ASSERT(it != m_Events.end());
-
- if (it->second.empty()) {
- if (deadline == -1.0) {
- deadline = Utility::GetTime() + timeout;
- } else if (Utility::GetTime() >= deadline) {
- return nullptr;
- }
- } else {
- Dictionary::Ptr result = *it->second.begin();
- it->second.pop_front();
- return result;
- }
- }
- }
-
- IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
- }
-}
-
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
}
+const EventsInbox::Ptr& EventsSubscriber::GetInbox()
+{
+ return m_Inbox;
+}
+
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
: m_Inboxes(std::move(inboxes))
{
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
- Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
EventsSubscriber& operator=(EventsSubscriber&&) = delete;
~EventsSubscriber();
+ const EventsInbox::Ptr& GetInbox();
+
private:
std::set<EventType> m_Types;
EventsInbox::Ptr m_Inbox;
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/algorithm/string/replace.hpp>
+#include <map>
+#include <set>
using namespace icinga;
REGISTER_URLHANDLER("/v1/events", EventsHandler);
+const std::map<String, EventType> l_EventTypes ({
+ {"AcknowledgementCleared", EventType::AcknowledgementCleared},
+ {"AcknowledgementSet", EventType::AcknowledgementSet},
+ {"CheckResult", EventType::CheckResult},
+ {"CommentAdded", EventType::CommentAdded},
+ {"CommentRemoved", EventType::CommentRemoved},
+ {"DowntimeAdded", EventType::DowntimeAdded},
+ {"DowntimeRemoved", EventType::DowntimeRemoved},
+ {"DowntimeStarted", EventType::DowntimeStarted},
+ {"DowntimeTriggered", EventType::DowntimeTriggered},
+ {"Flapping", EventType::Flapping},
+ {"Notification", EventType::Notification},
+ {"StateChange", EventType::StateChange}
+});
+
+const String l_ApiQuery ("<API query>");
+
bool EventsHandler::HandleRequest(
AsioTlsStream& stream,
const ApiUser::Ptr& user,
return true;
}
- String filter = HttpUtility::GetLastParameter(params, "filter");
-
- std::unique_ptr<Expression> ufilter;
-
- if (!filter.IsEmpty())
- ufilter = ConfigCompiler::CompileText("<API query>", filter);
+ std::set<EventType> eventTypes;
- /* create a new queue or update an existing one */
- EventQueue::Ptr queue = EventQueue::GetByName(queueName);
+ {
+ ObjectLock olock(types);
+ for (const String& type : types) {
+ auto typeId (l_EventTypes.find(type));
- if (!queue) {
- queue = new EventQueue(queueName);
- EventQueue::Register(queueName, queue);
+ if (typeId != l_EventTypes.end()) {
+ eventTypes.emplace(typeId->second);
+ }
+ }
}
- queue->SetTypes(types->ToSet<String>());
- queue->SetFilter(std::move(ufilter));
-
- queue->AddClient(&request);
-
- Defer removeClient ([&queue, &request, &queueName]() {
- queue->RemoveClient(&request);
- EventQueue::UnregisterIfUnused(queueName, queue);
- });
+ EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
server.StartStreaming();
AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);
for (;;) {
- auto event (queue->WaitForEvent(&request, yc));
+ auto event (subscriber.GetInbox()->Shift(yc));
if (event) {
String body = JsonEncode(event);