}
m_SSLContext = context;
-
- for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
- for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
- client->Disconnect();
- }
- }
-
- for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
- client->Disconnect();
- }
}
void ApiListener::OnAllConfigLoaded()
}
}
- if (ctype != ClientJsonRpc) {
+ if (ctype == ClientJsonRpc) {
+ Log(LogNotice, "ApiListener", "New JSON-RPC client");
+
+ JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
+
+ if (endpoint) {
+ bool needSync = !endpoint->GetConnected();
+
+ endpoint->AddClient(aclient);
+
+ asio::spawn(client->get_io_service(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
+ CpuBoundWork syncClient (yc);
+
+ SyncClient(aclient, endpoint, needSync);
+ });
+ } else if (!AddAnonymousClient(aclient)) {
+ Log(LogNotice, "ApiListener")
+ << "Ignoring anonymous JSON-RPC connection " << conninfo
+ << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
+
+ aclient = nullptr;
+ }
+
+ if (aclient) {
+ aclient->Start();
+ }
+ } else {
Log(LogNotice, "ApiListener", "New HTTP client");
HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
}
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
- if (client->GetTimestamp() != maxTs)
- client->Disconnect();
- else
+ if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
+ }
}
Log(LogNotice, "ApiListener")
}
try {
- size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
- endpoint->AddMessageSent(bytesSent);
+ client->SendMessage(JsonDecode(pmessage->Get("message")));
count++;
} catch (const std::exception& ex) {
Log(LogWarning, "ApiListener")
}) }
});
- size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
- endpoint->AddMessageSent(bytesSent);
+ client->SendMessage(lmessage);
}
}
/* connection stats */
size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
size_t httpClients = GetHttpClients().size();
- size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
- size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
size_t syncQueueItems = m_SyncQueue.GetLength();
size_t relayQueueItems = m_RelayQueue.GetLength();
- double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
{ "json_rpc", new Dictionary({
{ "anonymous_clients", jsonRpcAnonymousClients },
- { "work_queue_items", workQueueItems },
- { "work_queue_count", workQueueCount },
{ "sync_queue_items", syncQueueItems },
{ "relay_queue_items", relayQueueItems },
- { "work_queue_item_rate", workQueueItemRate },
{ "sync_queue_item_rate", syncQueueItemRate },
{ "relay_queue_item_rate", relayQueueItemRate }
}) },
perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
perfdata->Set("num_http_clients", httpClients);
- perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
- perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
- perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
#include "base/configtype.hpp"
+#include "base/io-engine.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
+#include "base/tlsstream.hpp"
+#include <memory>
+#include <utility>
+#include <boost/asio/spawn.hpp>
+#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread/once.hpp>
using namespace icinga;
static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
-static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
-static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
-static WorkQueue *l_JsonRpcConnectionWorkQueues;
-static size_t l_JsonRpcConnectionWorkQueueCount;
-static int l_JsonRpcConnectionNextID;
-static Timer::Ptr l_HeartbeatTimer;
-
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
- TlsStream::Ptr stream, ConnectionRole role)
- : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(std::move(stream)),
- m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0)
+ const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
+ : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
+ m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
+ m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
{
- boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);
-
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
+
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
+}
+
+void JsonRpcConnection::Start()
+{
+ namespace asio = boost::asio;
+
+ m_RunningCoroutines = 2;
+
+ asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
+ asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
}
-void JsonRpcConnection::StaticInitialize()
+void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
- l_JsonRpcConnectionTimeoutTimer = new Timer();
- l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler));
- l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
- l_JsonRpcConnectionTimeoutTimer->Start();
+ Defer shutdownStreamOnce ([this, &yc]() {
+ m_ReaderHasError = true;
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
- l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency;
- l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
+ ShutdownStreamOnce(yc);
+ });
- for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
- l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
+ for (;;) {
+ String message;
+
+ try {
+ message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while reading JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+
+ break;
+ }
+
+ try {
+ CpuBoundWork handleMessage (yc);
+
+ MessageHandler(message);
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "JsonRpcConnection")
+ << "Error while processing JSON-RPC message for identity '" << m_Identity
+ << "': " << DiagnosticInformation(ex);
+
+ break;
+ }
}
+}
+
+void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
+{
+ Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
+
+ do {
+ try {
+ m_OutgoingMessagesQueued.async_wait(yc);
+ } catch (...) {
+ }
+
+ auto queue (std::move(m_OutgoingMessagesQueue));
+
+ m_OutgoingMessagesQueue.clear();
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
- l_HeartbeatTimer = new Timer();
- l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler));
- l_HeartbeatTimer->SetInterval(10);
- l_HeartbeatTimer->Start();
+ if (!queue.empty()) {
+ try {
+ for (auto& message : queue) {
+ size_t bytesSent = JsonRpc::SendMessage(m_Stream, message, yc);
+
+ if (m_Endpoint) {
+ m_Endpoint->AddMessageSent(bytesSent);
+ }
+ }
+
+ m_Stream->async_flush(yc);
+ } catch (const std::exception& ex) {
+ std::ostringstream info;
+ info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
+ Log(LogWarning, "JsonRpcConnection")
+ << info.str() << "\n" << DiagnosticInformation(ex);
+
+ break;
+ }
+ }
+ } while (!m_ReaderHasError);
}
-void JsonRpcConnection::Start()
+void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
{
- /* the stream holds an owning reference to this object through the callback we're registering here */
- m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));
- if (m_Stream->IsDataAvailable())
- DataAvailableHandler();
+ if (!--m_RunningCoroutines) {
+ try {
+ m_Stream->next_layer().async_shutdown(yc);
+ } catch (...) {
+ // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
+ }
+
+ Log(LogWarning, "JsonRpcConnection")
+ << "API client disconnected for identity '" << m_Identity << "'";
+
+ if (m_Endpoint) {
+ m_Endpoint->RemoveClient(this);
+ } else {
+ auto listener (ApiListener::GetInstance());
+ listener->RemoveAnonymousClient(this);
+ }
+ }
}
double JsonRpcConnection::GetTimestamp() const
return m_Endpoint;
}
-TlsStream::Ptr JsonRpcConnection::GetStream() const
+std::shared_ptr<AsioTlsStream> JsonRpcConnection::GetStream() const
{
return m_Stream;
}
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
- try {
- ObjectLock olock(m_Stream);
-
- if (m_Stream->IsEof())
- return;
-
- size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
-
- if (m_Endpoint)
- m_Endpoint->AddMessageSent(bytesSent);
-
- } catch (const std::exception& ex) {
- std::ostringstream info;
- info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
- Log(LogWarning, "JsonRpcConnection")
- << info.str() << "\n" << DiagnosticInformation(ex);
-
- Disconnect();
- }
-}
-
-void JsonRpcConnection::Disconnect()
-{
- Log(LogWarning, "JsonRpcConnection")
- << "API client disconnected for identity '" << m_Identity << "'";
-
- m_Stream->Close();
-
- if (m_Endpoint)
- m_Endpoint->RemoveClient(this);
- else {
- ApiListener::Ptr listener = ApiListener::GetInstance();
- listener->RemoveAnonymousClient(this);
- }
-}
-
-void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
-{
- if (m_Stream->IsEof())
- return;
-
- try {
- MessageHandler(jsonString);
- } catch (const std::exception& ex) {
- Log(LogWarning, "JsonRpcConnection")
- << "Error while reading JSON-RPC message for identity '" << m_Identity
- << "': " << DiagnosticInformation(ex);
-
- Disconnect();
-
- return;
- }
+ m_IoStrand.post([this, message]() {
+ m_OutgoingMessagesQueue.emplace_back(message);
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ });
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
- m_Seen = Utility::GetTime();
-
- if (m_HeartbeatTimeout != 0)
- m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;
-
if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");
if (message->Contains("id")) {
resultMessage->Set("jsonrpc", "2.0");
resultMessage->Set("id", message->Get("id"));
- SendMessage(resultMessage);
- }
-}
-
-bool JsonRpcConnection::ProcessMessage()
-{
- /* Limit for anonymous clients (signing requests and not configured endpoints. */
- ssize_t maxMessageLength = 1024 * 1024;
-
- if (m_Endpoint)
- maxMessageLength = -1; /* no limit */
-
- String message;
-
- StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength);
-
- if (srs != StatusNewItem)
- return false;
-
- l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
-
- return true;
-}
-
-void JsonRpcConnection::DataAvailableHandler()
-{
- bool close = false;
-
- if (!m_Stream)
- return;
-
- if (!m_Stream->IsEof()) {
- boost::mutex::scoped_lock lock(m_DataHandlerMutex);
-
- try {
- while (ProcessMessage())
- ; /* empty loop body */
- } catch (const std::exception& ex) {
- Log(LogWarning, "JsonRpcConnection")
- << "Error while reading JSON-RPC message for identity '" << m_Identity
- << "': " << DiagnosticInformation(ex);
-
- Disconnect();
-
- return;
- }
- } else
- close = true;
- if (close)
- Disconnect();
+ m_OutgoingMessagesQueue.emplace_back(resultMessage);
+ m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
+ }
}
Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
return Empty;
}
-void JsonRpcConnection::CheckLiveness()
-{
- if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
- Log(LogInformation, "JsonRpcConnection")
- << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
- Disconnect();
- }
-}
-
-void JsonRpcConnection::TimeoutTimerHandler()
-{
- ApiListener::Ptr listener = ApiListener::GetInstance();
-
- for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
- client->CheckLiveness();
- }
-
- for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
- for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
- client->CheckLiveness();
- }
- }
-}
-
-size_t JsonRpcConnection::GetWorkQueueCount()
-{
- return l_JsonRpcConnectionWorkQueueCount;
-}
-
-size_t JsonRpcConnection::GetWorkQueueLength()
-{
- size_t itemCount = 0;
-
- for (size_t i = 0; i < GetWorkQueueCount(); i++)
- itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();
-
- return itemCount;
-}
-
-double JsonRpcConnection::GetWorkQueueRate()
-{
- double rate = 0.0;
- size_t count = GetWorkQueueCount();
-
- /* If this is a standalone environment, we don't have any queues. */
- if (count == 0)
- return 0.0;
-
- for (size_t i = 0; i < count; i++)
- rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;
-
- return rate / count;
-}
-
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
+#include <memory>
+#include <vector>
+#include <boost/asio/io_service_strand.hpp>
+#include <boost/asio/spawn.hpp>
+#include <boost/asio/deadline_timer.hpp>
namespace icinga
{
public:
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
- JsonRpcConnection(const String& identity, bool authenticated, TlsStream::Ptr stream, ConnectionRole role);
+ JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role);
void Start();
String GetIdentity() const;
bool IsAuthenticated() const;
Endpoint::Ptr GetEndpoint() const;
- TlsStream::Ptr GetStream() const;
+ std::shared_ptr<AsioTlsStream> GetStream() const;
ConnectionRole GetRole() const;
- void Disconnect();
-
void SendMessage(const Dictionary::Ptr& request);
- static void HeartbeatTimerHandler();
static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
- static size_t GetWorkQueueCount();
- static size_t GetWorkQueueLength();
- static double GetWorkQueueRate();
-
static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr<MessageOrigin>& origin, const String& path);
private:
- int m_ID;
String m_Identity;
bool m_Authenticated;
Endpoint::Ptr m_Endpoint;
- TlsStream::Ptr m_Stream;
+ std::shared_ptr<AsioTlsStream> m_Stream;
ConnectionRole m_Role;
double m_Timestamp;
- double m_Seen;
- double m_NextHeartbeat;
- double m_HeartbeatTimeout;
- boost::mutex m_DataHandlerMutex;
+ boost::asio::io_service::strand m_IoStrand;
+ std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
+ boost::asio::deadline_timer m_OutgoingMessagesQueued;
+ bool m_ReaderHasError;
+ unsigned char m_RunningCoroutines;
- StreamReadContext m_Context;
+ void HandleIncomingMessages(boost::asio::yield_context yc);
+ void WriteOutgoingMessages(boost::asio::yield_context yc);
+ void ShutdownStreamOnce(boost::asio::yield_context& yc);
bool ProcessMessage();
- void MessageHandlerWrapper(const String& jsonString);
void MessageHandler(const String& jsonString);
- void DataAvailableHandler();
-
- static void StaticInitialize();
- static void TimeoutTimerHandler();
- void CheckLiveness();
void CertificateRequestResponseHandler(const Dictionary::Ptr& message);
};