$(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
$(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
- $(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster
+ $(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}
#include "cluster/clustercomponent.h"
#include "cluster/endpoint.h"
+#include "base/netstring.h"
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/networkstream.h"
+#include "base/application.h"
+#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
+#include <fstream>
using namespace icinga;
{
DynamicObject::Start();
+ OpenLogFile();
+
/* set up SSL context */
shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
m_Identity = GetCertificateCN(cert);
*/
void ClusterComponent::Stop(void)
{
- /* Nothing to do here. */
+ CloseLogFile();
}
String ClusterComponent::GetCertificateFile(void) const
void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
{
+ message->Set("ts", Utility::GetTime());
+
+ if (persistent) {
+ Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
+ pmessage->Set("timestamp", Utility::GetTime());
+ pmessage->Set("message", message);
+
+ ObjectLock olock(this);
+ String json = Value(pmessage).Serialize();
+ NetString::WriteStringToStream(m_LogFile, json);
+ m_LogMessageCount++;
+
+ if (m_LogMessageCount > 250000) {
+ CloseLogFile();
+ OpenLogFile();
+ }
+ }
+
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (!persistent && !endpoint->IsConnected())
continue;
}
}
+String ClusterComponent::GetClusterDir(void) const
+{
+ return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
+}
+
+void ClusterComponent::OpenLogFile(void)
+{
+ std::ostringstream msgbuf;
+ msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
+ String path = msgbuf.str();
+
+ std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
+
+ if (!fp->good()) {
+ Log(LogWarning, "cluster", "Could not open spool file: " + path);
+ return;
+ }
+
+ m_LogFile = boost::make_shared<StdioStream>(fp, true);
+ m_LogMessageCount = 0;
+}
+
+void ClusterComponent::CloseLogFile(void)
+{
+ m_LogFile->Close();
+ m_LogFile.reset();
+}
+
+void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
+{
+ String name = Utility::BaseName(file);
+ int ts = Convert::ToLong(name);
+ files.push_back(ts);
+}
+
+void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
+{
+ int count = 0;
+
+ ASSERT(OwnsLock());
+
+ CloseLogFile();
+
+ std::vector<int> files;
+ Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ std::sort(files.begin(), files.end());
+
+ BOOST_FOREACH(int ts, files) {
+ std::ostringstream msgbuf;
+ msgbuf << GetClusterDir() << ts;
+ String path = msgbuf.str();
+
+ Log(LogInformation, "cluster", "Replaying log: " + path);
+
+ std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
+ StdioStream::Ptr lstream = boost::make_shared<StdioStream>(fp, true);
+
+ String message;
+ while (NetString::ReadStringFromStream(lstream, &message)) {
+ Dictionary::Ptr pmessage = Value::Deserialize(message);
+
+ if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+ continue;
+
+ String json = Value(pmessage->Get("message")).Serialize();
+ NetString::WriteStringToStream(stream, json);
+ count++;
+ }
+
+ lstream->Close();
+ }
+
+ Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+
+ OpenLogFile();
+}
+
/**
* Processes a new client connection.
*
return;
}
+ {
+ ObjectLock olock(this);
+ ReplayLog(endpoint, tlsStream);
+ }
+
endpoint->SetClient(tlsStream);
}
return;
}
+ if (sender->GetRemoteLogPosition() + 10 < message->Get("ts")) {
+ Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
+ lparams->Set("log_position", message->Get("ts"));
+
+ Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
+ lmessage->Set("jsonrpc", "2.0");
+ lmessage->Set("method", "cluster::SetLogPosition");
+ lmessage->Set("params", lparams);
+
+ sender->SendMessage(lmessage);
+ }
+
Dictionary::Ptr params = message->Get("params");
if (!params)
ObjectLock olock(service);
service->ClearAcknowledgement(sender->GetName());
+ } else if (message->Get("method") == "cluster::SetLogPosition") {
+ sender->SetRemoteLogPosition(params->Get("log_position"));
}
}
#include "base/tlsstream.h"
#include "base/utility.h"
#include "base/tlsutility.h"
+#include "base/stdiostream.h"
#include "icinga/service.h"
#include "cluster/endpoint.h"
String GetBindHost(void) const;
String GetBindPort(void) const;
Array::Ptr GetPeers(void) const;
+ String GetClusterDir(void) const;
shared_ptr<SSL_CTX> GetSSLContext(void) const;
String GetIdentity(void) const;
void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent);
+ void OpenLogFile(void);
+ void CloseLogFile(void);
+ static void LogGlobHandler(std::vector<int>& files, const String& file);
+ void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
+
+ StdioStream::Ptr m_LogFile;
+ size_t m_LogMessageCount;
+
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);
void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority);
void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority);
void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
-
};
}
#include "base/logger_fwd.h"
#include "config/configitembuilder.h"
#include <boost/smart_ptr/make_shared.hpp>
-#include <fstream>
using namespace icinga;
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
-void Endpoint::OnConfigLoaded(void)
-{
- ObjectLock olock(this);
-
- OpenSpoolFile();
-}
-
-void Endpoint::Stop(void)
-{
- ObjectLock olock(this);
-
- CloseSpoolFile();
-}
-
-void Endpoint::OpenSpoolFile(void)
-{
- ASSERT(OwnsLock());
-
- String path = GetSpoolPath();
- std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
-
- if (!fp->good())
- Log(LogWarning, "cluster", "Could not open spool file: " + path);
-
- m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
-}
-
-void Endpoint::CloseSpoolFile(void)
-{
- ASSERT(OwnsLock());
-
- m_SpoolFile->Close();
- m_SpoolFile.reset();
-}
-
/**
* Checks whether this endpoint is connected.
*
{
ObjectLock olock(this);
- CloseSpoolFile();
-
- String path = GetSpoolPath();
- std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
-
- while (fp && !fp->eof()) {
- char data[1024];
- fp->read(data, sizeof(data));
- client->Write(data, fp->gcount());
- }
-
- fp->close();
- delete fp;
-
- (void) unlink(path.CStr());
-
- OpenSpoolFile();
-
m_Client = client;
}
void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
- Stream::Ptr destination;
+ Stream::Ptr client = GetClient();
- if (!IsConnected())
- destination = m_SpoolFile;
- else
- destination = GetClient();
+ if (!client)
+ return;
try {
- JsonRpc::SendMessage(destination, message);
+ JsonRpc::SendMessage(client, message);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
m_Seen = ts;
}
-String Endpoint::GetSpoolPath(void) const
+double Endpoint::GetLocalLogPosition(void) const
{
- return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
+ return m_LocalLogPosition;
+}
+
+void Endpoint::SetLocalLogPosition(double ts)
+{
+ m_LocalLogPosition = ts;
+}
+
+double Endpoint::GetRemoteLogPosition(void) const
+{
+ return m_RemoteLogPosition;
+}
+
+void Endpoint::SetRemoteLogPosition(double ts)
+{
+ m_RemoteLogPosition = ts;
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
bag->Set("port", m_Port);
}
- if (attributeTypes & Attribute_State)
+ if (attributeTypes & Attribute_State) {
bag->Set("seen", m_Seen);
+ bag->Set("local_log_position", m_LocalLogPosition);
+ bag->Set("remote_log_position", m_RemoteLogPosition);
+ }
}
void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
m_Port = bag->Get("port");
}
- if (attributeTypes & Attribute_State)
+ if (attributeTypes & Attribute_State) {
m_Seen = bag->Get("seen");
+ m_LocalLogPosition = bag->Get("local_log_position");
+ m_RemoteLogPosition = bag->Get("remote_log_position");
+ }
}
#include "base/dynamicobject.h"
#include "base/stream.h"
-#include "base/stdiostream.h"
#include <boost/signals2.hpp>
namespace icinga
double GetSeen(void) const;
void SetSeen(double ts);
- String GetSpoolPath(void) const;
+ double GetLocalLogPosition(void) const;
+ void SetLocalLogPosition(double ts);
+
+ double GetRemoteLogPosition(void) const;
+ void SetRemoteLogPosition(double ts);
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
- virtual void OnConfigLoaded(void);
- virtual void Stop(void);
-
private:
Dictionary::Ptr m_Subscriptions;
String m_Host;
Stream::Ptr m_Client;
double m_Seen;
- StdioStream::Ptr m_SpoolFile;
+ double m_LocalLogPosition;
+ double m_RemoteLogPosition;
void MessageThreadProc(const Stream::Ptr& stream);
-
- void OpenSpoolFile(void);
- void CloseSpoolFile(void);
};
}