Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
- redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HGETALL icinga:subscription"));
+ std::map<String, String> subscriptions;
+ long long cursor = 0;
- if (!reply) {
- redisFree(m_Context);
- m_Context = NULL;
- return;
- }
+ do {
+ redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor));
- if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
- Log(LogInformation, "RedisWriter")
- << "HGETALL icinga:subscription: " << reply->str;
- }
+ if (!reply) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
- m_Subscriptions.clear();
+ if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str;
+ }
- //TODO
- VERIFY(reply->type == REDIS_REPLY_ARRAY);
- VERIFY(reply->elements % 2 == 0);
+ VERIFY(reply->type == REDIS_REPLY_ARRAY);
+ VERIFY(reply->elements % 2 == 0);
+
+ redisReply *cursorReply = reply->element[0];
+ cursor = cursorReply->integer;
+
+ redisReply *keysReply = reply->element[1];
+
+ for (size_t i = 0; i < keysReply->elements; i++) {
+ redisReply *keyReply = keysReply->element[i];
+ VERIFY(keyReply->type == REDIS_REPLY_STRING);
+
+ redisReply *vreply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "GET %s", keyReply->str));
+
+ if (!vreply) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
+
+ if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "GET " << keyReply->str << ": " << vreply->str;
+ }
- for (size_t i = 0; i < reply->elements; i += 2) {
- redisReply *keyReply = reply->element[i];
- VERIFY(keyReply->type == REDIS_REPLY_STRING);
+ subscriptions[keyReply->str] = vreply->str;
- redisReply *valueReply = reply->element[i + 1];
- VERIFY(valueReply->type == REDIS_REPLY_STRING);
+ freeReplyObject(vreply);
+ }
+
+ freeReplyObject(reply);
+ } while (cursor != 0);
+
+ m_Subscriptions.clear();
+
+ for (const std::pair<String, String>& kv : subscriptions) {
+ const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */
+ const String& value = kv.second;
try {
- Dictionary::Ptr subscriptionInfo = JsonDecode(valueReply->str);
+ Dictionary::Ptr subscriptionInfo = JsonDecode(value);
Log(LogInformation, "RedisWriter")
- << "Subscriber Info - Key: " << keyReply->str << " Value: " << Value(subscriptionInfo);
+ << "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo);
RedisSubscriptionInfo rsi;
if (types)
rsi.EventTypes = types->ToSet<String>();
- m_Subscriptions[keyReply->str] = rsi;
+ m_Subscriptions[key] = rsi;
} catch (const std::exception& ex) {
Log(LogWarning, "RedisWriter")
- << "Invalid Redis subscriber info for subscriber '" << keyReply->str << "': " << DiagnosticInformation(ex);
+ << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
continue;
}
//TODO
}
-
- freeReplyObject(reply);
}
void RedisWriter::HandleEvents(void)