{
ObjectImpl<RedisWriter>::Start(runtimeCreated);
- boost::thread thread(boost::bind(&RedisWriter::HandleEvents, this));
+ boost::thread thread(boost::bind(&RedisWriter::ConnectionThreadProc, this));
thread.detach();
+}
+void RedisWriter::ConnectionThreadProc(void)
+{
String path = GetPath();
String host = GetHost();
+ Log(LogInformation, "RedisWriter", "Trying to connecto redis server");
+
if (path.IsEmpty())
m_Context = redisConnect(host.CStr(), GetPort());
else
m_Context = redisConnectUnix(path.CStr());
- String password = GetPassword();
+ if (!m_Context || m_Context->err) {
+ if (!m_Context) {
+ Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
+ } else {
+ Log(LogWarning, "RedisWriter", "Connection error: ")
+ << m_Context->errstr;
+ }
+ }
+
+ for (;;) {
+ String password = GetPassword();
+
+ if (!password.IsEmpty()) {
+ redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "AUTH %s", password.CStr()));
+
+ if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "AUTH: " << reply->str;
+ }
+
+ freeReplyObject(reply);
+ }
+
+ HandleEvents();
+
+ for (;;) {
+ Log(LogInformation, "RedisWriter", "Trying to reconnect to redis server");
+
+ if (redisReconnect(m_Context) == REDIS_OK) {
+ Log(LogInformation, "RedisWriter", "Connection to redis server was reestablished");
+ break;
+ }
- void *reply = redisCommand(m_Context, "AUTH %s", password.CStr());
- freeReplyObject(reply);
+ Log(LogInformation, "RedisWriter", "Unable to reconnect to redis server: Waiting for next attempt");
+
+ Utility::Sleep(15);
+ }
+ }
}
void RedisWriter::HandleEvents(void)
String body = JsonEncode(result);
- //TODO: Reconnect handling
- try {
- void *reply = redisCommand(m_Context, "LPUSH icinga:events %s", body.CStr());
+ redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:events %s", body.CStr()));
+
+ if (!reply)
+ break;
+
+ if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "LPUSH icinga:events: " << reply->str;
+ }
+
+ if (reply->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply);
- } catch (const std::exception&) {
- queue->RemoveClient(this);
- EventQueue::UnregisterIfUnused(queueName, queue);
- throw;
+ break;
}
+
+ freeReplyObject(reply);
}
+
+ queue->RemoveClient(this);
+ EventQueue::UnregisterIfUnused(queueName, queue);
}