]> granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
Rename C++ header files.
[icinga2] / lib / remote / apilistener.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "remote/apilistener.hpp"
21 #include "remote/apiclient.hpp"
22 #include "remote/endpoint.hpp"
23 #include "base/convert.hpp"
24 #include "base/netstring.hpp"
25 #include "base/dynamictype.hpp"
26 #include "base/logger_fwd.hpp"
27 #include "base/objectlock.hpp"
28 #include "base/stdiostream.hpp"
29 #include "base/application.hpp"
30 #include "base/context.hpp"
31 #include "base/statsfunction.hpp"
32 #include <fstream>
33
34 using namespace icinga;
35
36 REGISTER_TYPE(ApiListener);
37
38 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
39
40 REGISTER_STATSFUNCTION(ApiListenerStats, &ApiListener::StatsFunc);
41
42 void ApiListener::OnConfigLoaded(void)
43 {
44         /* set up SSL context */
45         shared_ptr<X509> cert = GetX509Certificate(GetCertPath());
46         SetIdentity(GetCertificateCN(cert));
47         Log(LogInformation, "remote", "My API identity: " + GetIdentity());
48
49         m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
50
51         if (!GetCrlPath().IsEmpty())
52                 AddCRLToSSLContext(m_SSLContext, GetCrlPath());
53
54         if (!Endpoint::GetByName(GetIdentity()))
55                 BOOST_THROW_EXCEPTION(std::runtime_error("Endpoint object for '" + GetIdentity() + "' is missing."));
56
57         SyncZoneDirs();
58 }
59
60 /**
61  * Starts the component.
62  */
63 void ApiListener::Start(void)
64 {
65         if (std::distance(DynamicType::GetObjects<ApiListener>().first, DynamicType::GetObjects<ApiListener>().second) > 1)
66                 BOOST_THROW_EXCEPTION(std::runtime_error("Only one ApiListener object is allowed."));
67
68         DynamicObject::Start();
69
70         {
71                 boost::mutex::scoped_lock(m_LogLock);
72                 RotateLogFile();
73                 OpenLogFile();
74         }
75
76         /* create the primary JSON-RPC listener */
77         AddListener(GetBindPort());
78
79         m_Timer = make_shared<Timer>();
80         m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
81         m_Timer->SetInterval(5);
82         m_Timer->Start();
83         m_Timer->Reschedule(0);
84
85         OnMasterChanged(true);
86 }
87
88 ApiListener::Ptr ApiListener::GetInstance(void)
89 {
90         BOOST_FOREACH(const ApiListener::Ptr& listener, DynamicType::GetObjects<ApiListener>())
91                 return listener;
92
93         return ApiListener::Ptr();
94 }
95
96 shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
97 {
98         return m_SSLContext;
99 }
100
101 Endpoint::Ptr ApiListener::GetMaster(void) const
102 {
103         Zone::Ptr zone = Zone::GetLocalZone();
104         std::vector<String> names;
105
106         BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
107                 if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
108                         names.push_back(endpoint->GetName());
109
110         std::sort(names.begin(), names.end());
111
112         return Endpoint::GetByName(*names.begin());
113 }
114
115 bool ApiListener::IsMaster(void) const
116 {
117         return GetMaster()->GetName() == GetIdentity();
118 }
119
120 /**
121  * Creates a new JSON-RPC listener on the specified port.
122  *
123  * @param service The port to listen on.
124  */
125 void ApiListener::AddListener(const String& service)
126 {
127         ObjectLock olock(this);
128
129         shared_ptr<SSL_CTX> sslContext = m_SSLContext;
130
131         if (!sslContext)
132                 BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddListener()"));
133
134         std::ostringstream s;
135         s << "Adding new listener: port " << service;
136         Log(LogInformation, "remote", s.str());
137
138         TcpSocket::Ptr server = make_shared<TcpSocket>();
139         server->Bind(service, AF_INET6);
140
141         boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
142         thread.detach();
143
144         m_Servers.insert(server);
145 }
146
147 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
148 {
149         Utility::SetThreadName("API Listener");
150
151         server->Listen();
152
153         for (;;) {
154                 Socket::Ptr client = server->Accept();
155
156                 Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
157         }
158 }
159
160 /**
161  * Creates a new JSON-RPC client and connects to the specified host and port.
162  *
163  * @param node The remote host.
164  * @param service The remote port.
165  */
166 void ApiListener::AddConnection(const String& node, const String& service)
167 {
168         {
169                 ObjectLock olock(this);
170
171                 shared_ptr<SSL_CTX> sslContext = m_SSLContext;
172
173                 if (!sslContext)
174                         BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
175         }
176
177         TcpSocket::Ptr client = make_shared<TcpSocket>();
178
179         try {
180                 client->Connect(node, service);
181                 Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleClient));
182         } catch (const std::exception& ex) {
183                 std::ostringstream info, debug;
184                 info << "Cannot connect to host '" << node << "' on port '" << service << "'";
185                 debug << info.str() << std::endl << DiagnosticInformation(ex);
186                 Log(LogCritical, "remote", info.str());
187                 Log(LogDebug, "remote", debug.str());
188         }
189 }
190
191 /**
192  * Processes a new client connection.
193  *
194  * @param client The new client.
195  */
196 void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole role)
197 {
198         CONTEXT("Handling new API client connection");
199
200         TlsStream::Ptr tlsStream;
201
202         {
203                 ObjectLock olock(this);
204                 tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
205         }
206
207         tlsStream->Handshake();
208
209         shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
210         String identity = GetCertificateCN(cert);
211
212         Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
213
214         Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
215
216         bool need_sync = false;
217
218         if (endpoint)
219                 need_sync = !endpoint->IsConnected();
220
221         ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
222         aclient->Start();
223
224         if (endpoint) {
225                 if (need_sync) {
226                         {
227                                 ObjectLock olock(endpoint);
228
229                                 endpoint->SetSyncing(true);
230                         }
231
232                         ReplayLog(aclient);
233                 }
234
235                 SendConfigUpdate(aclient);
236
237                 endpoint->AddClient(aclient);
238         } else
239                 AddAnonymousClient(aclient);
240 }
241
242 void ApiListener::ApiTimerHandler(void)
243 {
244         double now = Utility::GetTime();
245
246         std::vector<int> files;
247         Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
248         std::sort(files.begin(), files.end());
249
250         BOOST_FOREACH(int ts, files) {
251                 bool need = false;
252
253                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
254                         if (endpoint->GetName() == GetIdentity())
255                                 continue;
256
257                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
258                                 continue;
259
260                         if (ts > endpoint->GetLocalLogPosition()) {
261                                 need = true;
262                                 break;
263                         }
264                 }
265
266                 if (!need) {
267                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
268                         Log(LogNotice, "remote", "Removing old log file: " + path);
269                         (void)unlink(path.CStr());
270                 }
271         }
272
273         if (IsMaster()) {
274                 Zone::Ptr my_zone = Zone::GetLocalZone();
275
276                 BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
277                         /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
278                         if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
279                                 continue;
280
281                         bool connected = false;
282
283                         BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
284                                 if (endpoint->IsConnected()) {
285                                         connected = true;
286                                         break;
287                                 }
288                         }
289
290                         /* don't connect to an endpoint if we already have a connection to the zone */
291                         if (connected)
292                                 continue;
293
294                         BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
295                                 /* don't connect to ourselves */
296                                 if (endpoint->GetName() == GetIdentity())
297                                         continue;
298
299                                 /* don't try to connect to endpoints which don't have a host and port */
300                                 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
301                                         continue;
302
303                                 AddConnection(endpoint->GetHost(), endpoint->GetPort());
304                         }
305                 }
306         }
307
308         BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
309                 if (!endpoint->IsConnected())
310                         continue;
311
312                 double ts = endpoint->GetRemoteLogPosition();
313
314                 if (ts == 0)
315                         continue;
316
317                 Dictionary::Ptr lparams = make_shared<Dictionary>();
318                 lparams->Set("log_position", ts);
319
320                 Dictionary::Ptr lmessage = make_shared<Dictionary>();
321                 lmessage->Set("jsonrpc", "2.0");
322                 lmessage->Set("method", "log::SetLogPosition");
323                 lmessage->Set("params", lparams);
324
325                 BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
326                         client->SendMessage(lmessage);
327
328                 Log(LogNotice, "remote", "Setting log position for identity '" + endpoint->GetName() + "': " +
329                         Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts));
330         }
331
332         Log(LogNotice, "remote", "Current zone master: " + GetMaster()->GetName());
333
334         std::vector<String> names;
335         BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>())
336                 if (endpoint->IsConnected())
337                         names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
338
339         Log(LogNotice, "remote", "Connected endpoints: " + Utility::NaturalJoin(names));
340 }
341
342 void ApiListener::RelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
343 {
344         m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log));
345 }
346
347 void ApiListener::PersistMessage(const Dictionary::Ptr& message)
348 {
349         double ts = message->Get("ts");
350
351         ASSERT(ts != 0);
352
353         Dictionary::Ptr pmessage = make_shared<Dictionary>();
354         pmessage->Set("timestamp", ts);
355
356         pmessage->Set("message", JsonSerialize(message));
357
358         boost::mutex::scoped_lock lock(m_LogLock);
359         if (m_LogFile) {
360                 NetString::WriteStringToStream(m_LogFile, JsonSerialize(pmessage));
361                 m_LogMessageCount++;
362                 SetLogMessageTimestamp(ts);
363
364                 if (m_LogMessageCount > 50000) {
365                         CloseLogFile();
366                         RotateLogFile();
367                         OpenLogFile();
368                 }
369         }
370 }
371
372 void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
373 {
374         double ts = Utility::GetTime();
375         message->Set("ts", ts);
376
377         Log(LogNotice, "remote", "Relaying '" + message->Get("method") + "' message");
378
379         if (log)
380                 m_LogQueue.Enqueue(boost::bind(&ApiListener::PersistMessage, this, message));
381
382         if (origin.FromZone)
383                 message->Set("originZone", origin.FromZone->GetName());
384
385         bool is_master = IsMaster();
386         Endpoint::Ptr master = GetMaster();
387         Zone::Ptr my_zone = Zone::GetLocalZone();
388
389         std::vector<Endpoint::Ptr> skippedEndpoints;
390         std::set<Zone::Ptr> finishedZones;
391
392         BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
393                 /* don't relay messages to ourselves or disconnected endpoints */
394                 if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
395                         continue;
396
397                 Zone::Ptr target_zone = endpoint->GetZone();
398
399                 /* don't relay the message to the zone through more than one endpoint */
400                 if (finishedZones.find(target_zone) != finishedZones.end()) {
401                         skippedEndpoints.push_back(endpoint);
402                         continue;
403                 }
404
405                 /* don't relay messages back to the endpoint which we got the message from */
406                 if (origin.FromClient && endpoint == origin.FromClient->GetEndpoint()) {
407                         skippedEndpoints.push_back(endpoint);
408                         continue;
409                 }
410
411                 /* don't relay messages back to the zone which we got the message from */
412                 if (origin.FromZone && target_zone == origin.FromZone) {
413                         skippedEndpoints.push_back(endpoint);
414                         continue;
415                 }
416
417                 /* only relay message to the master if we're not currently the master */
418                 if (!is_master && master != endpoint) {
419                         skippedEndpoints.push_back(endpoint);
420                         continue;
421                 }
422
423                 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
424                 if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
425                     secobj->GetZone() != target_zone->GetName()) {
426                         skippedEndpoints.push_back(endpoint);
427                         continue;
428                 }
429
430                 /* only relay messages to zones which have access to the object */
431                 if (!target_zone->CanAccessObject(secobj))
432                         continue;
433
434                 finishedZones.insert(target_zone);
435
436                 {
437                         ObjectLock olock(endpoint);
438
439                         if (!endpoint->GetSyncing()) {
440                                 Log(LogNotice, "remote", "Sending message to '" + endpoint->GetName() + "'");
441
442                                 BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
443                                         client->SendMessage(message);
444                         }
445                 }
446         }
447
448         BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
449                 endpoint->SetLocalLogPosition(ts);
450 }
451
452 String ApiListener::GetApiDir(void)
453 {
454         return Application::GetLocalStateDir() + "/lib/icinga2/api/";
455 }
456
457 /* must hold m_LogLock */
458 void ApiListener::OpenLogFile(void)
459 {
460         String path = GetApiDir() + "log/current";
461
462         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
463
464         if (!fp->good()) {
465                 Log(LogWarning, "cluster", "Could not open spool file: " + path);
466                 return;
467         }
468
469         m_LogFile = make_shared<StdioStream>(fp, true);
470         m_LogMessageCount = 0;
471         SetLogMessageTimestamp(Utility::GetTime());
472 }
473
474 /* must hold m_LogLock */
475 void ApiListener::CloseLogFile(void)
476 {
477         if (!m_LogFile)
478                 return;
479
480         m_LogFile->Close();
481         m_LogFile.reset();
482 }
483
484 /* must hold m_LogLock */
485 void ApiListener::RotateLogFile(void)
486 {
487         double ts = GetLogMessageTimestamp();
488
489         if (ts == 0)
490                 ts = Utility::GetTime();
491
492         String oldpath = GetApiDir() + "log/current";
493         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
494         (void) rename(oldpath.CStr(), newpath.CStr());
495 }
496
497 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
498 {
499         String name = Utility::BaseName(file);
500
501         int ts;
502
503         try {
504                 ts = Convert::ToLong(name);
505         }
506         catch (const std::exception&) {
507                 return;
508         }
509
510         files.push_back(ts);
511 }
512
513 void ApiListener::ReplayLog(const ApiClient::Ptr& client)
514 {
515         Endpoint::Ptr endpoint = client->GetEndpoint();
516
517         CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
518
519         int count = -1;
520         double peer_ts = endpoint->GetLocalLogPosition();
521         bool last_sync = false;
522
523         for (;;) {
524                 boost::mutex::scoped_lock lock(m_LogLock);
525
526                 CloseLogFile();
527                 RotateLogFile();
528
529                 if (count == -1 || count > 50000) {
530                         OpenLogFile();
531                         lock.unlock();
532                 } else {
533                         last_sync = true;
534                 }
535
536                 count = 0;
537
538                 std::vector<int> files;
539                 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
540                 std::sort(files.begin(), files.end());
541
542                 BOOST_FOREACH(int ts, files) {
543                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
544
545                         if (ts < peer_ts)
546                                 continue;
547
548                         Log(LogNotice, "remote", "Replaying log: " + path);
549
550                         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
551                         StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
552
553                         String message;
554                         while (true) {
555                                 Dictionary::Ptr pmessage;
556
557                                 try {
558                                         if (!NetString::ReadStringFromStream(logStream, &message))
559                                                 break;
560
561                                         pmessage = JsonDeserialize(message);
562                                 } catch (const std::exception&) {
563                                         Log(LogWarning, "remote", "Unexpected end-of-file for cluster log: " + path);
564
565                                         /* Log files may be incomplete or corrupted. This is perfectly OK. */
566                                         break;
567                                 }
568
569                                 if (pmessage->Get("timestamp") <= peer_ts)
570                                         continue;
571
572                                 NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
573                                 count++;
574
575                                 peer_ts = pmessage->Get("timestamp");
576                         }
577
578                         logStream->Close();
579                 }
580
581                 Log(LogNotice, "remote", "Replayed " + Convert::ToString(count) + " messages.");
582
583                 if (last_sync) {
584                         {
585                                 ObjectLock olock2(endpoint);
586                                 endpoint->SetSyncing(false);
587                         }
588
589                         OpenLogFile();
590
591                         break;
592                 }
593         }
594 }
595
596 Value ApiListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
597 {
598         Dictionary::Ptr nodes = make_shared<Dictionary>();
599         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
600
601         ApiListener::Ptr listener = ApiListener::GetInstance();
602
603         if (!listener)
604                 return 0;
605
606         stats = listener->GetStatus();
607
608         BOOST_FOREACH(Dictionary::Pair const& kv, stats.second)
609                 perfdata->Set("api_" + kv.first, kv.second);
610
611         status->Set("api", stats.first);
612
613         return 0;
614 }
615
616 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
617 {
618         Dictionary::Ptr status = make_shared<Dictionary>();
619         Dictionary::Ptr perfdata = make_shared<Dictionary>();
620
621         /* cluster stats */
622         status->Set("identity", GetIdentity());
623
624         double count_endpoints = 0;
625         Array::Ptr not_connected_endpoints = make_shared<Array>();
626         Array::Ptr connected_endpoints = make_shared<Array>();
627
628         BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
629                 if (endpoint->GetName() == GetIdentity())
630                         continue;
631
632                 count_endpoints++;
633
634                 if (!endpoint->IsConnected())
635                         not_connected_endpoints->Add(endpoint->GetName());
636                 else
637                         connected_endpoints->Add(endpoint->GetName());
638         }
639
640         status->Set("num_endpoints", count_endpoints);
641         status->Set("num_conn_endpoints", connected_endpoints->GetLength());
642         status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength());
643         status->Set("conn_endpoints", connected_endpoints);
644         status->Set("not_conn_endpoints", not_connected_endpoints);
645
646         perfdata->Set("num_endpoints", count_endpoints);
647         perfdata->Set("num_conn_endpoints", Convert::ToDouble(connected_endpoints->GetLength()));
648         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength()));
649
650         return std::make_pair(status, perfdata);
651 }
652
653 void ApiListener::AddAnonymousClient(const ApiClient::Ptr& aclient)
654 {
655         ObjectLock olock(this);
656         m_AnonymousClients.insert(aclient);
657 }
658
659 void ApiListener::RemoveAnonymousClient(const ApiClient::Ptr& aclient)
660 {
661         ObjectLock olock(this);
662         m_AnonymousClients.erase(aclient);
663 }
664
665 std::set<ApiClient::Ptr> ApiListener::GetAnonymousClients(void) const
666 {
667         ObjectLock olock(this);
668         return m_AnonymousClients;
669 }