From 7e7a565770dc037aa698c3879f0834a7751a6b7d Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 4 Sep 2013 15:47:15 +0200 Subject: [PATCH] cluster: Send config updates. --- Makefile.am | 3 +- components/cluster/cluster-type.conf | 10 ++- components/cluster/clustercomponent.cpp | 88 +++++++++++++++++++++++-- components/cluster/clustercomponent.h | 2 + components/cluster/endpoint.cpp | 14 ++++ components/cluster/endpoint.h | 5 ++ lib/base/tlsutility.cpp | 31 +++++++++ lib/base/tlsutility.h | 2 + 8 files changed, 146 insertions(+), 9 deletions(-) diff --git a/Makefile.am b/Makefile.am index 183a70d6e..65ecfd7b9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -29,7 +29,8 @@ install-data-local: $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata - $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster + $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/config + $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/log $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE} $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE} diff --git a/components/cluster/cluster-type.conf b/components/cluster/cluster-type.conf index dfb9dd7bf..8e201e20c 100644 --- a/components/cluster/cluster-type.conf +++ b/components/cluster/cluster-type.conf @@ -34,5 +34,13 @@ type ClusterComponent { type Endpoint { %attribute string "host", - %attribute string "port" + %attribute string "port", + + %attribute array "config_files" { + %attribute string "*" + }, + + %attribute array "accept_config" { + %attribute string "*" + } } diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index b4f9ac166..07202c5f5 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -53,6 +53,11 @@ void ClusterComponent::Start(void) m_Identity = GetCertificateCN(cert); Log(LogInformation, "cluster", "My identity: " + m_Identity); + Endpoint::Ptr self = Endpoint::GetByName(GetIdentity()); + + if (!self) + BOOST_THROW_EXCEPTION(std::invalid_argument("No configuration available for the local endpoint.")); + m_SSLContext = MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile()); /* create the primary JSON-RPC listener */ @@ -256,7 +261,7 @@ void ClusterComponent::OpenLogFile(void) { ASSERT(OwnsLock()); - String path = GetClusterDir() + "current"; + String path = GetClusterDir() + "log/current"; std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); @@ -292,8 +297,8 @@ void ClusterComponent::RotateLogFile(void) if (ts == 0) ts = Utility::GetTime(); - String oldpath = GetClusterDir() + "current"; - String newpath = GetClusterDir() + Convert::ToString(static_cast(ts) + 1); + String oldpath = GetClusterDir() + "log/current"; + String newpath = GetClusterDir() + "log/" + Convert::ToString(static_cast(ts) + 1); (void) rename(oldpath.CStr(), newpath.CStr()); } @@ -322,11 +327,11 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt RotateLogFile(); std::vector files; - Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); + Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); std::sort(files.begin(), files.end()); BOOST_FOREACH(int ts, files) { - String path = GetClusterDir() + Convert::ToString(ts); + String path = GetClusterDir() + "log/" + Convert::ToString(ts); if (ts < endpoint->GetLocalLogPosition()) continue; @@ -368,6 +373,20 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt OpenLogFile(); } +void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file) +{ + Dictionary::Ptr elem = boost::make_shared(); + + std::ifstream fp(file.CStr()); + if (!fp) + return; + + String content((std::istreambuf_iterator(fp)), std::istreambuf_iterator()); + elem->Set("content", content); + + config->Set(file, elem); +} + /** * Processes a new client connection. * @@ -393,6 +412,26 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role) return; } + Dictionary::Ptr config = boost::make_shared(); + Array::Ptr configFiles = endpoint->GetConfigFiles(); + + if (configFiles) { + BOOST_FOREACH(const String& pattern, configFiles) { + Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1)); + } + } + + Dictionary::Ptr params = boost::make_shared(); + params->Set("config_files", config); + + Dictionary::Ptr message = boost::make_shared(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "cluster::Config"); + message->Set("params", params); + + String json = Value(message).Serialize(); + NetString::WriteStringToStream(tlsStream, json); + { ObjectLock olock(this); ReplayLog(endpoint, tlsStream); @@ -448,7 +487,7 @@ void ClusterComponent::ClusterTimerHandler(void) } std::vector files; - Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); + Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); std::sort(files.begin(), files.end()); BOOST_FOREACH(int ts, files) { @@ -464,7 +503,7 @@ void ClusterComponent::ClusterTimerHandler(void) } if (!need) { - String path = GetClusterDir() + Convert::ToString(ts); + String path = GetClusterDir() + "log/" + Convert::ToString(ts); Log(LogInformation, "cluster", "Removing old log file: " + path); (void) unlink(path.CStr()); } @@ -933,6 +972,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction service->ClearAcknowledgement(sender->GetName()); } else if (message->Get("method") == "cluster::SetLogPosition") { sender->SetLocalLogPosition(params->Get("log_position")); + } else if (message->Get("method") == "cluster::Config") { + Dictionary::Ptr files = params->Get("config_files"); + + if (!files) + return; + + Endpoint::Ptr self = Endpoint::GetByName(GetIdentity()); + + Array::Ptr acceptConfig = self->GetAcceptConfig(); + + bool accept = false; + + if (acceptConfig) { + BOOST_FOREACH(const String& pattern, acceptConfig) { + if (Utility::Match(pattern, sender->GetName())) { + accept = true; + break; + } + } + } + + if (!accept) { + Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "'."); + return; + } + + String dir = GetClusterDir() + "config/" + SHA256(sender->GetName()); + Log(LogInformation, "cluster", "Creating cluster config directory: " + dir); + if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("localtime") + << boost::errinfo_errno(errno)); + } + + /* TODO: update files, remove old files, figure out whether we need to restart */ } } diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index e72795461..d8289f28a 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -78,6 +78,8 @@ private: void AddListener(const String& service); void AddConnection(const String& node, const String& service); + static void ConfigGlobHandler(const Dictionary::Ptr& config, const String& file); + void NewClientHandler(const Socket::Ptr& client, TlsRole role); void ListenerThreadProc(const Socket::Ptr& server); diff --git a/components/cluster/endpoint.cpp b/components/cluster/endpoint.cpp index 0bd15df35..e9ab41e55 100644 --- a/components/cluster/endpoint.cpp +++ b/components/cluster/endpoint.cpp @@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const return m_Port; } +Array::Ptr Endpoint::GetConfigFiles(void) const +{ + return m_ConfigFiles; +} + +Array::Ptr Endpoint::GetAcceptConfig(void) const +{ + return m_AcceptConfig; +} + double Endpoint::GetSeen(void) const { return m_Seen; @@ -163,6 +173,8 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) if (attributeTypes & Attribute_Config) { bag->Set("host", m_Host); bag->Set("port", m_Port); + bag->Set("config_files", m_ConfigFiles); + bag->Set("accept_config", m_AcceptConfig); } if (attributeTypes & Attribute_State) { @@ -179,6 +191,8 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType if (attributeTypes & Attribute_Config) { m_Host = bag->Get("host"); m_Port = bag->Get("port"); + m_ConfigFiles = bag->Get("config_files"); + m_AcceptConfig = bag->Get("accept_config"); } if (attributeTypes & Attribute_State) { diff --git a/components/cluster/endpoint.h b/components/cluster/endpoint.h index 52e28a493..1830e1018 100644 --- a/components/cluster/endpoint.h +++ b/components/cluster/endpoint.h @@ -22,6 +22,7 @@ #include "base/dynamicobject.h" #include "base/stream.h" +#include "base/array.h" #include namespace icinga @@ -52,6 +53,8 @@ public: String GetHost(void) const; String GetPort(void) const; + Array::Ptr GetConfigFiles(void) const; + Array::Ptr GetAcceptConfig(void) const; double GetSeen(void) const; void SetSeen(double ts); @@ -70,6 +73,8 @@ private: Dictionary::Ptr m_Subscriptions; String m_Host; String m_Port; + Array::Ptr m_ConfigFiles; + Array::Ptr m_AcceptConfig; Stream::Ptr m_Client; double m_Seen; diff --git a/lib/base/tlsutility.cpp b/lib/base/tlsutility.cpp index 29aaae233..1c18a4694 100644 --- a/lib/base/tlsutility.cpp +++ b/lib/base/tlsutility.cpp @@ -148,4 +148,35 @@ shared_ptr GetX509Certificate(const String& pemfile) return shared_ptr(cert, X509_free); } +String SHA256(const String& s) +{ + SHA256_CTX context; + unsigned char digest[SHA256_DIGEST_LENGTH]; + + if (!SHA256_Init(&context)) { + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("SHA256_Init") + << errinfo_openssl_error(ERR_get_error())); + } + + if (!SHA256_Update(&context, (unsigned char*)s.CStr(), s.GetLength())) { + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("SHA256_Update") + << errinfo_openssl_error(ERR_get_error())); + } + + if (!SHA256_Final(digest, &context)) { + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("SHA256_Final") + << errinfo_openssl_error(ERR_get_error())); + } + + int i; + char output[SHA256_DIGEST_LENGTH*2+1]; + for (i = 0; i < 32; i++) + sprintf(output + 2 * i, "%02x", digest[i]); + + return output; +} + } diff --git a/lib/base/tlsutility.h b/lib/base/tlsutility.h index e2789bc11..e4962311d 100644 --- a/lib/base/tlsutility.h +++ b/lib/base/tlsutility.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace icinga { @@ -35,6 +36,7 @@ namespace icinga shared_ptr I2_BASE_API MakeSSLContext(const String& pubkey, const String& privkey, const String& cakey); String I2_BASE_API GetCertificateCN(const shared_ptr& certificate); shared_ptr I2_BASE_API GetX509Certificate(const String& pemfile); +String SHA256(const String& s); class I2_BASE_API openssl_error : virtual public std::exception, virtual public boost::exception { }; -- 2.40.0