]> granicus.if.org Git - icinga2/commitdiff
cluster: Send config updates.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 4 Sep 2013 13:47:15 +0000 (15:47 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 4 Sep 2013 13:47:15 +0000 (15:47 +0200)
Makefile.am
components/cluster/cluster-type.conf
components/cluster/clustercomponent.cpp
components/cluster/clustercomponent.h
components/cluster/endpoint.cpp
components/cluster/endpoint.h
lib/base/tlsutility.cpp
lib/base/tlsutility.h

index 183a70d6e507aa6118fdc876897e39524851c323..65ecfd7b9ff43d5e102437015abb24a43168c7c5 100644 (file)
@@ -29,7 +29,8 @@ install-data-local:
        $(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
        $(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
-       $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster
+       $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/config
+       $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/log
        $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
        $(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}
 
index dfb9dd7bfbff6f879406e9cff22716e0b8355ee3..8e201e20cf116e9582b594bc7ca5f996cc485986 100644 (file)
@@ -34,5 +34,13 @@ type ClusterComponent {
 
 type Endpoint {
        %attribute string "host",
-       %attribute string "port"
+       %attribute string "port",
+
+       %attribute array "config_files" {
+               %attribute string "*"
+       },
+
+       %attribute array "accept_config" {
+               %attribute string "*"
+       }
 }
index b4f9ac166e3bcb43e6c08726b9d6640708f3ec71..07202c5f5856d5e97089be67f395010da6bc8e0f 100644 (file)
@@ -53,6 +53,11 @@ void ClusterComponent::Start(void)
        m_Identity = GetCertificateCN(cert);
        Log(LogInformation, "cluster", "My identity: " + m_Identity);
 
+       Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
+
+       if (!self)
+               BOOST_THROW_EXCEPTION(std::invalid_argument("No configuration available for the local endpoint."));
+
        m_SSLContext = MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
 
        /* create the primary JSON-RPC listener */
@@ -256,7 +261,7 @@ void ClusterComponent::OpenLogFile(void)
 {
        ASSERT(OwnsLock());
 
-       String path = GetClusterDir() + "current";
+       String path = GetClusterDir() + "log/current";
 
        std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
 
@@ -292,8 +297,8 @@ void ClusterComponent::RotateLogFile(void)
        if (ts == 0)
                ts = Utility::GetTime();
 
-       String oldpath = GetClusterDir() + "current";
-       String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(ts) + 1);
+       String oldpath = GetClusterDir() + "log/current";
+       String newpath = GetClusterDir() + "log/" + Convert::ToString(static_cast<int>(ts) + 1);
        (void) rename(oldpath.CStr(), newpath.CStr());
 }
 
@@ -322,11 +327,11 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
        RotateLogFile();
 
        std::vector<int> files;
-       Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+       Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
        std::sort(files.begin(), files.end());
 
        BOOST_FOREACH(int ts, files) {
-               String path = GetClusterDir() + Convert::ToString(ts);
+               String path = GetClusterDir() + "log/" + Convert::ToString(ts);
 
                if (ts < endpoint->GetLocalLogPosition())
                        continue;
@@ -368,6 +373,20 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
        OpenLogFile();
 }
 
+void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file)
+{
+       Dictionary::Ptr elem = boost::make_shared<Dictionary>();
+
+       std::ifstream fp(file.CStr());
+       if (!fp)
+               return;
+
+       String content((std::istreambuf_iterator<char>(fp)), std::istreambuf_iterator<char>());
+       elem->Set("content", content);
+
+       config->Set(file, elem);
+}
+
 /**
  * Processes a new client connection.
  *
@@ -393,6 +412,26 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
                return;
        }
 
+       Dictionary::Ptr config = boost::make_shared<Dictionary>();
+       Array::Ptr configFiles = endpoint->GetConfigFiles();
+
+       if (configFiles) {
+               BOOST_FOREACH(const String& pattern, configFiles) {
+                       Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1));
+               }
+       }
+
+       Dictionary::Ptr params = boost::make_shared<Dictionary>();
+       params->Set("config_files", config);
+
+       Dictionary::Ptr message = boost::make_shared<Dictionary>();
+       message->Set("jsonrpc", "2.0");
+       message->Set("method", "cluster::Config");
+       message->Set("params", params);
+
+       String json = Value(message).Serialize();
+       NetString::WriteStringToStream(tlsStream, json);
+
        {
                ObjectLock olock(this);
                ReplayLog(endpoint, tlsStream);
@@ -448,7 +487,7 @@ void ClusterComponent::ClusterTimerHandler(void)
        }
 
        std::vector<int> files;
-       Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+       Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
        std::sort(files.begin(), files.end());
 
        BOOST_FOREACH(int ts, files) {
@@ -464,7 +503,7 @@ void ClusterComponent::ClusterTimerHandler(void)
                }
 
                if (!need) {
-                       String path = GetClusterDir() + Convert::ToString(ts);
+                       String path = GetClusterDir() + "log/" + Convert::ToString(ts);
                        Log(LogInformation, "cluster", "Removing old log file: " + path);
                        (void) unlink(path.CStr());
                }
@@ -933,6 +972,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
                service->ClearAcknowledgement(sender->GetName());
        } else if (message->Get("method") == "cluster::SetLogPosition") {
                sender->SetLocalLogPosition(params->Get("log_position"));
+       } else if (message->Get("method") == "cluster::Config") {
+               Dictionary::Ptr files = params->Get("config_files");
+               
+               if (!files)
+                       return;
+
+               Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
+
+               Array::Ptr acceptConfig = self->GetAcceptConfig();
+
+               bool accept = false;
+
+               if (acceptConfig) {
+                       BOOST_FOREACH(const String& pattern, acceptConfig) {
+                               if (Utility::Match(pattern, sender->GetName())) {
+                                       accept = true;
+                                       break;
+                               }
+                       }
+               }
+
+               if (!accept) {
+                       Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "'.");
+                       return;
+               }
+
+               String dir = GetClusterDir() + "config/" + SHA256(sender->GetName());
+               Log(LogInformation, "cluster", "Creating cluster config directory: " + dir);
+               if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) {
+                       BOOST_THROW_EXCEPTION(posix_error()
+                               << boost::errinfo_api_function("localtime")
+                               << boost::errinfo_errno(errno));
+               }
+
+               /* TODO: update files, remove old files, figure out whether we need to restart */
        }
 }
 
index e72795461a982ced7d7654248c03d077c3fd6389..d8289f28aa2b99d87ce919707f7a72b1c8680108 100644 (file)
@@ -78,6 +78,8 @@ private:
        void AddListener(const String& service);
        void AddConnection(const String& node, const String& service);
 
+       static void ConfigGlobHandler(const Dictionary::Ptr& config, const String& file);
+
        void NewClientHandler(const Socket::Ptr& client, TlsRole role);
        void ListenerThreadProc(const Socket::Ptr& server);
 
index 0bd15df35ed544333b4c38d2a6bc841bfeb211e2..e9ab41e55cbc9da64e9c7643890107035cd55535 100644 (file)
@@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const
        return m_Port;
 }
 
+Array::Ptr Endpoint::GetConfigFiles(void) const
+{
+       return m_ConfigFiles;
+}
+
+Array::Ptr Endpoint::GetAcceptConfig(void) const
+{
+       return m_AcceptConfig;
+}
+
 double Endpoint::GetSeen(void) const
 {
        return m_Seen;
@@ -163,6 +173,8 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
        if (attributeTypes & Attribute_Config) {
                bag->Set("host", m_Host);
                bag->Set("port", m_Port);
+               bag->Set("config_files", m_ConfigFiles);
+               bag->Set("accept_config", m_AcceptConfig);
        }
 
        if (attributeTypes & Attribute_State) {
@@ -179,6 +191,8 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
        if (attributeTypes & Attribute_Config) {
                m_Host = bag->Get("host");
                m_Port = bag->Get("port");
+               m_ConfigFiles = bag->Get("config_files");
+               m_AcceptConfig = bag->Get("accept_config");
        }
 
        if (attributeTypes & Attribute_State) {
index 52e28a49322479bb399450640ecd0c8cee96540e..1830e10189988f5f571f6d89e3c606331d68b6bb 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "base/dynamicobject.h"
 #include "base/stream.h"
+#include "base/array.h"
 #include <boost/signals2.hpp>
 
 namespace icinga
@@ -52,6 +53,8 @@ public:
 
        String GetHost(void) const;
        String GetPort(void) const;
+       Array::Ptr GetConfigFiles(void) const;
+       Array::Ptr GetAcceptConfig(void) const;
 
        double GetSeen(void) const;
        void SetSeen(double ts);
@@ -70,6 +73,8 @@ private:
        Dictionary::Ptr m_Subscriptions;
        String m_Host;
        String m_Port;
+       Array::Ptr m_ConfigFiles;
+       Array::Ptr m_AcceptConfig;
 
        Stream::Ptr m_Client;
        double m_Seen;
index 29aaae233ec21d5881105d616137ff4755d31c8a..1c18a4694751e540c51b1d38c22f139edb975dc2 100644 (file)
@@ -148,4 +148,35 @@ shared_ptr<X509> GetX509Certificate(const String& pemfile)
        return shared_ptr<X509>(cert, X509_free);
 }
 
+String SHA256(const String& s)
+{
+       SHA256_CTX context;
+       unsigned char digest[SHA256_DIGEST_LENGTH];
+
+       if (!SHA256_Init(&context)) {
+               BOOST_THROW_EXCEPTION(openssl_error()
+                       << boost::errinfo_api_function("SHA256_Init")
+                       << errinfo_openssl_error(ERR_get_error()));
+       }
+
+       if (!SHA256_Update(&context, (unsigned char*)s.CStr(), s.GetLength())) {
+               BOOST_THROW_EXCEPTION(openssl_error()
+                       << boost::errinfo_api_function("SHA256_Update")
+                       << errinfo_openssl_error(ERR_get_error()));
+       }
+
+       if (!SHA256_Final(digest, &context)) {
+               BOOST_THROW_EXCEPTION(openssl_error()
+                       << boost::errinfo_api_function("SHA256_Final")
+                       << errinfo_openssl_error(ERR_get_error()));
+       }
+
+       int i;
+       char output[SHA256_DIGEST_LENGTH*2+1];
+       for (i = 0; i < 32; i++)
+               sprintf(output + 2 * i, "%02x", digest[i]);
+
+       return output;
+}
+
 }
index e2789bc11e1ac0d1076a6a0e900e02bb005cbbd9..e4962311de868ff6b00527330f2f4134a9779d17 100644 (file)
@@ -28,6 +28,7 @@
 #include <openssl/bio.h>
 #include <openssl/err.h>
 #include <openssl/comp.h>
+#include <openssl/sha.h>
 
 namespace icinga
 {
@@ -35,6 +36,7 @@ namespace icinga
 shared_ptr<SSL_CTX> I2_BASE_API MakeSSLContext(const String& pubkey, const String& privkey, const String& cakey);
 String I2_BASE_API GetCertificateCN(const shared_ptr<X509>& certificate);
 shared_ptr<X509> I2_BASE_API GetX509Certificate(const String& pemfile);
+String SHA256(const String& s);
 
 class I2_BASE_API openssl_error : virtual public std::exception, virtual public boost::exception { };