]> granicus.if.org Git - icinga2/commitdiff
Port JsonRpcConnection to Boost ASIO
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 19 Feb 2019 12:57:36 +0000 (13:57 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 09:40:14 +0000 (11:40 +0200)
lib/remote/apilistener-configsync.cpp
lib/remote/apilistener.cpp
lib/remote/jsonrpcconnection-heartbeat.cpp
lib/remote/jsonrpcconnection-pki.cpp
lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index d4caff91a009a1391eba9988e1c8e81be35feee8..e06d1d88775b70b40154754b2ae88734043f072e 100644 (file)
@@ -323,7 +323,7 @@ void ApiListener::UpdateConfigObject(const ConfigObject::Ptr& object, const Mess
 #endif /* I2_DEBUG */
 
        if (client)
-               JsonRpc::SendMessage(client->GetStream(), message);
+               client->SendMessage(message);
        else {
                Zone::Ptr target = static_pointer_cast<Zone>(object->GetZone());
 
@@ -373,7 +373,7 @@ void ApiListener::DeleteConfigObject(const ConfigObject::Ptr& object, const Mess
 #endif /* I2_DEBUG */
 
        if (client)
-               JsonRpc::SendMessage(client->GetStream(), message);
+               client->SendMessage(message);
        else {
                Zone::Ptr target = static_pointer_cast<Zone>(object->GetZone());
 
index e0225aa107110732aaa0f8636f80bfd413b009df..87933ae7a12367ea88e35d0c6a1cfb83a3362d42 100644 (file)
@@ -212,16 +212,6 @@ void ApiListener::UpdateSSLContext()
        }
 
        m_SSLContext = context;
-
-       for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
-               for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
-                       client->Disconnect();
-               }
-       }
-
-       for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
-               client->Disconnect();
-       }
 }
 
 void ApiListener::OnAllConfigLoaded()
@@ -669,7 +659,33 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const
                }
        }
 
-       if (ctype != ClientJsonRpc) {
+       if (ctype == ClientJsonRpc) {
+               Log(LogNotice, "ApiListener", "New JSON-RPC client");
+
+               JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
+
+               if (endpoint) {
+                       bool needSync = !endpoint->GetConnected();
+
+                       endpoint->AddClient(aclient);
+
+                       asio::spawn(client->get_io_service(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
+                               CpuBoundWork syncClient (yc);
+
+                               SyncClient(aclient, endpoint, needSync);
+                       });
+               } else if (!AddAnonymousClient(aclient)) {
+                       Log(LogNotice, "ApiListener")
+                               << "Ignoring anonymous JSON-RPC connection " << conninfo
+                               << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
+
+                       aclient = nullptr;
+               }
+
+               if (aclient) {
+                       aclient->Start();
+               }
+       } else {
                Log(LogNotice, "ApiListener", "New HTTP client");
 
                HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
@@ -810,10 +826,9 @@ void ApiListener::ApiTimerHandler()
                }
 
                for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
-                       if (client->GetTimestamp() != maxTs)
-                               client->Disconnect();
-                       else
+                       if (client->GetTimestamp() == maxTs) {
                                client->SendMessage(lmessage);
+                       }
                }
 
                Log(LogNotice, "ApiListener")
@@ -1280,8 +1295,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
                                }
 
                                try  {
-                                       size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
-                                       endpoint->AddMessageSent(bytesSent);
+                                       client->SendMessage(JsonDecode(pmessage->Get("message")));
                                        count++;
                                } catch (const std::exception& ex) {
                                        Log(LogWarning, "ApiListener")
@@ -1306,8 +1320,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
                                                }) }
                                        });
 
-                                       size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
-                                       endpoint->AddMessageSent(bytesSent);
+                                       client->SendMessage(lmessage);
                                }
                        }
 
@@ -1426,11 +1439,8 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
        /* connection stats */
        size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
        size_t httpClients = GetHttpClients().size();
-       size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
-       size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
        size_t syncQueueItems = m_SyncQueue.GetLength();
        size_t relayQueueItems = m_RelayQueue.GetLength();
-       double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
        double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
        double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
 
@@ -1446,11 +1456,8 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
 
                { "json_rpc", new Dictionary({
                        { "anonymous_clients", jsonRpcAnonymousClients },
-                       { "work_queue_items", workQueueItems },
-                       { "work_queue_count", workQueueCount },
                        { "sync_queue_items", syncQueueItems },
                        { "relay_queue_items", relayQueueItems },
-                       { "work_queue_item_rate", workQueueItemRate },
                        { "sync_queue_item_rate", syncQueueItemRate },
                        { "relay_queue_item_rate", relayQueueItemRate }
                }) },
@@ -1467,12 +1474,9 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
 
        perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
        perfdata->Set("num_http_clients", httpClients);
-       perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
-       perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
        perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
        perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
 
-       perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
        perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
        perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
 
index 5b466830c788d6d54f91048b8905793b23660ce0..91fa518915edea16c3c9fe6ac3030f3bd8158396 100644 (file)
@@ -12,41 +12,8 @@ using namespace icinga;
 
 REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
 
-void JsonRpcConnection::HeartbeatTimerHandler()
-{
-       for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
-               for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
-                       if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
-                               Log(LogWarning, "JsonRpcConnection")
-                                       << "Client for endpoint '" << endpoint->GetName() << "' has requested "
-                                       << "heartbeat message but hasn't responded in time. Closing connection.";
-
-                               client->Disconnect();
-                               continue;
-                       }
-
-                       Dictionary::Ptr request = new Dictionary({
-                               { "jsonrpc", "2.0" },
-                               { "method", "event::Heartbeat" },
-                               { "params", new Dictionary({
-                                       { "timeout", 120 }
-                               }) }
-                       });
-
-                       client->SendMessage(request);
-               }
-       }
-}
-
 Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
 {
-       Value vtimeout = params->Get("timeout");
-
-       if (!vtimeout.IsEmpty()) {
-               origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout;
-               origin->FromClient->m_HeartbeatTimeout = vtimeout;
-       }
-
        return Empty;
 }
 
index 8eb82ed404900933b106da9cae1eed6185eac6c8..66f88479b171c9f1898d48576a472ff5144166d6 100644 (file)
@@ -13,6 +13,8 @@
 #include <boost/thread/once.hpp>
 #include <boost/regex.hpp>
 #include <fstream>
+#include <openssl/ssl.h>
+#include <openssl/x509.h>
 
 using namespace icinga;
 
@@ -30,10 +32,12 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
        Dictionary::Ptr result = new Dictionary();
 
        /* Use the presented client certificate if not provided. */
-       if (certText.IsEmpty())
-               cert = origin->FromClient->GetStream()->GetPeerCertificate();
-       else
+       if (certText.IsEmpty()) {
+               auto stream (origin->FromClient->GetStream());
+               cert = std::shared_ptr<X509>(SSL_get_peer_certificate(stream->next_layer().native_handle()), X509_free);
+       } else {
                cert = StringToCertificate(certText);
+       }
 
        if (!cert) {
                Log(LogWarning, "JsonRpcConnection") << "No certificate or CSR received";
@@ -121,7 +125,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
                                { "method", "pki::UpdateCertificate" },
                                { "params", result }
                        });
-                       JsonRpc::SendMessage(client->GetStream(), message);
+                       client->SendMessage(message);
 
                        return result;
                }
@@ -192,7 +196,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
                { "method", "pki::UpdateCertificate" },
                { "params", result }
        });
-       JsonRpc::SendMessage(client->GetStream(), message);
+       client->SendMessage(message);
 
        return result;
 
@@ -255,7 +259,7 @@ void JsonRpcConnection::SendCertificateRequest(const JsonRpcConnection::Ptr& acl
         * or b) the local zone and all parents.
         */
        if (aclient)
-               JsonRpc::SendMessage(aclient->GetStream(), message);
+               aclient->SendMessage(message);
        else
                listener->RelayMessage(origin, Zone::GetLocalZone(), message, false);
 }
index 99d0d7febc23599f00e53eabd74ab4a1d5c0f26b..3840a8c893cd32f941e12855f2365b43bf6e309e 100644 (file)
@@ -5,11 +5,17 @@
 #include "remote/apifunction.hpp"
 #include "remote/jsonrpc.hpp"
 #include "base/configtype.hpp"
+#include "base/io-engine.hpp"
 #include "base/objectlock.hpp"
 #include "base/utility.hpp"
 #include "base/logger.hpp"
 #include "base/exception.hpp"
 #include "base/convert.hpp"
+#include "base/tlsstream.hpp"
+#include <memory>
+#include <utility>
+#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
 #include <boost/thread/once.hpp>
 
 using namespace icinga;
@@ -17,50 +23,121 @@ using namespace icinga;
 static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
 REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
 
-static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
-static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
-static WorkQueue *l_JsonRpcConnectionWorkQueues;
-static size_t l_JsonRpcConnectionWorkQueueCount;
-static int l_JsonRpcConnectionNextID;
-static Timer::Ptr l_HeartbeatTimer;
-
 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
-       TlsStream::Ptr stream, ConnectionRole role)
-       : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(std::move(stream)),
-       m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0)
+       const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
+       : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
+       m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
+       m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
 {
-       boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);
-
        if (authenticated)
                m_Endpoint = Endpoint::GetByName(identity);
+
+       m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+}
+
+void JsonRpcConnection::Start()
+{
+       namespace asio = boost::asio;
+
+       m_RunningCoroutines = 2;
+
+       asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
+       asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
 }
 
-void JsonRpcConnection::StaticInitialize()
+void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
 {
-       l_JsonRpcConnectionTimeoutTimer = new Timer();
-       l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler));
-       l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
-       l_JsonRpcConnectionTimeoutTimer->Start();
+       Defer shutdownStreamOnce ([this, &yc]() {
+               m_ReaderHasError = true;
+               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
 
-       l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency;
-       l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
+               ShutdownStreamOnce(yc);
+       });
 
-       for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
-               l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
+       for (;;) {
+               String message;
+
+               try {
+                       message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
+               } catch (const std::exception& ex) {
+                       Log(LogWarning, "JsonRpcConnection")
+                               << "Error while reading JSON-RPC message for identity '" << m_Identity
+                               << "': " << DiagnosticInformation(ex);
+
+                       break;
+               }
+
+               try {
+                       CpuBoundWork handleMessage (yc);
+
+                       MessageHandler(message);
+               } catch (const std::exception& ex) {
+                       Log(LogWarning, "JsonRpcConnection")
+                               << "Error while processing JSON-RPC message for identity '" << m_Identity
+                               << "': " << DiagnosticInformation(ex);
+
+                       break;
+               }
        }
+}
+
+void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
+{
+       Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
+
+       do {
+               try {
+                       m_OutgoingMessagesQueued.async_wait(yc);
+               } catch (...) {
+               }
+
+               auto queue (std::move(m_OutgoingMessagesQueue));
+
+               m_OutgoingMessagesQueue.clear();
+               m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
 
-       l_HeartbeatTimer = new Timer();
-       l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler));
-       l_HeartbeatTimer->SetInterval(10);
-       l_HeartbeatTimer->Start();
+               if (!queue.empty()) {
+                       try {
+                               for (auto& message : queue) {
+                                       size_t bytesSent = JsonRpc::SendMessage(m_Stream, message, yc);
+
+                                       if (m_Endpoint) {
+                                               m_Endpoint->AddMessageSent(bytesSent);
+                                       }
+                               }
+
+                               m_Stream->async_flush(yc);
+                       } catch (const std::exception& ex) {
+                               std::ostringstream info;
+                               info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
+                               Log(LogWarning, "JsonRpcConnection")
+                                       << info.str() << "\n" << DiagnosticInformation(ex);
+
+                               break;
+                       }
+               }
+       } while (!m_ReaderHasError);
 }
 
-void JsonRpcConnection::Start()
+void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
 {
-       /* the stream holds an owning reference to this object through the callback we're registering here */
-       m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));
-       if (m_Stream->IsDataAvailable())
-               DataAvailableHandler();
+       if (!--m_RunningCoroutines) {
+               try {
+                       m_Stream->next_layer().async_shutdown(yc);
+               } catch (...) {
+                       // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
+               }
+
+               Log(LogWarning, "JsonRpcConnection")
+                       << "API client disconnected for identity '" << m_Identity << "'";
+
+               if (m_Endpoint) {
+                       m_Endpoint->RemoveClient(this);
+               } else {
+                       auto listener (ApiListener::GetInstance());
+                       listener->RemoveAnonymousClient(this);
+               }
+       }
 }
 
 double JsonRpcConnection::GetTimestamp() const
@@ -83,7 +160,7 @@ Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
        return m_Endpoint;
 }
 
-TlsStream::Ptr JsonRpcConnection::GetStream() const
+std::shared_ptr<AsioTlsStream> JsonRpcConnection::GetStream() const
 {
        return m_Stream;
 }
@@ -95,69 +172,16 @@ ConnectionRole JsonRpcConnection::GetRole() const
 
 void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
 {
-       try {
-               ObjectLock olock(m_Stream);
-
-               if (m_Stream->IsEof())
-                       return;
-
-               size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
-
-               if (m_Endpoint)
-                       m_Endpoint->AddMessageSent(bytesSent);
-
-       } catch (const std::exception& ex) {
-               std::ostringstream info;
-               info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
-               Log(LogWarning, "JsonRpcConnection")
-                       << info.str() << "\n" << DiagnosticInformation(ex);
-
-               Disconnect();
-       }
-}
-
-void JsonRpcConnection::Disconnect()
-{
-       Log(LogWarning, "JsonRpcConnection")
-               << "API client disconnected for identity '" << m_Identity << "'";
-
-       m_Stream->Close();
-
-       if (m_Endpoint)
-               m_Endpoint->RemoveClient(this);
-       else {
-               ApiListener::Ptr listener = ApiListener::GetInstance();
-               listener->RemoveAnonymousClient(this);
-       }
-}
-
-void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
-{
-       if (m_Stream->IsEof())
-               return;
-
-       try {
-               MessageHandler(jsonString);
-       } catch (const std::exception& ex) {
-               Log(LogWarning, "JsonRpcConnection")
-                       << "Error while reading JSON-RPC message for identity '" << m_Identity
-                       << "': " << DiagnosticInformation(ex);
-
-               Disconnect();
-
-               return;
-       }
+       m_IoStrand.post([this, message]() {
+               m_OutgoingMessagesQueue.emplace_back(message);
+               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+       });
 }
 
 void JsonRpcConnection::MessageHandler(const String& jsonString)
 {
        Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
 
-       m_Seen = Utility::GetTime();
-
-       if (m_HeartbeatTimeout != 0)
-               m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;
-
        if (m_Endpoint && message->Contains("ts")) {
                double ts = message->Get("ts");
 
@@ -225,57 +249,10 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
        if (message->Contains("id")) {
                resultMessage->Set("jsonrpc", "2.0");
                resultMessage->Set("id", message->Get("id"));
-               SendMessage(resultMessage);
-       }
-}
-
-bool JsonRpcConnection::ProcessMessage()
-{
-       /* Limit for anonymous clients (signing requests and not configured endpoints. */
-       ssize_t maxMessageLength = 1024 * 1024;
-
-       if (m_Endpoint)
-               maxMessageLength = -1; /* no limit */
-
-       String message;
-
-       StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength);
-
-       if (srs != StatusNewItem)
-               return false;
-
-       l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
-
-       return true;
-}
-
-void JsonRpcConnection::DataAvailableHandler()
-{
-       bool close = false;
-
-       if (!m_Stream)
-               return;
-
-       if (!m_Stream->IsEof()) {
-               boost::mutex::scoped_lock lock(m_DataHandlerMutex);
-
-               try {
-                       while (ProcessMessage())
-                               ; /* empty loop body */
-               } catch (const std::exception& ex) {
-                       Log(LogWarning, "JsonRpcConnection")
-                               << "Error while reading JSON-RPC message for identity '" << m_Identity
-                               << "': " << DiagnosticInformation(ex);
-
-                       Disconnect();
-
-                       return;
-               }
-       } else
-               close = true;
 
-       if (close)
-               Disconnect();
+               m_OutgoingMessagesQueue.emplace_back(resultMessage);
+               m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+       }
 }
 
 Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
@@ -292,57 +269,3 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
        return Empty;
 }
 
-void JsonRpcConnection::CheckLiveness()
-{
-       if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
-               Log(LogInformation, "JsonRpcConnection")
-                       <<  "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
-               Disconnect();
-       }
-}
-
-void JsonRpcConnection::TimeoutTimerHandler()
-{
-       ApiListener::Ptr listener = ApiListener::GetInstance();
-
-       for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
-               client->CheckLiveness();
-       }
-
-       for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
-               for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
-                       client->CheckLiveness();
-               }
-       }
-}
-
-size_t JsonRpcConnection::GetWorkQueueCount()
-{
-       return l_JsonRpcConnectionWorkQueueCount;
-}
-
-size_t JsonRpcConnection::GetWorkQueueLength()
-{
-       size_t itemCount = 0;
-
-       for (size_t i = 0; i < GetWorkQueueCount(); i++)
-               itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();
-
-       return itemCount;
-}
-
-double JsonRpcConnection::GetWorkQueueRate()
-{
-       double rate = 0.0;
-       size_t count = GetWorkQueueCount();
-
-       /* If this is a standalone environment, we don't have any queues. */
-       if (count == 0)
-               return 0.0;
-
-       for (size_t i = 0; i < count; i++)
-               rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;
-
-       return rate / count;
-}
-
index 40df9af13d2308ca9af394b70385af11ca10c54d..5263b30acbe95561ababaafe28e4e874ef8e011a 100644 (file)
@@ -8,6 +8,11 @@
 #include "base/tlsstream.hpp"
 #include "base/timer.hpp"
 #include "base/workqueue.hpp"
+#include <memory>
+#include <vector>
+#include <boost/asio/io_service_strand.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/asio/deadline_timer.hpp>
 
 namespace icinga
 {
@@ -36,7 +41,7 @@ class JsonRpcConnection final : public Object
 public:
        DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
 
-       JsonRpcConnection(const String& identity, bool authenticated, TlsStream::Ptr stream, ConnectionRole role);
+       JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role);
 
        void Start();
 
@@ -44,45 +49,34 @@ public:
        String GetIdentity() const;
        bool IsAuthenticated() const;
        Endpoint::Ptr GetEndpoint() const;
-       TlsStream::Ptr GetStream() const;
+       std::shared_ptr<AsioTlsStream> GetStream() const;
        ConnectionRole GetRole() const;
 
-       void Disconnect();
-
        void SendMessage(const Dictionary::Ptr& request);
 
-       static void HeartbeatTimerHandler();
        static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
 
-       static size_t GetWorkQueueCount();
-       static size_t GetWorkQueueLength();
-       static double GetWorkQueueRate();
-
        static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr<MessageOrigin>& origin, const String& path);
 
 private:
-       int m_ID;
        String m_Identity;
        bool m_Authenticated;
        Endpoint::Ptr m_Endpoint;
-       TlsStream::Ptr m_Stream;
+       std::shared_ptr<AsioTlsStream> m_Stream;
        ConnectionRole m_Role;
        double m_Timestamp;
-       double m_Seen;
-       double m_NextHeartbeat;
-       double m_HeartbeatTimeout;
-       boost::mutex m_DataHandlerMutex;
+       boost::asio::io_service::strand m_IoStrand;
+       std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
+       boost::asio::deadline_timer m_OutgoingMessagesQueued;
+       bool m_ReaderHasError;
+       unsigned char m_RunningCoroutines;
 
-       StreamReadContext m_Context;
+       void HandleIncomingMessages(boost::asio::yield_context yc);
+       void WriteOutgoingMessages(boost::asio::yield_context yc);
+       void ShutdownStreamOnce(boost::asio::yield_context& yc);
 
        bool ProcessMessage();
-       void MessageHandlerWrapper(const String& jsonString);
        void MessageHandler(const String& jsonString);
-       void DataAvailableHandler();
-
-       static void StaticInitialize();
-       static void TimeoutTimerHandler();
-       void CheckLiveness();
 
        void CertificateRequestResponseHandler(const Dictionary::Ptr& message);
 };