******************************************************************************/
#include "redis/rediswriter.hpp"
+#include "icinga/customvarobject.hpp"
+#include "icinga/host.hpp"
+#include "icinga/service.hpp"
#include "base/json.hpp"
#include "base/logger.hpp"
#include "base/serializer.hpp"
+#include "base/tlsutility.hpp"
+#include "base/initialize.hpp"
using namespace icinga;
value: JsonEncode(Serialize(object, FAState))
*/
-//TODO: OnActiveChanged handling.
+INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize);
+
+void RedisWriter::ConfigStaticInitialize(void)
+{
+ /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
+ ConfigObject::OnStateChanged.connect(boost::bind(&RedisWriter::StateChangedHandler, _1));
+ CustomVarObject::OnVarsChanged.connect(boost::bind(&RedisWriter::VarsChangedHandler, _1));
+
+ /* triggered on create, update and delete objects */
+ ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1));
+}
+
void RedisWriter::UpdateAllConfigObjects(void)
{
- //TODO: Just use config types
+ AssertOnWorkQueue();
+
+ redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "MULTI"));
+
+ if (!reply1) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
+
+ if (reply1->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "MULTI: " << reply1->str;
+ }
+
+ if (reply1->type == REDIS_REPLY_ERROR) {
+ freeReplyObject(reply1);
+ return;
+ }
+
+ freeReplyObject(reply1);
+
for (const Type::Ptr& type : Type::GetAllTypes()) {
if (!ConfigObject::TypeInstance->IsAssignableFrom(type))
continue;
String typeName = type->GetName();
/* replace into aka delete insert is faster than a full diff */
- Log(LogInformation, "RedisWriter")
- << "Flushing icinga:config:" << typeName << " before config dump.";
-
- redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s", typeName.CStr()));
+ redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s icinga:status:%s", typeName.CStr(), typeName.CStr()));
- if (!reply) {
+ if (!reply2) {
redisFree(m_Context);
m_Context = NULL;
return;
}
- if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+ if (reply2->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
- << "DEL icinga:config:" << typeName << ": " << reply->str;
+ << "DEL icinga:config:" << typeName << " icinga:status:" << typeName << ": " << reply2->str;
}
- if (reply->type == REDIS_REPLY_ERROR) {
- freeReplyObject(reply);
+ if (reply2->type == REDIS_REPLY_ERROR) {
+ freeReplyObject(reply2);
return;
}
- freeReplyObject(reply);
+ freeReplyObject(reply2);
/* fetch all objects and dump them */
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
+ VERIFY(ctype);
- if (ctype) {
- for (const ConfigObject::Ptr& object : ctype->GetObjects()) {
- DumpConfigObject(object, typeName);
- }
+ for (const ConfigObject::Ptr& object : ctype->GetObjects()) {
+ SendConfigUpdate(object, typeName);
+ SendStatusUpdate(object, typeName);
}
}
+
+ redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "EXEC"));
+
+ if (!reply3) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
+
+ if (reply3->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "EXEC: " << reply3->str;
+ }
+
+ if (reply3->type == REDIS_REPLY_ERROR) {
+ freeReplyObject(reply3);
+ return;
+ }
+
+ freeReplyObject(reply3);
}
-void RedisWriter::DumpConfigObject(const ConfigObject::Ptr& object, const String& typeName)
+void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName)
{
+ AssertOnWorkQueue();
+
/* Serialize config object attributes */
Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig);
//TODO: checksum
String objectName = object->GetName();
- redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:config:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
+ redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:config:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
+
+ if (!reply1) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
+
+ if (reply1->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "HSET icinga:config:" << typeName << " " << objectName << " " << jsonBody << ": " << reply1->str;
+ }
+
+ if (reply1->type == REDIS_REPLY_ERROR) {
+ freeReplyObject(reply1);
+ return;
+ }
+
+ freeReplyObject(reply1);
+
+
+ /* check sums */
+ /* hset icinga:config:Host:checksums localhost { "name_checksum": "...", "properties_checksum": "...", "groups_checksum": "...", "vars_checksum": null } */
+ Dictionary::Ptr checkSum = new Dictionary();
+
+ checkSum->Set("name_checksum", CalculateCheckSumString(object->GetName()));
+
+ // TODO: move this elsewhere
+ Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
+
+ if (checkable) {
+ Host::Ptr host;
+ Service::Ptr service;
+
+ tie(host, service) = GetHostService(checkable);
+
+ if (service)
+ checkSum->Set("groups_checksum", CalculateCheckSumGroups(service->GetGroups()));
+ else
+ checkSum->Set("groups_checksum", CalculateCheckSumGroups(host->GetGroups()));
+ }
+
+ String checkSumBody = JsonEncode(checkSum);
+
+ redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:config:%s:checksum %s %s", typeName.CStr(), objectName.CStr(), checkSumBody.CStr()));
+
+ if (!reply2) {
+ redisFree(m_Context);
+ m_Context = NULL;
+ return;
+ }
+
+ if (reply2->type == REDIS_REPLY_ERROR) {
+ Log(LogInformation, "RedisWriter")
+ << "HSET icinga:config:" << typeName << " " << objectName << " " << jsonBody << ": " << reply2->str;
+ }
+
+ if (reply2->type == REDIS_REPLY_ERROR) {
+ freeReplyObject(reply2);
+ return;
+ }
+
+ freeReplyObject(reply2);
+}
+
+void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName)
+{
+ AssertOnWorkQueue();
+
+ /* Serialize config object attributes */
+ Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAState);
+
+ String jsonBody = JsonEncode(objectAttrs);
+
+ //TODO: checksum
+ String objectName = object->GetName();
+
+ redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:status:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
if (!reply) {
redisFree(m_Context);
return;
}
- if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
+ if (reply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
- << "HSET icinga:config:" << typeName << " " << objectName << " " << jsonBody << ": " << reply->str;
+ << "HSET icinga:status:" << typeName << " " << objectName << " " << jsonBody << ": " << reply->str;
}
if (reply->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply);
}
-Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int fieldType)
+void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
- std::vector<int> fids;
-
- for (int fid = 0; fid < type->GetFieldCount(); fid++) {
- fids.push_back(fid);
+ for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
+ rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendStatusUpdate, rw, object, type->GetName()));
}
+}
- Dictionary::Ptr resultAttrs = new Dictionary();
-
- for (int& fid : fids)
- {
- Field field = type->GetFieldInfo(fid);
-
- Value val = object->GetField(fid);
+void RedisWriter::VarsChangedHandler(const ConfigObject::Ptr& object)
+{
+ Type::Ptr type = object->GetReflectionType();
- /* hide attributes which shouldn't be user-visible */
- if (field.Attributes & FANoUserView)
- continue;
+ for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
+ rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw, object, type->GetName()));
+ }
+}
- /* hide internal navigation fields */
- if (field.Attributes & FANavigation && !(field.Attributes & (FAConfig | FAState)))
- continue;
+void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
+{
+ Type::Ptr type = object->GetReflectionType();
- Value sval = Serialize(val, fieldType);
- resultAttrs->Set(field.Name, sval);
+ for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
+ rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, type->GetName()));
}
-
- return resultAttrs;
}
-