]> granicus.if.org Git - icinga2/blob - lib/redis/rediswriter.cpp
7cacd04ffb957dd4e91569f9052682adfc451c7f
[icinga2] / lib / redis / rediswriter.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "redis/rediswriter.hpp"
21 #include "redis/rediswriter.tcpp"
22 #include "remote/eventqueue.hpp"
23 #include "base/json.hpp"
24
25 using namespace icinga;
26
27 REGISTER_TYPE(RedisWriter);
28
29 RedisWriter::RedisWriter(void)
30     : m_Context(NULL)
31 { }
32
33 /**
34  * Starts the component.
35  */
36 void RedisWriter::Start(bool runtimeCreated)
37 {
38         ObjectImpl<RedisWriter>::Start(runtimeCreated);
39
40         Log(LogInformation, "RedisWriter")
41             << "'" << GetName() << "' started.";
42
43         m_ReconnectTimer = new Timer();
44         m_ReconnectTimer->SetInterval(15);
45         m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::ReconnectTimerHandler, this));
46         m_ReconnectTimer->Start();
47         m_ReconnectTimer->Reschedule(0);
48
49         m_SubscriptionTimer = new Timer();
50         m_SubscriptionTimer->SetInterval(15);
51         m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::UpdateSubscriptionsTimerHandler, this));
52         m_SubscriptionTimer->Start();
53
54         boost::thread thread(boost::bind(&RedisWriter::HandleEvents, this));
55         thread.detach();
56 }
57
58 void RedisWriter::ReconnectTimerHandler(void)
59 {
60         m_WorkQueue.Enqueue(boost::bind(&RedisWriter::TryToReconnect, this));
61 }
62
63 void RedisWriter::TryToReconnect(void)
64 {
65         if (m_Context)
66                 return;
67
68         String path = GetPath();
69         String host = GetHost();
70
71         Log(LogInformation, "RedisWriter", "Trying to connect to redis server");
72
73         if (path.IsEmpty())
74                 m_Context = redisConnect(host.CStr(), GetPort());
75         else
76                 m_Context = redisConnectUnix(path.CStr());
77
78         if (!m_Context || m_Context->err) {
79                 if (!m_Context) {
80                         Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
81                 } else {
82                         Log(LogWarning, "RedisWriter", "Connection error: ")
83                             << m_Context->errstr;
84                 }
85
86                 if (m_Context) {
87                         redisFree(m_Context);
88                         m_Context = NULL;
89                 }
90
91                 return;
92         }
93
94         String password = GetPassword();
95
96         if (!password.IsEmpty()) {
97                 redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "AUTH %s", password.CStr()));
98
99                 if (!reply) {
100                         redisFree(m_Context);
101                         m_Context = NULL;
102                         return;
103                 }
104
105                 if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
106                         Log(LogInformation, "RedisWriter")
107                             << "AUTH: " << reply->str;
108                 }
109
110                 freeReplyObject(reply);
111         }
112 }
113
114 void RedisWriter::UpdateSubscriptionsTimerHandler(void)
115 {
116         m_WorkQueue.Enqueue(boost::bind(&RedisWriter::UpdateSubscriptions, this));
117 }
118
119 void RedisWriter::UpdateSubscriptions(void)
120 {
121         if (!m_Context)
122                 return;
123
124         Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
125
126         std::map<String, String> subscriptions;
127         long long cursor = 0;
128
129         do {
130                 redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor));
131
132                 if (!reply) {
133                         redisFree(m_Context);
134                         m_Context = NULL;
135                         return;
136                 }
137
138                 if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
139                         Log(LogInformation, "RedisWriter")
140                             << "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str;
141                 }
142
143                 VERIFY(reply->type == REDIS_REPLY_ARRAY);
144                 VERIFY(reply->elements % 2 == 0);
145
146                 redisReply *cursorReply = reply->element[0];
147                 cursor = Convert::ToLong(cursorReply->str);
148
149                 redisReply *keysReply = reply->element[1];
150
151                 for (size_t i = 0; i < keysReply->elements; i++) {
152                         redisReply *keyReply = keysReply->element[i];
153                         VERIFY(keyReply->type == REDIS_REPLY_STRING);
154
155                         redisReply *vreply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "GET %s", keyReply->str));
156
157                         if (!vreply) {
158                                 freeReplyObject(reply);
159                                 redisFree(m_Context);
160                                 m_Context = NULL;
161                                 return;
162                         }
163
164                         if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) {
165                                 Log(LogInformation, "RedisWriter")
166                                     << "GET " << keyReply->str << ": " << vreply->str;
167                         }
168
169                         subscriptions[keyReply->str] = vreply->str;
170
171                         freeReplyObject(vreply);
172                 }
173
174                 freeReplyObject(reply);
175         } while (cursor != 0);
176
177         m_Subscriptions.clear();
178
179         for (const std::pair<String, String>& kv : subscriptions) {
180                 const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */
181                 const String& value = kv.second;
182
183                 try {
184                         Dictionary::Ptr subscriptionInfo = JsonDecode(value);
185
186                         Log(LogInformation, "RedisWriter")
187                             << "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo);
188
189                         RedisSubscriptionInfo rsi;
190
191                         Array::Ptr types = subscriptionInfo->Get("types");
192
193                         if (types)
194                                 rsi.EventTypes = types->ToSet<String>();
195
196                         m_Subscriptions[key] = rsi;
197                 } catch (const std::exception& ex) {
198                         Log(LogWarning, "RedisWriter")
199                             << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
200
201                         continue;
202                 }
203                 //TODO
204         }
205
206         Log(LogInformation, "RedisWriter")
207             << "Current Redis event subscriptions: " << m_Subscriptions.size();
208 }
209
210 void RedisWriter::HandleEvents(void)
211 {
212         String queueName = Utility::NewUniqueID();
213         EventQueue::Ptr queue = new EventQueue(queueName);
214         EventQueue::Register(queueName, queue);
215
216         std::set<String> types;
217         types.insert("CheckResult");
218         types.insert("StateChange");
219         types.insert("Notification");
220         types.insert("AcknowledgementSet");
221         types.insert("AcknowledgementCleared");
222         types.insert("CommentAdded");
223         types.insert("CommentRemoved");
224         types.insert("DowntimeAdded");
225         types.insert("DowntimeRemoved");
226         types.insert("DowntimeStarted");
227         types.insert("DowntimeTriggered");
228
229         queue->SetTypes(types);
230
231         queue->AddClient(this);
232
233         for (;;) {
234                 Dictionary::Ptr event = queue->WaitForEvent(this);
235
236                 if (!event)
237                         continue;
238
239                 m_WorkQueue.Enqueue(boost::bind(&RedisWriter::HandleEvent, this, event));
240         }
241
242         queue->RemoveClient(this);
243         EventQueue::UnregisterIfUnused(queueName, queue);
244 }
245
246 void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
247 {
248         if (!m_Context)
249                 return;
250
251         String type = event->Get("type");
252         bool atLeastOneSubscriber = false;
253
254         for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
255                 const auto& rsi = kv.second;
256
257                 if (rsi.EventTypes.find(type) == rsi.EventTypes.end())
258                         continue;
259
260                 atLeastOneSubscriber = true;
261         }
262
263         if (!atLeastOneSubscriber)
264                 return;
265
266         Log(LogInformation, "RedisWriter")
267             << "Pushing event to Redis: '" << Value(event) << "'.";
268
269         redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "INCR icinga:event.idx"));
270
271         if (!reply1) {
272                 redisFree(m_Context);
273                 m_Context = NULL;
274                 return;
275         }
276
277         Log(LogInformation, "RedisWriter")
278             << "Called INCR in HandleEvent";
279
280         if (reply1->type == REDIS_REPLY_STATUS || reply1->type == REDIS_REPLY_ERROR) {
281                 Log(LogInformation, "RedisWriter")
282                     << "INCR icinga:event.idx: " << reply1->str;
283         }
284
285         if (reply1->type == REDIS_REPLY_ERROR) {
286                 freeReplyObject(reply1);
287                 return;
288         }
289
290         //TODO
291         VERIFY(reply1->type == REDIS_REPLY_INTEGER);
292
293         long long index = reply1->integer;
294
295         freeReplyObject(reply1);
296
297         String body = JsonEncode(event);
298
299         //TODO: Verify that %lld is supported
300         redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SET icinga:event.%d %s", (int)index, body.CStr()));
301
302         if (!reply2) {
303                 redisFree(m_Context);
304                 m_Context = NULL;
305                 return;
306         }
307
308         if (reply2->type == REDIS_REPLY_STATUS || reply2->type == REDIS_REPLY_ERROR) {
309                 Log(LogInformation, "RedisWriter")
310                     << "SET icinga:event." << index << ": " << reply2->str;
311         }
312
313         if (reply2->type == REDIS_REPLY_ERROR) {
314                 freeReplyObject(reply2);
315                 return;
316         }
317
318         freeReplyObject(reply2);
319
320         redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "EXPIRE icinga:event.%d 3600", (int)index, body.CStr()));
321
322         if (!reply3) {
323                 redisFree(m_Context);
324                 m_Context = NULL;
325                 return;
326         }
327
328         if (reply3->type == REDIS_REPLY_STATUS || reply3->type == REDIS_REPLY_ERROR) {
329                 Log(LogInformation, "RedisWriter")
330                     << "EXPIRE icinga:event." << index << ": " << reply3->str;
331         }
332
333         if (reply3->type == REDIS_REPLY_ERROR) {
334                 freeReplyObject(reply3);
335                 return;
336         }
337
338         freeReplyObject(reply3);
339
340         for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
341                 const auto& name = kv.first;
342                 const auto& rsi = kv.second;
343
344                 if (rsi.EventTypes.find(type) == rsi.EventTypes.end())
345                         continue;
346
347                 redisReply *reply4 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:event:%s %d", name.CStr(), (int)index));
348
349                 if (!reply4) {
350                         redisFree(m_Context);
351                         m_Context = NULL;
352                         return;
353                 }
354
355                 if (reply4->type == REDIS_REPLY_STATUS || reply4->type == REDIS_REPLY_ERROR) {
356                         Log(LogInformation, "RedisWriter")
357                             << "LPUSH icinga:event:" << kv.first << " " << index << ": " << reply4->str;
358                 }
359
360                 if (reply4->type == REDIS_REPLY_ERROR) {
361                         freeReplyObject(reply4);
362                         return;
363                 }
364
365                 freeReplyObject(reply4);
366         }
367 }
368
369 void RedisWriter::Stop(bool runtimeRemoved)
370 {
371         Log(LogInformation, "RedisWriter")
372             << "'" << GetName() << "' stopped.";
373
374         ObjectImpl<RedisWriter>::Stop(runtimeRemoved);
375 }