From: Gunnar Beutner Date: Mon, 13 Mar 2017 09:37:51 +0000 (+0100) Subject: Implement support for expiring subscriptions X-Git-Tag: v2.7.0~211 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=10ddcbe4d3413daa8319801896d94cf8f47d1025;p=icinga2 Implement support for expiring subscriptions refs #4991 --- diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index f43a8c07c..a04b53cb3 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -123,37 +123,67 @@ void RedisWriter::UpdateSubscriptions(void) Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); - redisReply *reply = reinterpret_cast(redisCommand(m_Context, "HGETALL icinga:subscription")); + std::map subscriptions; + long long cursor = 0; - if (!reply) { - redisFree(m_Context); - m_Context = NULL; - return; - } + do { + redisReply *reply = reinterpret_cast(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor)); - if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { - Log(LogInformation, "RedisWriter") - << "HGETALL icinga:subscription: " << reply->str; - } + if (!reply) { + redisFree(m_Context); + m_Context = NULL; + return; + } - m_Subscriptions.clear(); + if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str; + } - //TODO - VERIFY(reply->type == REDIS_REPLY_ARRAY); - VERIFY(reply->elements % 2 == 0); + VERIFY(reply->type == REDIS_REPLY_ARRAY); + VERIFY(reply->elements % 2 == 0); + + redisReply *cursorReply = reply->element[0]; + cursor = cursorReply->integer; + + redisReply *keysReply = reply->element[1]; + + for (size_t i = 0; i < keysReply->elements; i++) { + redisReply *keyReply = keysReply->element[i]; + VERIFY(keyReply->type == REDIS_REPLY_STRING); + + redisReply *vreply = reinterpret_cast(redisCommand(m_Context, "GET %s", keyReply->str)); + + if (!vreply) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "GET " << keyReply->str << ": " << vreply->str; + } - for (size_t i = 0; i < reply->elements; i += 2) { - redisReply *keyReply = reply->element[i]; - VERIFY(keyReply->type == REDIS_REPLY_STRING); + subscriptions[keyReply->str] = vreply->str; - redisReply *valueReply = reply->element[i + 1]; - VERIFY(valueReply->type == REDIS_REPLY_STRING); + freeReplyObject(vreply); + } + + freeReplyObject(reply); + } while (cursor != 0); + + m_Subscriptions.clear(); + + for (const std::pair& kv : subscriptions) { + const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */ + const String& value = kv.second; try { - Dictionary::Ptr subscriptionInfo = JsonDecode(valueReply->str); + Dictionary::Ptr subscriptionInfo = JsonDecode(value); Log(LogInformation, "RedisWriter") - << "Subscriber Info - Key: " << keyReply->str << " Value: " << Value(subscriptionInfo); + << "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo); RedisSubscriptionInfo rsi; @@ -162,17 +192,15 @@ void RedisWriter::UpdateSubscriptions(void) if (types) rsi.EventTypes = types->ToSet(); - m_Subscriptions[keyReply->str] = rsi; + m_Subscriptions[key] = rsi; } catch (const std::exception& ex) { Log(LogWarning, "RedisWriter") - << "Invalid Redis subscriber info for subscriber '" << keyReply->str << "': " << DiagnosticInformation(ex); + << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); continue; } //TODO } - - freeReplyObject(reply); } void RedisWriter::HandleEvents(void)