1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "remote/jsonrpcconnection.hpp"
4 #include "remote/apilistener.hpp"
5 #include "remote/apifunction.hpp"
6 #include "remote/jsonrpc.hpp"
7 #include "base/defer.hpp"
8 #include "base/configtype.hpp"
9 #include "base/io-engine.hpp"
10 #include "base/json.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15 #include "base/convert.hpp"
16 #include "base/tlsstream.hpp"
19 #include <boost/asio/io_service.hpp>
20 #include <boost/asio/spawn.hpp>
21 #include <boost/date_time/posix_time/posix_time_duration.hpp>
22 #include <boost/system/system_error.hpp>
23 #include <boost/thread/once.hpp>
25 using namespace icinga;
27 static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
28 REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
30 static RingBuffer l_TaskStats (15 * 60);
32 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
33 const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
34 : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoService())
38 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
39 const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_service& io)
40 : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
41 m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
42 m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
43 m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
46 m_Endpoint = Endpoint::GetByName(identity);
49 void JsonRpcConnection::Start()
51 namespace asio = boost::asio;
53 JsonRpcConnection::Ptr keepAlive (this);
55 asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
56 asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
57 asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
58 asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
61 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
63 Defer disconnect ([this]() { Disconnect(); });
69 message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
70 } catch (const std::exception& ex) {
71 if (!m_ShuttingDown) {
72 Log(LogNotice, "JsonRpcConnection")
73 << "Error while reading JSON-RPC message for identity '" << m_Identity
74 << "': " << DiagnosticInformation(ex);
80 m_Seen = Utility::GetTime();
83 CpuBoundWork handleMessage (yc);
85 MessageHandler(message);
86 } catch (const std::exception& ex) {
87 if (!m_ShuttingDown) {
88 Log(LogWarning, "JsonRpcConnection")
89 << "Error while processing JSON-RPC message for identity '" << m_Identity
90 << "': " << DiagnosticInformation(ex);
96 CpuBoundWork taskStats (yc);
98 l_TaskStats.InsertValue(Utility::GetTime(), 1);
102 void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
104 Defer disconnect ([this]() { Disconnect(); });
106 Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
109 m_OutgoingMessagesQueued.Wait(yc);
111 auto queue (std::move(m_OutgoingMessagesQueue));
113 m_OutgoingMessagesQueue.clear();
114 m_OutgoingMessagesQueued.Clear();
116 if (!queue.empty()) {
118 for (auto& message : queue) {
119 size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
122 m_Endpoint->AddMessageSent(bytesSent);
126 m_Stream->async_flush(yc);
127 } catch (const std::exception& ex) {
128 if (!m_ShuttingDown) {
129 std::ostringstream info;
130 info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
131 Log(LogWarning, "JsonRpcConnection")
132 << info.str() << "\n" << DiagnosticInformation(ex);
138 } while (!m_ShuttingDown);
141 double JsonRpcConnection::GetTimestamp() const
146 String JsonRpcConnection::GetIdentity() const
151 bool JsonRpcConnection::IsAuthenticated() const
153 return m_Authenticated;
156 Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
161 std::shared_ptr<AsioTlsStream> JsonRpcConnection::GetStream() const
166 ConnectionRole JsonRpcConnection::GetRole() const
171 void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
173 m_IoStrand.post([this, message]() { SendMessageInternal(message); });
176 void JsonRpcConnection::SendRawMessage(const String& message)
178 m_IoStrand.post([this, message]() {
179 m_OutgoingMessagesQueue.emplace_back(message);
180 m_OutgoingMessagesQueued.Set();
184 void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
186 m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
187 m_OutgoingMessagesQueued.Set();
190 void JsonRpcConnection::Disconnect()
192 namespace asio = boost::asio;
194 JsonRpcConnection::Ptr keepAlive (this);
196 asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
197 if (!m_ShuttingDown) {
198 m_ShuttingDown = true;
200 Log(LogWarning, "JsonRpcConnection")
201 << "API client disconnected for identity '" << m_Identity << "'";
204 CpuBoundWork removeClient (yc);
207 m_Endpoint->RemoveClient(this);
209 ApiListener::GetInstance()->RemoveAnonymousClient(this);
213 m_OutgoingMessagesQueued.Set();
215 m_WriterDone.Wait(yc);
218 m_Stream->next_layer().async_shutdown(yc);
223 m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both);
228 m_Stream->lowest_layer().cancel();
232 m_CheckLivenessTimer.cancel();
233 m_HeartbeatTimer.cancel();
238 void JsonRpcConnection::MessageHandler(const String& jsonString)
240 Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
242 if (m_Endpoint && message->Contains("ts")) {
243 double ts = message->Get("ts");
245 /* ignore old messages */
246 if (ts < m_Endpoint->GetRemoteLogPosition())
249 m_Endpoint->SetRemoteLogPosition(ts);
252 MessageOrigin::Ptr origin = new MessageOrigin();
253 origin->FromClient = this;
256 if (m_Endpoint->GetZone() != Zone::GetLocalZone())
257 origin->FromZone = m_Endpoint->GetZone();
259 origin->FromZone = Zone::GetByName(message->Get("originZone"));
261 m_Endpoint->AddMessageReceived(jsonString.GetLength());
266 if (!message->Get("method", &vmethod)) {
269 if (!message->Get("id", &vid))
272 Log(LogWarning, "JsonRpcConnection",
273 "We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
278 String method = vmethod;
280 Log(LogNotice, "JsonRpcConnection")
281 << "Received '" << method << "' message from identity '" << m_Identity << "'.";
283 Dictionary::Ptr resultMessage = new Dictionary();
286 ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
289 Log(LogNotice, "JsonRpcConnection")
290 << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
292 Dictionary::Ptr params = message->Get("params");
294 resultMessage->Set("result", afunc->Invoke(origin, params));
296 resultMessage->Set("result", Empty);
298 } catch (const std::exception& ex) {
299 /* TODO: Add a user readable error message for the remote caller */
300 String diagInfo = DiagnosticInformation(ex);
301 resultMessage->Set("error", diagInfo);
302 Log(LogWarning, "JsonRpcConnection")
303 << "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
306 if (message->Contains("id")) {
307 resultMessage->Set("jsonrpc", "2.0");
308 resultMessage->Set("id", message->Get("id"));
310 SendMessageInternal(resultMessage);
314 Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
316 double log_position = params->Get("log_position");
317 Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
322 if (log_position > endpoint->GetLocalLogPosition())
323 endpoint->SetLocalLogPosition(log_position);
328 void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
330 boost::system::error_code ec;
333 m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
334 m_CheckLivenessTimer.async_wait(yc[ec]);
336 if (m_ShuttingDown) {
340 if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
341 Log(LogInformation, "JsonRpcConnection")
342 << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
350 double JsonRpcConnection::GetWorkQueueRate()
352 return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0;