]> granicus.if.org Git - icinga2/commitdiff
Implement support for expiring subscriptions
authorGunnar Beutner <gunnar.beutner@icinga.com>
Mon, 13 Mar 2017 09:37:51 +0000 (10:37 +0100)
committerMichael Friedrich <michael.friedrich@icinga.com>
Tue, 14 Mar 2017 14:19:02 +0000 (15:19 +0100)
refs #4991

lib/redis/rediswriter.cpp

index f43a8c07c84dfaf29998b5849390920cb9b8a296..a04b53cb326abb28419d754c36b876bbc3aa8477 100644 (file)
@@ -123,37 +123,67 @@ void RedisWriter::UpdateSubscriptions(void)
 
        Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
 
-       redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HGETALL icinga:subscription"));
+       std::map<String, String> subscriptions;
+       long long cursor = 0;
 
-       if (!reply) {
-               redisFree(m_Context);
-               m_Context = NULL;
-               return;
-       }
+       do {
+               redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor));
 
-       if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
-               Log(LogInformation, "RedisWriter")
-                   << "HGETALL icinga:subscription: " << reply->str;
-       }
+               if (!reply) {
+                       redisFree(m_Context);
+                       m_Context = NULL;
+                       return;
+               }
 
-       m_Subscriptions.clear();
+               if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+                       Log(LogInformation, "RedisWriter")
+                           << "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str;
+               }
 
-       //TODO
-       VERIFY(reply->type == REDIS_REPLY_ARRAY);
-       VERIFY(reply->elements % 2 == 0);
+               VERIFY(reply->type == REDIS_REPLY_ARRAY);
+               VERIFY(reply->elements % 2 == 0);
+
+               redisReply *cursorReply = reply->element[0];
+               cursor = cursorReply->integer;
+
+               redisReply *keysReply = reply->element[1];
+
+               for (size_t i = 0; i < keysReply->elements; i++) {
+                       redisReply *keyReply = keysReply->element[i];
+                       VERIFY(keyReply->type == REDIS_REPLY_STRING);
+
+                       redisReply *vreply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "GET %s", keyReply->str));
+
+                       if (!vreply) {
+                               redisFree(m_Context);
+                               m_Context = NULL;
+                               return;
+                       }
+
+                       if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) {
+                               Log(LogInformation, "RedisWriter")
+                                   << "GET " << keyReply->str << ": " << vreply->str;
+                       }
 
-       for (size_t i = 0; i < reply->elements; i += 2) {
-               redisReply *keyReply = reply->element[i];
-               VERIFY(keyReply->type == REDIS_REPLY_STRING);
+                       subscriptions[keyReply->str] = vreply->str;
 
-               redisReply *valueReply = reply->element[i + 1];
-               VERIFY(valueReply->type == REDIS_REPLY_STRING);
+                       freeReplyObject(vreply);
+               }
+
+               freeReplyObject(reply);
+       } while (cursor != 0);
+
+       m_Subscriptions.clear();
+
+       for (const std::pair<String, String>& kv : subscriptions) {
+               const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */
+               const String& value = kv.second;
 
                try {
-                       Dictionary::Ptr subscriptionInfo = JsonDecode(valueReply->str);
+                       Dictionary::Ptr subscriptionInfo = JsonDecode(value);
 
                        Log(LogInformation, "RedisWriter")
-                           << "Subscriber Info - Key: " << keyReply->str << " Value: " << Value(subscriptionInfo);
+                           << "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo);
 
                        RedisSubscriptionInfo rsi;
 
@@ -162,17 +192,15 @@ void RedisWriter::UpdateSubscriptions(void)
                        if (types)
                                rsi.EventTypes = types->ToSet<String>();
 
-                       m_Subscriptions[keyReply->str] = rsi;
+                       m_Subscriptions[key] = rsi;
                } catch (const std::exception& ex) {
                        Log(LogWarning, "RedisWriter")
-                           << "Invalid Redis subscriber info for subscriber '" << keyReply->str << "': " << DiagnosticInformation(ex);
+                           << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
 
                        continue;
                }
                //TODO
        }
-
-       freeReplyObject(reply);
 }
 
 void RedisWriter::HandleEvents(void)