]> granicus.if.org Git - icinga2/commitdiff
Re-add JsonRpcConnection#Disconnect()
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Wed, 20 Feb 2019 11:00:11 +0000 (12:00 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 11:31:16 +0000 (13:31 +0200)
lib/remote/apilistener.cpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index 56fda20e512a00b032d28ec2a2d06299fc247824..dccb8867fecf6c563defd44fb512dc149127785b 100644 (file)
@@ -213,6 +213,16 @@ void ApiListener::UpdateSSLContext()
        }
 
        m_SSLContext = context;
+
+       for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
+               for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
+                       client->Disconnect();
+               }
+       }
+
+       for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
+               client->Disconnect();
+       }
 }
 
 void ApiListener::OnAllConfigLoaded()
@@ -841,6 +851,8 @@ void ApiListener::ApiTimerHandler()
                for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
                        if (client->GetTimestamp() == maxTs) {
                                client->SendMessage(lmessage);
+                       } else {
+                               client->Disconnect();
                        }
                }
 
index 3840a8c893cd32f941e12855f2365b43bf6e309e..ff2e67ebf5f4e91ad8ece751234d4e9f6fb68c21 100644 (file)
@@ -27,32 +27,28 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
        const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
        : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
        m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
-       m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
+       m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
 {
        if (authenticated)
                m_Endpoint = Endpoint::GetByName(identity);
 
        m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+       m_WriterDone.expires_at(boost::posix_time::pos_infin);
 }
 
 void JsonRpcConnection::Start()
 {
        namespace asio = boost::asio;
 
-       m_RunningCoroutines = 2;
+       JsonRpcConnection::Ptr preventGc (this);
 
-       asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
-       asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+       asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
+       asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
 }
 
 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
 {
-       Defer shutdownStreamOnce ([this, &yc]() {
-               m_ReaderHasError = true;
-               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
-
-               ShutdownStreamOnce(yc);
-       });
+       Defer disconnect ([this]() { Disconnect(); });
 
        for (;;) {
                String message;
@@ -60,9 +56,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
                try {
                        message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
                } catch (const std::exception& ex) {
-                       Log(LogWarning, "JsonRpcConnection")
-                               << "Error while reading JSON-RPC message for identity '" << m_Identity
-                               << "': " << DiagnosticInformation(ex);
+                       if (!m_ShuttingDown) {
+                               Log(LogWarning, "JsonRpcConnection")
+                                       << "Error while reading JSON-RPC message for identity '" << m_Identity
+                                       << "': " << DiagnosticInformation(ex);
+                       }
 
                        break;
                }
@@ -72,9 +70,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
 
                        MessageHandler(message);
                } catch (const std::exception& ex) {
-                       Log(LogWarning, "JsonRpcConnection")
-                               << "Error while processing JSON-RPC message for identity '" << m_Identity
-                               << "': " << DiagnosticInformation(ex);
+                       if (!m_ShuttingDown) {
+                               Log(LogWarning, "JsonRpcConnection")
+                                       << "Error while processing JSON-RPC message for identity '" << m_Identity
+                                       << "': " << DiagnosticInformation(ex);
+                       }
 
                        break;
                }
@@ -83,7 +83,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
 
 void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
 {
-       Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
+       Defer disconnect ([this]() { Disconnect(); });
+
+       Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); });
 
        do {
                try {
@@ -108,36 +110,17 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
 
                                m_Stream->async_flush(yc);
                        } catch (const std::exception& ex) {
-                               std::ostringstream info;
-                               info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
-                               Log(LogWarning, "JsonRpcConnection")
-                                       << info.str() << "\n" << DiagnosticInformation(ex);
+                               if (!m_ShuttingDown) {
+                                       std::ostringstream info;
+                                       info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
+                                       Log(LogWarning, "JsonRpcConnection")
+                                               << info.str() << "\n" << DiagnosticInformation(ex);
+                               }
 
                                break;
                        }
                }
-       } while (!m_ReaderHasError);
-}
-
-void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
-{
-       if (!--m_RunningCoroutines) {
-               try {
-                       m_Stream->next_layer().async_shutdown(yc);
-               } catch (...) {
-                       // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
-               }
-
-               Log(LogWarning, "JsonRpcConnection")
-                       << "API client disconnected for identity '" << m_Identity << "'";
-
-               if (m_Endpoint) {
-                       m_Endpoint->RemoveClient(this);
-               } else {
-                       auto listener (ApiListener::GetInstance());
-                       listener->RemoveAnonymousClient(this);
-               }
-       }
+       } while (!m_ShuttingDown);
 }
 
 double JsonRpcConnection::GetTimestamp() const
@@ -178,6 +161,46 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
        });
 }
 
+void JsonRpcConnection::Disconnect()
+{
+       namespace asio = boost::asio;
+
+       JsonRpcConnection::Ptr preventGc (this);
+
+       asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) {
+               if (!m_ShuttingDown) {
+                       m_ShuttingDown = true;
+
+                       Log(LogWarning, "JsonRpcConnection")
+                               << "API client disconnected for identity '" << m_Identity << "'";
+
+                       m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+
+                       try {
+                               m_WriterDone.async_wait(yc);
+                       } catch (...) {
+                       }
+
+                       try {
+                               m_Stream->next_layer().async_shutdown(yc);
+                       } catch (...) {
+                       }
+
+                       try {
+                               m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both);
+                       } catch (...) {
+                       }
+
+                       if (m_Endpoint) {
+                               m_Endpoint->RemoveClient(this);
+                       } else {
+                               auto listener (ApiListener::GetInstance());
+                               listener->RemoveAnonymousClient(this);
+                       }
+               }
+       });
+}
+
 void JsonRpcConnection::MessageHandler(const String& jsonString)
 {
        Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
index 5263b30acbe95561ababaafe28e4e874ef8e011a..51787244e992f2e511571ce1ec5c112f96ab6498 100644 (file)
@@ -52,6 +52,8 @@ public:
        std::shared_ptr<AsioTlsStream> GetStream() const;
        ConnectionRole GetRole() const;
 
+       void Disconnect();
+
        void SendMessage(const Dictionary::Ptr& request);
 
        static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
@@ -68,12 +70,11 @@ private:
        boost::asio::io_service::strand m_IoStrand;
        std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
        boost::asio::deadline_timer m_OutgoingMessagesQueued;
-       bool m_ReaderHasError;
-       unsigned char m_RunningCoroutines;
+       boost::asio::deadline_timer m_WriterDone;
+       bool m_ShuttingDown;
 
        void HandleIncomingMessages(boost::asio::yield_context yc);
        void WriteOutgoingMessages(boost::asio::yield_context yc);
-       void ShutdownStreamOnce(boost::asio::yield_context& yc);
 
        bool ProcessMessage();
        void MessageHandler(const String& jsonString);