* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#include "cluster/clustercomponent.h"
+#include "cluster/clusterlistener.h"
#include "cluster/endpoint.h"
#include "icinga/domain.h"
#include "base/netstring.h"
using namespace icinga;
-REGISTER_TYPE(ClusterComponent);
+REGISTER_TYPE(ClusterListener);
/**
* Starts the component.
*/
-void ClusterComponent::Start(void)
+void ClusterListener::Start(void)
{
DynamicObject::Start();
AddListener(GetBindPort());
m_ClusterTimer = boost::make_shared<Timer>();
- m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ClusterTimerHandler, this));
+ m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterListener::ClusterTimerHandler, this));
m_ClusterTimer->SetInterval(5);
m_ClusterTimer->Start();
- Service::OnNewCheckResult.connect(boost::bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
- Service::OnNextCheckChanged.connect(boost::bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
- Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterComponent::NextNotificationChangedHandler, this, _1, _2, _3));
- Service::OnForceNextCheckChanged.connect(boost::bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
- Service::OnForceNextNotificationChanged.connect(boost::bind(&ClusterComponent::ForceNextNotificationChangedHandler, this, _1, _2, _3));
- Service::OnEnableActiveChecksChanged.connect(boost::bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
- Service::OnEnablePassiveChecksChanged.connect(boost::bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
- Service::OnEnableNotificationsChanged.connect(boost::bind(&ClusterComponent::EnableNotificationsChangedHandler, this, _1, _2, _3));
- Service::OnEnableFlappingChanged.connect(boost::bind(&ClusterComponent::EnableFlappingChangedHandler, this, _1, _2, _3));
- Service::OnCommentAdded.connect(boost::bind(&ClusterComponent::CommentAddedHandler, this, _1, _2, _3));
- Service::OnCommentRemoved.connect(boost::bind(&ClusterComponent::CommentRemovedHandler, this, _1, _2, _3));
- Service::OnDowntimeAdded.connect(boost::bind(&ClusterComponent::DowntimeAddedHandler, this, _1, _2, _3));
- Service::OnDowntimeRemoved.connect(boost::bind(&ClusterComponent::DowntimeRemovedHandler, this, _1, _2, _3));
- Service::OnAcknowledgementSet.connect(boost::bind(&ClusterComponent::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
- Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterComponent::AcknowledgementClearedHandler, this, _1, _2));
-
- Endpoint::OnMessageReceived.connect(boost::bind(&ClusterComponent::AsyncMessageHandler, this, _1, _2));
+ Service::OnNewCheckResult.connect(boost::bind(&ClusterListener::CheckResultHandler, this, _1, _2, _3));
+ Service::OnNextCheckChanged.connect(boost::bind(&ClusterListener::NextCheckChangedHandler, this, _1, _2, _3));
+ Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterListener::NextNotificationChangedHandler, this, _1, _2, _3));
+ Service::OnForceNextCheckChanged.connect(boost::bind(&ClusterListener::ForceNextCheckChangedHandler, this, _1, _2, _3));
+ Service::OnForceNextNotificationChanged.connect(boost::bind(&ClusterListener::ForceNextNotificationChangedHandler, this, _1, _2, _3));
+ Service::OnEnableActiveChecksChanged.connect(boost::bind(&ClusterListener::EnableActiveChecksChangedHandler, this, _1, _2, _3));
+ Service::OnEnablePassiveChecksChanged.connect(boost::bind(&ClusterListener::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
+ Service::OnEnableNotificationsChanged.connect(boost::bind(&ClusterListener::EnableNotificationsChangedHandler, this, _1, _2, _3));
+ Service::OnEnableFlappingChanged.connect(boost::bind(&ClusterListener::EnableFlappingChangedHandler, this, _1, _2, _3));
+ Service::OnCommentAdded.connect(boost::bind(&ClusterListener::CommentAddedHandler, this, _1, _2, _3));
+ Service::OnCommentRemoved.connect(boost::bind(&ClusterListener::CommentRemovedHandler, this, _1, _2, _3));
+ Service::OnDowntimeAdded.connect(boost::bind(&ClusterListener::DowntimeAddedHandler, this, _1, _2, _3));
+ Service::OnDowntimeRemoved.connect(boost::bind(&ClusterListener::DowntimeRemovedHandler, this, _1, _2, _3));
+ Service::OnAcknowledgementSet.connect(boost::bind(&ClusterListener::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
+ Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterListener::AcknowledgementClearedHandler, this, _1, _2));
+
+ Endpoint::OnMessageReceived.connect(boost::bind(&ClusterListener::AsyncMessageHandler, this, _1, _2));
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
/**
* Stops the component.
*/
-void ClusterComponent::Stop(void)
+void ClusterListener::Stop(void)
{
ObjectLock olock(this);
CloseLogFile();
RotateLogFile();
}
-String ClusterComponent::GetCertificateFile(void) const
+String ClusterListener::GetCertificateFile(void) const
{
ObjectLock olock(this);
return m_CertPath;
}
-String ClusterComponent::GetCAFile(void) const
+String ClusterListener::GetCAFile(void) const
{
ObjectLock olock(this);
return m_CAPath;
}
-String ClusterComponent::GetBindHost(void) const
+String ClusterListener::GetBindHost(void) const
{
ObjectLock olock(this);
return m_BindHost;
}
-String ClusterComponent::GetBindPort(void) const
+String ClusterListener::GetBindPort(void) const
{
ObjectLock olock(this);
return m_BindPort;
}
-Array::Ptr ClusterComponent::GetPeers(void) const
+Array::Ptr ClusterListener::GetPeers(void) const
{
ObjectLock olock(this);
return m_Peers;
}
-shared_ptr<SSL_CTX> ClusterComponent::GetSSLContext(void) const
+shared_ptr<SSL_CTX> ClusterListener::GetSSLContext(void) const
{
ObjectLock olock(this);
return m_SSLContext;
}
-String ClusterComponent::GetIdentity(void) const
+String ClusterListener::GetIdentity(void) const
{
ObjectLock olock(this);
*
* @param service The port to listen on.
*/
-void ClusterComponent::AddListener(const String& service)
+void ClusterListener::AddListener(const String& service)
{
ObjectLock olock(this);
TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
server->Bind(service, AF_INET6);
- boost::thread thread(boost::bind(&ClusterComponent::ListenerThreadProc, this, server));
+ boost::thread thread(boost::bind(&ClusterListener::ListenerThreadProc, this, server));
thread.detach();
m_Servers.insert(server);
}
-void ClusterComponent::ListenerThreadProc(const Socket::Ptr& server)
+void ClusterListener::ListenerThreadProc(const Socket::Ptr& server)
{
Utility::SetThreadName("Cluster Listener");
for (;;) {
Socket::Ptr client = server->Accept();
- Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleServer));
+ Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleServer));
}
}
* @param node The remote host.
* @param service The remote port.
*/
-void ClusterComponent::AddConnection(const String& node, const String& service) {
+void ClusterListener::AddConnection(const String& node, const String& service) {
{
ObjectLock olock(this);
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->Connect(node, service);
- Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient));
+ Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
}
-void ClusterComponent::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
{
- m_RelayQueue.Enqueue(boost::bind(&ClusterComponent::RelayMessage, this, source, message, persistent));
+ m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
}
-void ClusterComponent::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
+void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
{
double ts = message->Get("ts");
}
}
-void ClusterComponent::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
+void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
{
double ts = Utility::GetTime();
message->Set("ts", ts);
if (persistent)
- m_LogQueue.Enqueue(boost::bind(&ClusterComponent::PersistMessage, this, source, message));
+ m_LogQueue.Enqueue(boost::bind(&ClusterListener::PersistMessage, this, source, message));
Dictionary::Ptr security = message->Get("security");
DynamicObject::Ptr secobj;
}
}
-String ClusterComponent::GetClusterDir(void) const
+String ClusterListener::GetClusterDir(void) const
{
return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
}
-void ClusterComponent::OpenLogFile(void)
+void ClusterListener::OpenLogFile(void)
{
ASSERT(OwnsLock());
m_LogMessageTimestamp = 0;
}
-void ClusterComponent::CloseLogFile(void)
+void ClusterListener::CloseLogFile(void)
{
ASSERT(OwnsLock());
}
-void ClusterComponent::RotateLogFile(void)
+void ClusterListener::RotateLogFile(void)
{
ASSERT(OwnsLock());
(void) rename(oldpath.CStr(), newpath.CStr());
}
-void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
+void ClusterListener::LogGlobHandler(std::vector<int>& files, const String& file)
{
String name = Utility::BaseName(file);
files.push_back(ts);
}
-void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
+void ClusterListener::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
{
int count = -1;
double peer_ts = endpoint->GetLocalLogPosition();
count = 0;
std::vector<int> files;
- Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
}
}
-void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
+void ClusterListener::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
{
Dictionary::Ptr elem = boost::make_shared<Dictionary>();
*
* @param client The new client.
*/
-void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
+void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
{
NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
if (configFiles) {
ObjectLock olock(configFiles);
BOOST_FOREACH(const String& pattern, configFiles) {
- Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1, false));
+ Utility::Glob(pattern, boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(config), _1, false));
}
}
ReplayLog(endpoint, tlsStream);
}
-void ClusterComponent::ClusterTimerHandler(void)
+void ClusterListener::ClusterTimerHandler(void)
{
/* broadcast a heartbeat message */
Dictionary::Ptr params = boost::make_shared<Dictionary>();
}
std::vector<int> files;
- Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
}
}
-void ClusterComponent::SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs)
+void ClusterListener::SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs)
{
ASSERT(object);
message->Set("security", security);
}
-void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
+void ClusterListener::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
+void ClusterListener::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
+void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
+void ClusterListener::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
+void ClusterListener::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+void ClusterListener::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+void ClusterListener::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+void ClusterListener::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
+void ClusterListener::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
+void ClusterListener::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
+void ClusterListener::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
+void ClusterListener::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
+void ClusterListener::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
+void ClusterListener::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
+void ClusterListener::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
{
if (!authority.IsEmpty() && authority != GetIdentity())
return;
AsyncRelayMessage(Endpoint::Ptr(), message, true);
}
-void ClusterComponent::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
+void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
- m_MessageQueue.Enqueue(boost::bind(&ClusterComponent::MessageHandler, this, sender, message));
+ m_MessageQueue.Enqueue(boost::bind(&ClusterListener::MessageHandler, this, sender, message));
}
-void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
+void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
sender->SetSeen(Utility::GetTime());
}
Dictionary::Ptr localConfig = boost::make_shared<Dictionary>();
- Utility::Glob(dir + "/*", boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(localConfig), _1, true));
+ Utility::Glob(dir + "/*", boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(localConfig), _1, true));
bool configChange = false;
}
}
-bool ClusterComponent::IsAuthority(const DynamicObject::Ptr& object, const String& type)
+bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
{
Array::Ptr authorities = object->GetAuthorities();
std::vector<String> endpoints;
return (endpoints[index] == GetIdentity());
}
-void ClusterComponent::UpdateAuthority(void)
+void ClusterListener::UpdateAuthority(void)
{
Log(LogDebug, "cluster", "Updating authority for objects.");
}
}
-bool ClusterComponent::SupportsChecks(void)
+bool ClusterListener::SupportsChecks(void)
{
DynamicType::Ptr type = DynamicType::GetByName("CheckerComponent");
return !type->GetObjects().empty();
}
-bool ClusterComponent::SupportsNotifications(void)
+bool ClusterListener::SupportsNotifications(void)
{
DynamicType::Ptr type = DynamicType::GetByName("NotificationComponent");
return !type->GetObjects().empty();
}
-void ClusterComponent::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
+void ClusterListener::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
{
DynamicObject::InternalSerialize(bag, attributeTypes);
bag->Set("log_message_timestamp", m_LogMessageTimestamp);
}
-void ClusterComponent::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
+void ClusterListener::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
{
DynamicObject::InternalDeserialize(bag, attributeTypes);