]> granicus.if.org Git - icinga2/blob - lib/remote/jsonrpcconnection.cpp
3c775f68d7602fba3ae94385a43bb79272f6e21e
[icinga2] / lib / remote / jsonrpcconnection.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "remote/jsonrpcconnection.hpp"
4 #include "remote/apilistener.hpp"
5 #include "remote/apifunction.hpp"
6 #include "remote/jsonrpc.hpp"
7 #include "base/defer.hpp"
8 #include "base/configtype.hpp"
9 #include "base/io-engine.hpp"
10 #include "base/json.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15 #include "base/convert.hpp"
16 #include "base/tlsstream.hpp"
17 #include <memory>
18 #include <utility>
19 #include <boost/asio/io_service.hpp>
20 #include <boost/asio/spawn.hpp>
21 #include <boost/date_time/posix_time/posix_time_duration.hpp>
22 #include <boost/system/system_error.hpp>
23 #include <boost/thread/once.hpp>
24
25 using namespace icinga;
26
27 static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
28 REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
29
30 static RingBuffer l_TaskStats (15 * 60);
31
32 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
33         const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
34         : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoService())
35 {
36 }
37
38 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
39         const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_service& io)
40         : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
41         m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
42         m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
43         m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
44 {
45         if (authenticated)
46                 m_Endpoint = Endpoint::GetByName(identity);
47 }
48
49 void JsonRpcConnection::Start()
50 {
51         namespace asio = boost::asio;
52
53         JsonRpcConnection::Ptr keepAlive (this);
54
55         asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
56         asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
57         asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
58         asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
59 }
60
61 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
62 {
63         Defer disconnect ([this]() { Disconnect(); });
64
65         for (;;) {
66                 String message;
67
68                 try {
69                         message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
70                 } catch (const std::exception& ex) {
71                         if (!m_ShuttingDown) {
72                                 Log(LogNotice, "JsonRpcConnection")
73                                         << "Error while reading JSON-RPC message for identity '" << m_Identity
74                                         << "': " << DiagnosticInformation(ex);
75                         }
76
77                         break;
78                 }
79
80                 m_Seen = Utility::GetTime();
81
82                 try {
83                         CpuBoundWork handleMessage (yc);
84
85                         MessageHandler(message);
86                 } catch (const std::exception& ex) {
87                         if (!m_ShuttingDown) {
88                                 Log(LogWarning, "JsonRpcConnection")
89                                         << "Error while processing JSON-RPC message for identity '" << m_Identity
90                                         << "': " << DiagnosticInformation(ex);
91                         }
92
93                         break;
94                 }
95
96                 CpuBoundWork taskStats (yc);
97
98                 l_TaskStats.InsertValue(Utility::GetTime(), 1);
99         }
100 }
101
102 void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
103 {
104         Defer disconnect ([this]() { Disconnect(); });
105
106         Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
107
108         do {
109                 m_OutgoingMessagesQueued.Wait(yc);
110
111                 auto queue (std::move(m_OutgoingMessagesQueue));
112
113                 m_OutgoingMessagesQueue.clear();
114                 m_OutgoingMessagesQueued.Clear();
115
116                 if (!queue.empty()) {
117                         try {
118                                 for (auto& message : queue) {
119                                         size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
120
121                                         if (m_Endpoint) {
122                                                 m_Endpoint->AddMessageSent(bytesSent);
123                                         }
124                                 }
125
126                                 m_Stream->async_flush(yc);
127                         } catch (const std::exception& ex) {
128                                 if (!m_ShuttingDown) {
129                                         std::ostringstream info;
130                                         info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
131                                         Log(LogWarning, "JsonRpcConnection")
132                                                 << info.str() << "\n" << DiagnosticInformation(ex);
133                                 }
134
135                                 break;
136                         }
137                 }
138         } while (!m_ShuttingDown);
139 }
140
141 double JsonRpcConnection::GetTimestamp() const
142 {
143         return m_Timestamp;
144 }
145
146 String JsonRpcConnection::GetIdentity() const
147 {
148         return m_Identity;
149 }
150
151 bool JsonRpcConnection::IsAuthenticated() const
152 {
153         return m_Authenticated;
154 }
155
156 Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
157 {
158         return m_Endpoint;
159 }
160
161 std::shared_ptr<AsioTlsStream> JsonRpcConnection::GetStream() const
162 {
163         return m_Stream;
164 }
165
166 ConnectionRole JsonRpcConnection::GetRole() const
167 {
168         return m_Role;
169 }
170
171 void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
172 {
173         m_IoStrand.post([this, message]() { SendMessageInternal(message); });
174 }
175
176 void JsonRpcConnection::SendRawMessage(const String& message)
177 {
178         m_IoStrand.post([this, message]() {
179                 m_OutgoingMessagesQueue.emplace_back(message);
180                 m_OutgoingMessagesQueued.Set();
181         });
182 }
183
184 void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
185 {
186         m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
187         m_OutgoingMessagesQueued.Set();
188 }
189
190 void JsonRpcConnection::Disconnect()
191 {
192         namespace asio = boost::asio;
193
194         JsonRpcConnection::Ptr keepAlive (this);
195
196         asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
197                 if (!m_ShuttingDown) {
198                         m_ShuttingDown = true;
199
200                         Log(LogWarning, "JsonRpcConnection")
201                                 << "API client disconnected for identity '" << m_Identity << "'";
202
203                         {
204                                 CpuBoundWork removeClient (yc);
205
206                                 if (m_Endpoint) {
207                                         m_Endpoint->RemoveClient(this);
208                                 } else {
209                                         ApiListener::GetInstance()->RemoveAnonymousClient(this);
210                                 }
211                         }
212
213                         m_OutgoingMessagesQueued.Set();
214
215                         m_WriterDone.Wait(yc);
216
217                         try {
218                                 m_Stream->next_layer().async_shutdown(yc);
219                         } catch (...) {
220                         }
221
222                         try {
223                                 m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both);
224                         } catch (...) {
225                         }
226
227                         try {
228                                 m_Stream->lowest_layer().cancel();
229                         } catch (...) {
230                         }
231
232                         m_CheckLivenessTimer.cancel();
233                         m_HeartbeatTimer.cancel();
234                 }
235         });
236 }
237
238 void JsonRpcConnection::MessageHandler(const String& jsonString)
239 {
240         Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
241
242         if (m_Endpoint && message->Contains("ts")) {
243                 double ts = message->Get("ts");
244
245                 /* ignore old messages */
246                 if (ts < m_Endpoint->GetRemoteLogPosition())
247                         return;
248
249                 m_Endpoint->SetRemoteLogPosition(ts);
250         }
251
252         MessageOrigin::Ptr origin = new MessageOrigin();
253         origin->FromClient = this;
254
255         if (m_Endpoint) {
256                 if (m_Endpoint->GetZone() != Zone::GetLocalZone())
257                         origin->FromZone = m_Endpoint->GetZone();
258                 else
259                         origin->FromZone = Zone::GetByName(message->Get("originZone"));
260
261                 m_Endpoint->AddMessageReceived(jsonString.GetLength());
262         }
263
264         Value vmethod;
265
266         if (!message->Get("method", &vmethod)) {
267                 Value vid;
268
269                 if (!message->Get("id", &vid))
270                         return;
271
272                 Log(LogWarning, "JsonRpcConnection",
273                         "We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
274
275                 return;
276         }
277
278         String method = vmethod;
279
280         Log(LogNotice, "JsonRpcConnection")
281                 << "Received '" << method << "' message from identity '" << m_Identity << "'.";
282
283         Dictionary::Ptr resultMessage = new Dictionary();
284
285         try {
286                 ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
287
288                 if (!afunc) {
289                         Log(LogNotice, "JsonRpcConnection")
290                                 << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
291                 } else {
292                         Dictionary::Ptr params = message->Get("params");
293                         if (params)
294                                 resultMessage->Set("result", afunc->Invoke(origin, params));
295                         else
296                                 resultMessage->Set("result", Empty);
297                 }
298         } catch (const std::exception& ex) {
299                 /* TODO: Add a user readable error message for the remote caller */
300                 String diagInfo = DiagnosticInformation(ex);
301                 resultMessage->Set("error", diagInfo);
302                 Log(LogWarning, "JsonRpcConnection")
303                         << "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
304         }
305
306         if (message->Contains("id")) {
307                 resultMessage->Set("jsonrpc", "2.0");
308                 resultMessage->Set("id", message->Get("id"));
309
310                 SendMessageInternal(resultMessage);
311         }
312 }
313
314 Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
315 {
316         double log_position = params->Get("log_position");
317         Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
318
319         if (!endpoint)
320                 return Empty;
321
322         if (log_position > endpoint->GetLocalLogPosition())
323                 endpoint->SetLocalLogPosition(log_position);
324
325         return Empty;
326 }
327
328 void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
329 {
330         boost::system::error_code ec;
331
332         for (;;) {
333                 m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
334                 m_CheckLivenessTimer.async_wait(yc[ec]);
335
336                 if (m_ShuttingDown) {
337                         break;
338                 }
339
340                 if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
341                         Log(LogInformation, "JsonRpcConnection")
342                                 <<  "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
343
344                         Disconnect();
345                         break;
346                 }
347         }
348 }
349
350 double JsonRpcConnection::GetWorkQueueRate()
351 {
352         return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0;
353 }