]> granicus.if.org Git - icinga2/commitdiff
Introduce AsioConditionVariable
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Fri, 22 Feb 2019 15:13:28 +0000 (16:13 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 11:31:16 +0000 (13:31 +0200)
lib/base/io-engine.cpp
lib/base/io-engine.hpp
lib/remote/jsonrpcconnection-heartbeat.cpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index c93a4d87a489a544e4894716c84aa1b718994a49..b7005055228dae179e33828f11169966bafa1e13 100644 (file)
@@ -27,6 +27,7 @@
 #include <boost/asio/io_service.hpp>
 #include <boost/asio/spawn.hpp>
 #include <boost/date_time/posix_time/ptime.hpp>
+#include <boost/system/error_code.hpp>
 
 using namespace icinga;
 
@@ -137,3 +138,25 @@ void IoEngine::RunEventLoop()
                }
        }
 }
+
+AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init)
+       : m_Timer(io)
+{
+       m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
+}
+
+void AsioConditionVariable::Set()
+{
+       m_Timer.expires_at(boost::posix_time::neg_infin);
+}
+
+void AsioConditionVariable::Clear()
+{
+       m_Timer.expires_at(boost::posix_time::pos_infin);
+}
+
+void AsioConditionVariable::Wait(boost::asio::yield_context yc)
+{
+       boost::system::error_code ec;
+       m_Timer.async_wait(yc[ec]);
+}
index 65059db68702a2220659235c747d1d78d4f0657f..d085250816a0573e86b3c1737226ffdff8a69646 100644 (file)
@@ -109,4 +109,22 @@ class TerminateIoThread : public std::exception
 {
 };
 
+/**
+ * Condition variable which doesn't block I/O threads
+ *
+ * @ingroup base
+ */
+class AsioConditionVariable
+{
+public:
+       AsioConditionVariable(boost::asio::io_service& io, bool init = false);
+
+       void Set();
+       void Clear();
+       void Wait(boost::asio::yield_context yc);
+
+private:
+       boost::asio::deadline_timer m_Timer;
+};
+
 #endif /* IO_ENGINE_H */
index 569ffbb1cd46ad0309933d86072f8308f5c95159..da6afe4b72967c97eeba35e320862da2b840fbae 100644 (file)
@@ -44,7 +44,7 @@ void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
                        }) }
                }));
 
-               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+               m_OutgoingMessagesQueued.Set();
        }
 }
 
index fa8b65ffc8347352f03b4cdefa1c96f085adb4f9..a0f37f950339f9466828b16193e7e9ffdb7f71af 100644 (file)
@@ -17,7 +17,6 @@
 #include <boost/asio/deadline_timer.hpp>
 #include <boost/asio/spawn.hpp>
 #include <boost/date_time/posix_time/posix_time_duration.hpp>
-#include <boost/date_time/posix_time/ptime.hpp>
 #include <boost/thread/once.hpp>
 
 using namespace icinga;
@@ -35,9 +34,6 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
 {
        if (authenticated)
                m_Endpoint = Endpoint::GetByName(identity);
-
-       m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
-       m_WriterDone.expires_at(boost::posix_time::pos_infin);
 }
 
 void JsonRpcConnection::Start()
@@ -97,18 +93,15 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
 {
        Defer disconnect ([this]() { Disconnect(); });
 
-       Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); });
+       Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
 
        do {
-               try {
-                       m_OutgoingMessagesQueued.async_wait(yc);
-               } catch (...) {
-               }
+               m_OutgoingMessagesQueued.Wait(yc);
 
                auto queue (std::move(m_OutgoingMessagesQueue));
 
                m_OutgoingMessagesQueue.clear();
-               m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+               m_OutgoingMessagesQueued.Clear();
 
                if (!queue.empty()) {
                        try {
@@ -169,7 +162,7 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
 {
        m_IoStrand.post([this, message]() {
                m_OutgoingMessagesQueue.emplace_back(message);
-               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+               m_OutgoingMessagesQueued.Set();
        });
 }
 
@@ -186,12 +179,9 @@ void JsonRpcConnection::Disconnect()
                        Log(LogWarning, "JsonRpcConnection")
                                << "API client disconnected for identity '" << m_Identity << "'";
 
-                       m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+                       m_OutgoingMessagesQueued.Set();
 
-                       try {
-                               m_WriterDone.async_wait(yc);
-                       } catch (...) {
-                       }
+                       m_WriterDone.Wait(yc);
 
                        try {
                                m_Stream->next_layer().async_shutdown(yc);
@@ -288,7 +278,7 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
                resultMessage->Set("id", message->Get("id"));
 
                m_OutgoingMessagesQueue.emplace_back(resultMessage);
-               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+               m_OutgoingMessagesQueued.Set();
        }
 }
 
index bc5fa398d6012d13a6c5e0bdcaec9e9d248a09e3..b0679d36818e65cf8f78c9b4d70163ee9628419c 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "remote/i2-remote.hpp"
 #include "remote/endpoint.hpp"
+#include "base/io-engine.hpp"
 #include "base/tlsstream.hpp"
 #include "base/timer.hpp"
 #include "base/workqueue.hpp"
@@ -12,7 +13,6 @@
 #include <vector>
 #include <boost/asio/io_service_strand.hpp>
 #include <boost/asio/spawn.hpp>
-#include <boost/asio/deadline_timer.hpp>
 
 namespace icinga
 {
@@ -73,8 +73,8 @@ private:
        double m_NextHeartbeat;
        boost::asio::io_service::strand m_IoStrand;
        std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
-       boost::asio::deadline_timer m_OutgoingMessagesQueued;
-       boost::asio::deadline_timer m_WriterDone;
+       AsioConditionVariable m_OutgoingMessagesQueued;
+       AsioConditionVariable m_WriterDone;
        bool m_ShuttingDown;
 
        void HandleIncomingMessages(boost::asio::yield_context yc);