#include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/system/error_code.hpp>
using namespace icinga;
}
}
}
+
+AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init)
+ : m_Timer(io)
+{
+ m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
+}
+
+void AsioConditionVariable::Set()
+{
+ m_Timer.expires_at(boost::posix_time::neg_infin);
+}
+
+void AsioConditionVariable::Clear()
+{
+ m_Timer.expires_at(boost::posix_time::pos_infin);
+}
+
+void AsioConditionVariable::Wait(boost::asio::yield_context yc)
+{
+ boost::system::error_code ec;
+ m_Timer.async_wait(yc[ec]);
+}
{
};
+/**
+ * Condition variable which doesn't block I/O threads
+ *
+ * @ingroup base
+ */
+class AsioConditionVariable
+{
+public:
+ AsioConditionVariable(boost::asio::io_service& io, bool init = false);
+
+ void Set();
+ void Clear();
+ void Wait(boost::asio::yield_context yc);
+
+private:
+ boost::asio::deadline_timer m_Timer;
+};
+
#endif /* IO_ENGINE_H */
}) }
}));
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ m_OutgoingMessagesQueued.Set();
}
}
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
-#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread/once.hpp>
using namespace icinga;
{
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
-
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
- m_WriterDone.expires_at(boost::posix_time::pos_infin);
}
void JsonRpcConnection::Start()
{
Defer disconnect ([this]() { Disconnect(); });
- Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); });
+ Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
do {
- try {
- m_OutgoingMessagesQueued.async_wait(yc);
- } catch (...) {
- }
+ m_OutgoingMessagesQueued.Wait(yc);
auto queue (std::move(m_OutgoingMessagesQueue));
m_OutgoingMessagesQueue.clear();
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+ m_OutgoingMessagesQueued.Clear();
if (!queue.empty()) {
try {
{
m_IoStrand.post([this, message]() {
m_OutgoingMessagesQueue.emplace_back(message);
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ m_OutgoingMessagesQueued.Set();
});
}
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ m_OutgoingMessagesQueued.Set();
- try {
- m_WriterDone.async_wait(yc);
- } catch (...) {
- }
+ m_WriterDone.Wait(yc);
try {
m_Stream->next_layer().async_shutdown(yc);
resultMessage->Set("id", message->Get("id"));
m_OutgoingMessagesQueue.emplace_back(resultMessage);
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ m_OutgoingMessagesQueued.Set();
}
}
#include "remote/i2-remote.hpp"
#include "remote/endpoint.hpp"
+#include "base/io-engine.hpp"
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <vector>
#include <boost/asio/io_service_strand.hpp>
#include <boost/asio/spawn.hpp>
-#include <boost/asio/deadline_timer.hpp>
namespace icinga
{
double m_NextHeartbeat;
boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
- boost::asio::deadline_timer m_OutgoingMessagesQueued;
- boost::asio::deadline_timer m_WriterDone;
+ AsioConditionVariable m_OutgoingMessagesQueued;
+ AsioConditionVariable m_WriterDone;
bool m_ShuttingDown;
void HandleIncomingMessages(boost::asio::yield_context yc);