From 6c86c127f1a3bf6f33d6b59e306d48e6f4d4d3b2 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 19 Feb 2019 13:57:36 +0100 Subject: [PATCH] Port JsonRpcConnection to Boost ASIO --- lib/remote/apilistener-configsync.cpp | 4 +- lib/remote/apilistener.cpp | 58 ++-- lib/remote/jsonrpcconnection-heartbeat.cpp | 33 --- lib/remote/jsonrpcconnection-pki.cpp | 16 +- lib/remote/jsonrpcconnection.cpp | 307 ++++++++------------- lib/remote/jsonrpcconnection.hpp | 38 ++- 6 files changed, 174 insertions(+), 282 deletions(-) diff --git a/lib/remote/apilistener-configsync.cpp b/lib/remote/apilistener-configsync.cpp index d4caff91a..e06d1d887 100644 --- a/lib/remote/apilistener-configsync.cpp +++ b/lib/remote/apilistener-configsync.cpp @@ -323,7 +323,7 @@ void ApiListener::UpdateConfigObject(const ConfigObject::Ptr& object, const Mess #endif /* I2_DEBUG */ if (client) - JsonRpc::SendMessage(client->GetStream(), message); + client->SendMessage(message); else { Zone::Ptr target = static_pointer_cast(object->GetZone()); @@ -373,7 +373,7 @@ void ApiListener::DeleteConfigObject(const ConfigObject::Ptr& object, const Mess #endif /* I2_DEBUG */ if (client) - JsonRpc::SendMessage(client->GetStream(), message); + client->SendMessage(message); else { Zone::Ptr target = static_pointer_cast(object->GetZone()); diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index e0225aa10..87933ae7a 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -212,16 +212,6 @@ void ApiListener::UpdateSSLContext() } m_SSLContext = context; - - for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType()) { - for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { - client->Disconnect(); - } - } - - for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) { - client->Disconnect(); - } } void ApiListener::OnAllConfigLoaded() @@ -669,7 +659,33 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const } } - if (ctype != ClientJsonRpc) { + if (ctype == ClientJsonRpc) { + Log(LogNotice, "ApiListener", "New JSON-RPC client"); + + JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role); + + if (endpoint) { + bool needSync = !endpoint->GetConnected(); + + endpoint->AddClient(aclient); + + asio::spawn(client->get_io_service(), [this, aclient, endpoint, needSync](asio::yield_context yc) { + CpuBoundWork syncClient (yc); + + SyncClient(aclient, endpoint, needSync); + }); + } else if (!AddAnonymousClient(aclient)) { + Log(LogNotice, "ApiListener") + << "Ignoring anonymous JSON-RPC connection " << conninfo + << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded."; + + aclient = nullptr; + } + + if (aclient) { + aclient->Start(); + } + } else { Log(LogNotice, "ApiListener", "New HTTP client"); HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client); @@ -810,10 +826,9 @@ void ApiListener::ApiTimerHandler() } for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { - if (client->GetTimestamp() != maxTs) - client->Disconnect(); - else + if (client->GetTimestamp() == maxTs) { client->SendMessage(lmessage); + } } Log(LogNotice, "ApiListener") @@ -1280,8 +1295,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } try { - size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message")); - endpoint->AddMessageSent(bytesSent); + client->SendMessage(JsonDecode(pmessage->Get("message"))); count++; } catch (const std::exception& ex) { Log(LogWarning, "ApiListener") @@ -1306,8 +1320,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) }) } }); - size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage); - endpoint->AddMessageSent(bytesSent); + client->SendMessage(lmessage); } } @@ -1426,11 +1439,8 @@ std::pair ApiListener::GetStatus() /* connection stats */ size_t jsonRpcAnonymousClients = GetAnonymousClients().size(); size_t httpClients = GetHttpClients().size(); - size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength(); - size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount(); size_t syncQueueItems = m_SyncQueue.GetLength(); size_t relayQueueItems = m_RelayQueue.GetLength(); - double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate(); double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0; double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0; @@ -1446,11 +1456,8 @@ std::pair ApiListener::GetStatus() { "json_rpc", new Dictionary({ { "anonymous_clients", jsonRpcAnonymousClients }, - { "work_queue_items", workQueueItems }, - { "work_queue_count", workQueueCount }, { "sync_queue_items", syncQueueItems }, { "relay_queue_items", relayQueueItems }, - { "work_queue_item_rate", workQueueItemRate }, { "sync_queue_item_rate", syncQueueItemRate }, { "relay_queue_item_rate", relayQueueItemRate } }) }, @@ -1467,12 +1474,9 @@ std::pair ApiListener::GetStatus() perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients); perfdata->Set("num_http_clients", httpClients); - perfdata->Set("num_json_rpc_work_queue_items", workQueueItems); - perfdata->Set("num_json_rpc_work_queue_count", workQueueCount); perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems); perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems); - perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate); perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate); perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate); diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 5b466830c..91fa51891 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -12,41 +12,8 @@ using namespace icinga; REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler); -void JsonRpcConnection::HeartbeatTimerHandler() -{ - for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType()) { - for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { - if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) { - Log(LogWarning, "JsonRpcConnection") - << "Client for endpoint '" << endpoint->GetName() << "' has requested " - << "heartbeat message but hasn't responded in time. Closing connection."; - - client->Disconnect(); - continue; - } - - Dictionary::Ptr request = new Dictionary({ - { "jsonrpc", "2.0" }, - { "method", "event::Heartbeat" }, - { "params", new Dictionary({ - { "timeout", 120 } - }) } - }); - - client->SendMessage(request); - } - } -} - Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) { - Value vtimeout = params->Get("timeout"); - - if (!vtimeout.IsEmpty()) { - origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout; - origin->FromClient->m_HeartbeatTimeout = vtimeout; - } - return Empty; } diff --git a/lib/remote/jsonrpcconnection-pki.cpp b/lib/remote/jsonrpcconnection-pki.cpp index 8eb82ed40..66f88479b 100644 --- a/lib/remote/jsonrpcconnection-pki.cpp +++ b/lib/remote/jsonrpcconnection-pki.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include using namespace icinga; @@ -30,10 +32,12 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona Dictionary::Ptr result = new Dictionary(); /* Use the presented client certificate if not provided. */ - if (certText.IsEmpty()) - cert = origin->FromClient->GetStream()->GetPeerCertificate(); - else + if (certText.IsEmpty()) { + auto stream (origin->FromClient->GetStream()); + cert = std::shared_ptr(SSL_get_peer_certificate(stream->next_layer().native_handle()), X509_free); + } else { cert = StringToCertificate(certText); + } if (!cert) { Log(LogWarning, "JsonRpcConnection") << "No certificate or CSR received"; @@ -121,7 +125,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona { "method", "pki::UpdateCertificate" }, { "params", result } }); - JsonRpc::SendMessage(client->GetStream(), message); + client->SendMessage(message); return result; } @@ -192,7 +196,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona { "method", "pki::UpdateCertificate" }, { "params", result } }); - JsonRpc::SendMessage(client->GetStream(), message); + client->SendMessage(message); return result; @@ -255,7 +259,7 @@ void JsonRpcConnection::SendCertificateRequest(const JsonRpcConnection::Ptr& acl * or b) the local zone and all parents. */ if (aclient) - JsonRpc::SendMessage(aclient->GetStream(), message); + aclient->SendMessage(message); else listener->RelayMessage(origin, Zone::GetLocalZone(), message, false); } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 99d0d7feb..3840a8c89 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -5,11 +5,17 @@ #include "remote/apifunction.hpp" #include "remote/jsonrpc.hpp" #include "base/configtype.hpp" +#include "base/io-engine.hpp" #include "base/objectlock.hpp" #include "base/utility.hpp" #include "base/logger.hpp" #include "base/exception.hpp" #include "base/convert.hpp" +#include "base/tlsstream.hpp" +#include +#include +#include +#include #include using namespace icinga; @@ -17,50 +23,121 @@ using namespace icinga; static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler); -static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT; -static Timer::Ptr l_JsonRpcConnectionTimeoutTimer; -static WorkQueue *l_JsonRpcConnectionWorkQueues; -static size_t l_JsonRpcConnectionWorkQueueCount; -static int l_JsonRpcConnectionNextID; -static Timer::Ptr l_HeartbeatTimer; - JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, - TlsStream::Ptr stream, ConnectionRole role) - : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(std::move(stream)), - m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0) + const std::shared_ptr& stream, ConnectionRole role) + : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), + m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()), + m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0) { - boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize); - if (authenticated) m_Endpoint = Endpoint::GetByName(identity); + + m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); +} + +void JsonRpcConnection::Start() +{ + namespace asio = boost::asio; + + m_RunningCoroutines = 2; + + asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); }); + asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); }); } -void JsonRpcConnection::StaticInitialize() +void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { - l_JsonRpcConnectionTimeoutTimer = new Timer(); - l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler)); - l_JsonRpcConnectionTimeoutTimer->SetInterval(15); - l_JsonRpcConnectionTimeoutTimer->Start(); + Defer shutdownStreamOnce ([this, &yc]() { + m_ReaderHasError = true; + m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); - l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency; - l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount]; + ShutdownStreamOnce(yc); + }); - for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) { - l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i)); + for (;;) { + String message; + + try { + message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + } catch (const std::exception& ex) { + Log(LogWarning, "JsonRpcConnection") + << "Error while reading JSON-RPC message for identity '" << m_Identity + << "': " << DiagnosticInformation(ex); + + break; + } + + try { + CpuBoundWork handleMessage (yc); + + MessageHandler(message); + } catch (const std::exception& ex) { + Log(LogWarning, "JsonRpcConnection") + << "Error while processing JSON-RPC message for identity '" << m_Identity + << "': " << DiagnosticInformation(ex); + + break; + } } +} + +void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) +{ + Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); }); + + do { + try { + m_OutgoingMessagesQueued.async_wait(yc); + } catch (...) { + } + + auto queue (std::move(m_OutgoingMessagesQueue)); + + m_OutgoingMessagesQueue.clear(); + m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); - l_HeartbeatTimer = new Timer(); - l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler)); - l_HeartbeatTimer->SetInterval(10); - l_HeartbeatTimer->Start(); + if (!queue.empty()) { + try { + for (auto& message : queue) { + size_t bytesSent = JsonRpc::SendMessage(m_Stream, message, yc); + + if (m_Endpoint) { + m_Endpoint->AddMessageSent(bytesSent); + } + } + + m_Stream->async_flush(yc); + } catch (const std::exception& ex) { + std::ostringstream info; + info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'"; + Log(LogWarning, "JsonRpcConnection") + << info.str() << "\n" << DiagnosticInformation(ex); + + break; + } + } + } while (!m_ReaderHasError); } -void JsonRpcConnection::Start() +void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc) { - /* the stream holds an owning reference to this object through the callback we're registering here */ - m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this))); - if (m_Stream->IsDataAvailable()) - DataAvailableHandler(); + if (!--m_RunningCoroutines) { + try { + m_Stream->next_layer().async_shutdown(yc); + } catch (...) { + // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor + } + + Log(LogWarning, "JsonRpcConnection") + << "API client disconnected for identity '" << m_Identity << "'"; + + if (m_Endpoint) { + m_Endpoint->RemoveClient(this); + } else { + auto listener (ApiListener::GetInstance()); + listener->RemoveAnonymousClient(this); + } + } } double JsonRpcConnection::GetTimestamp() const @@ -83,7 +160,7 @@ Endpoint::Ptr JsonRpcConnection::GetEndpoint() const return m_Endpoint; } -TlsStream::Ptr JsonRpcConnection::GetStream() const +std::shared_ptr JsonRpcConnection::GetStream() const { return m_Stream; } @@ -95,69 +172,16 @@ ConnectionRole JsonRpcConnection::GetRole() const void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { - try { - ObjectLock olock(m_Stream); - - if (m_Stream->IsEof()) - return; - - size_t bytesSent = JsonRpc::SendMessage(m_Stream, message); - - if (m_Endpoint) - m_Endpoint->AddMessageSent(bytesSent); - - } catch (const std::exception& ex) { - std::ostringstream info; - info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'"; - Log(LogWarning, "JsonRpcConnection") - << info.str() << "\n" << DiagnosticInformation(ex); - - Disconnect(); - } -} - -void JsonRpcConnection::Disconnect() -{ - Log(LogWarning, "JsonRpcConnection") - << "API client disconnected for identity '" << m_Identity << "'"; - - m_Stream->Close(); - - if (m_Endpoint) - m_Endpoint->RemoveClient(this); - else { - ApiListener::Ptr listener = ApiListener::GetInstance(); - listener->RemoveAnonymousClient(this); - } -} - -void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString) -{ - if (m_Stream->IsEof()) - return; - - try { - MessageHandler(jsonString); - } catch (const std::exception& ex) { - Log(LogWarning, "JsonRpcConnection") - << "Error while reading JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); - - Disconnect(); - - return; - } + m_IoStrand.post([this, message]() { + m_OutgoingMessagesQueue.emplace_back(message); + m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + }); } void JsonRpcConnection::MessageHandler(const String& jsonString) { Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); - m_Seen = Utility::GetTime(); - - if (m_HeartbeatTimeout != 0) - m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout; - if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); @@ -225,57 +249,10 @@ void JsonRpcConnection::MessageHandler(const String& jsonString) if (message->Contains("id")) { resultMessage->Set("jsonrpc", "2.0"); resultMessage->Set("id", message->Get("id")); - SendMessage(resultMessage); - } -} - -bool JsonRpcConnection::ProcessMessage() -{ - /* Limit for anonymous clients (signing requests and not configured endpoints. */ - ssize_t maxMessageLength = 1024 * 1024; - - if (m_Endpoint) - maxMessageLength = -1; /* no limit */ - - String message; - - StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength); - - if (srs != StatusNewItem) - return false; - - l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); - - return true; -} - -void JsonRpcConnection::DataAvailableHandler() -{ - bool close = false; - - if (!m_Stream) - return; - - if (!m_Stream->IsEof()) { - boost::mutex::scoped_lock lock(m_DataHandlerMutex); - - try { - while (ProcessMessage()) - ; /* empty loop body */ - } catch (const std::exception& ex) { - Log(LogWarning, "JsonRpcConnection") - << "Error while reading JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); - - Disconnect(); - - return; - } - } else - close = true; - if (close) - Disconnect(); + m_OutgoingMessagesQueue.emplace_back(resultMessage); + m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + } } Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) @@ -292,57 +269,3 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary:: return Empty; } -void JsonRpcConnection::CheckLiveness() -{ - if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { - Log(LogInformation, "JsonRpcConnection") - << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; - Disconnect(); - } -} - -void JsonRpcConnection::TimeoutTimerHandler() -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) { - client->CheckLiveness(); - } - - for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType()) { - for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { - client->CheckLiveness(); - } - } -} - -size_t JsonRpcConnection::GetWorkQueueCount() -{ - return l_JsonRpcConnectionWorkQueueCount; -} - -size_t JsonRpcConnection::GetWorkQueueLength() -{ - size_t itemCount = 0; - - for (size_t i = 0; i < GetWorkQueueCount(); i++) - itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength(); - - return itemCount; -} - -double JsonRpcConnection::GetWorkQueueRate() -{ - double rate = 0.0; - size_t count = GetWorkQueueCount(); - - /* If this is a standalone environment, we don't have any queues. */ - if (count == 0) - return 0.0; - - for (size_t i = 0; i < count; i++) - rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0; - - return rate / count; -} - diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 40df9af13..5263b30ac 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -8,6 +8,11 @@ #include "base/tlsstream.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" +#include +#include +#include +#include +#include namespace icinga { @@ -36,7 +41,7 @@ class JsonRpcConnection final : public Object public: DECLARE_PTR_TYPEDEFS(JsonRpcConnection); - JsonRpcConnection(const String& identity, bool authenticated, TlsStream::Ptr stream, ConnectionRole role); + JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr& stream, ConnectionRole role); void Start(); @@ -44,45 +49,34 @@ public: String GetIdentity() const; bool IsAuthenticated() const; Endpoint::Ptr GetEndpoint() const; - TlsStream::Ptr GetStream() const; + std::shared_ptr GetStream() const; ConnectionRole GetRole() const; - void Disconnect(); - void SendMessage(const Dictionary::Ptr& request); - static void HeartbeatTimerHandler(); static Value HeartbeatAPIHandler(const intrusive_ptr& origin, const Dictionary::Ptr& params); - static size_t GetWorkQueueCount(); - static size_t GetWorkQueueLength(); - static double GetWorkQueueRate(); - static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr& origin, const String& path); private: - int m_ID; String m_Identity; bool m_Authenticated; Endpoint::Ptr m_Endpoint; - TlsStream::Ptr m_Stream; + std::shared_ptr m_Stream; ConnectionRole m_Role; double m_Timestamp; - double m_Seen; - double m_NextHeartbeat; - double m_HeartbeatTimeout; - boost::mutex m_DataHandlerMutex; + boost::asio::io_service::strand m_IoStrand; + std::vector m_OutgoingMessagesQueue; + boost::asio::deadline_timer m_OutgoingMessagesQueued; + bool m_ReaderHasError; + unsigned char m_RunningCoroutines; - StreamReadContext m_Context; + void HandleIncomingMessages(boost::asio::yield_context yc); + void WriteOutgoingMessages(boost::asio::yield_context yc); + void ShutdownStreamOnce(boost::asio::yield_context& yc); bool ProcessMessage(); - void MessageHandlerWrapper(const String& jsonString); void MessageHandler(const String& jsonString); - void DataAvailableHandler(); - - static void StaticInitialize(); - static void TimeoutTimerHandler(); - void CheckLiveness(); void CertificateRequestResponseHandler(const Dictionary::Ptr& message); }; -- 2.40.0