const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
- m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
+ m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
{
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+ m_WriterDone.expires_at(boost::posix_time::pos_infin);
}
void JsonRpcConnection::Start()
{
namespace asio = boost::asio;
- m_RunningCoroutines = 2;
+ JsonRpcConnection::Ptr preventGc (this);
- asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
- asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+ asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
+ asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
- Defer shutdownStreamOnce ([this, &yc]() {
- m_ReaderHasError = true;
- m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
-
- ShutdownStreamOnce(yc);
- });
+ Defer disconnect ([this]() { Disconnect(); });
for (;;) {
String message;
try {
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
- Log(LogWarning, "JsonRpcConnection")
- << "Error while reading JSON-RPC message for identity '" << m_Identity
- << "': " << DiagnosticInformation(ex);
+ if (!m_ShuttingDown) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while reading JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+ }
break;
}
MessageHandler(message);
} catch (const std::exception& ex) {
- Log(LogWarning, "JsonRpcConnection")
- << "Error while processing JSON-RPC message for identity '" << m_Identity
- << "': " << DiagnosticInformation(ex);
+ if (!m_ShuttingDown) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while processing JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+ }
break;
}
void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
{
- Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
+ Defer disconnect ([this]() { Disconnect(); });
+
+ Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); });
do {
try {
m_Stream->async_flush(yc);
} catch (const std::exception& ex) {
- std::ostringstream info;
- info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
- Log(LogWarning, "JsonRpcConnection")
- << info.str() << "\n" << DiagnosticInformation(ex);
+ if (!m_ShuttingDown) {
+ std::ostringstream info;
+ info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
+ Log(LogWarning, "JsonRpcConnection")
+ << info.str() << "\n" << DiagnosticInformation(ex);
+ }
break;
}
}
- } while (!m_ReaderHasError);
-}
-
-void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
-{
- if (!--m_RunningCoroutines) {
- try {
- m_Stream->next_layer().async_shutdown(yc);
- } catch (...) {
- // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
- }
-
- Log(LogWarning, "JsonRpcConnection")
- << "API client disconnected for identity '" << m_Identity << "'";
-
- if (m_Endpoint) {
- m_Endpoint->RemoveClient(this);
- } else {
- auto listener (ApiListener::GetInstance());
- listener->RemoveAnonymousClient(this);
- }
- }
+ } while (!m_ShuttingDown);
}
double JsonRpcConnection::GetTimestamp() const
});
}
+void JsonRpcConnection::Disconnect()
+{
+ namespace asio = boost::asio;
+
+ JsonRpcConnection::Ptr preventGc (this);
+
+ asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) {
+ if (!m_ShuttingDown) {
+ m_ShuttingDown = true;
+
+ Log(LogWarning, "JsonRpcConnection")
+ << "API client disconnected for identity '" << m_Identity << "'";
+
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+
+ try {
+ m_WriterDone.async_wait(yc);
+ } catch (...) {
+ }
+
+ try {
+ m_Stream->next_layer().async_shutdown(yc);
+ } catch (...) {
+ }
+
+ try {
+ m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both);
+ } catch (...) {
+ }
+
+ if (m_Endpoint) {
+ m_Endpoint->RemoveClient(this);
+ } else {
+ auto listener (ApiListener::GetInstance());
+ listener->RemoveAnonymousClient(this);
+ }
+ }
+ });
+}
+
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);