OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this());
}
-void TCPClient::Connect(const string& hostname, unsigned short port)
+void TCPClient::Connect(const string& node, const string& service)
{
m_Role = RoleOutbound;
- stringstream s;
- s << port;
- string strPort = s.str();
-
addrinfo hints;
addrinfo *result;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
- int rc = getaddrinfo(hostname.c_str(), strPort.c_str(), &hints, &result);
+ int rc = getaddrinfo(node.c_str(), service.c_str(), &hints, &result);
if (rc < 0) {
HandleSocketError();
virtual void Start(void);
- void Connect(const string& hostname, unsigned short port);
+ void Connect(const string& node, const string& service);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);
int rc = listen(GetFD(), SOMAXCONN);
if (rc < 0) {
- Close();
+ HandleSocketError();
return;
}
}
SetFD(fd);
}
-void TCPSocket::Bind(unsigned short port, int family)
+void TCPSocket::Bind(string service, int family)
{
- Bind(NULL, port, family);
+ Bind(string(), service, family);
}
-void TCPSocket::Bind(const char *hostname, unsigned short port, int family)
+void TCPSocket::Bind(string node, string service, int family)
{
- stringstream s;
- s << port;
- string strPort = s.str();
-
addrinfo hints;
addrinfo *result;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
- if (getaddrinfo(hostname, strPort.c_str(), &hints, &result) < 0) {
+ if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) < 0) {
HandleSocketError();
return;
typedef shared_ptr<TCPSocket> Ptr;
typedef weak_ptr<TCPSocket> WeakPtr;
- void Bind(unsigned short port, int family);
- void Bind(const char *hostname, unsigned short port, int family);
+ void Bind(string service, int family);
+ void Bind(string node, string service, int family);
};
}
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
+
+ m_DiscoveryConnectTimer = make_shared<Timer>();
+ m_DiscoveryConnectTimer->SetInterval(30);
+ m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this());
+ m_DiscoveryConnectTimer->Start();
}
void DiscoveryComponent::Stop(void)
{
neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
- /* accept discovery::RegisterComponent messages from any endpoint */
- neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
+ if (IsBroker()) {
+ /* accept discovery::RegisterComponent messages from any endpoint */
+ neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
+ }
+
+ /* TODO: implement message broker authorisation */
+ neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
+
+ /* TODO: register handler to unregister this endpoint when it's closed */
return 0;
}
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
string identity = endpoint->GetIdentity();
- if (identity == GetEndpointManager()->GetIdentity()) {
- Application::Log("Detected loop-back connection - Disconnecting endpoint.");
+ if (!GetIcingaApplication()->IsDebugging()) {
+ if (identity == GetEndpointManager()->GetIdentity()) {
+ Application::Log("Detected loop-back connection - Disconnecting endpoint.");
- endpoint->Stop();
- GetEndpointManager()->UnregisterEndpoint(endpoint);
+ endpoint->Stop();
+ GetEndpointManager()->UnregisterEndpoint(endpoint);
- return 0;
- }
+ return 0;
+ }
- GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
+ GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
+ }
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
return;
if (!info->Node.empty() && !info->Service.empty()) {
- params.SetPropertyString("node", info->Node);
- params.SetPropertyString("service", info->Service);
+ params.SetNode(info->Node);
+ params.SetService(info->Service);
}
set<string>::iterator i;
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
{
ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
-}
-
-int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
-{
- /*Message message;
- nrea.Request.GetParams(&message);
- ProcessDiscoveryMessage(message.GetPropertyString(, DiscoveryMessage(message));*/
- return 0;
-}
-
-int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
-{
- Message message;
- nrea.Request.GetParams(&message);
- ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), DiscoveryMessage(message));
- return 0;
-}
-void DiscoveryComponent::AddSubscribedMethod(string identity, string method)
-{
- ComponentDiscoveryInfo::Ptr info;
+ message.GetNode(&info->Node);
+ message.GetService(&info->Service);
- if (!GetComponentDiscoveryInfo(identity, &info))
- return;
+ Message provides;
+ if (message.GetProvides(&provides)) {
+ DictionaryIterator i;
+ for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
+ info->PublishedMethods.insert(i->second);
+ }
+ }
- info->SubscribedMethods.insert(method);
-}
+ Message subscribes;
+ if (message.GetSubscribes(&subscribes)) {
+ DictionaryIterator i;
+ for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
+ info->SubscribedMethods.insert(i->second);
+ }
+ }
-bool DiscoveryComponent::IsSubscribedMethod(string identity, string method) const
-{
- if (GetEndpointManager()->GetIdentity() == identity)
- return true;
+ map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
- ComponentDiscoveryInfo::Ptr info;
+ i = m_Components.find(identity);
- if (!GetComponentDiscoveryInfo(identity, &info))
- return false;
+ if (i != m_Components.end())
+ m_Components.erase(i);
- set<string>::const_iterator i;
- i = info->SubscribedMethods.find(method);
+ m_Components[identity] = info;
- return (i != info->SubscribedMethods.end());
+ SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
}
-void DiscoveryComponent::AddPublishedMethod(string identity, string method)
+int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
{
- ComponentDiscoveryInfo::Ptr info;
+ DiscoveryMessage message;
+ nrea.Request.GetParams(&message);
- if (!GetComponentDiscoveryInfo(identity, &info))
- return;
+ string identity;
+ if (!message.GetIdentity(&identity))
+ return 0;
- info->PublishedMethods.insert(method);
+ ProcessDiscoveryMessage(identity, message);
+ return 0;
}
-bool DiscoveryComponent::IsPublishedMethod(string identity, string method) const
+int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
{
- if (GetEndpointManager()->GetIdentity() == identity)
- return true;
+ DiscoveryMessage message;
+ nrea.Request.GetParams(&message);
+ ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
+ return 0;
+}
- ComponentDiscoveryInfo::Ptr info;
+int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
+{
+ EndpointManager::Ptr endpointManager = GetEndpointManager();
- if (!GetComponentDiscoveryInfo(identity, &info))
- return false;
+ map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
+ for (i = m_Components.begin(); i != m_Components.end(); i++) {
+ if (endpointManager->HasConnectedEndpoint(i->first))
+ continue;
- set<string>::const_iterator i;
- i = info->PublishedMethods.find(method);
+ ComponentDiscoveryInfo::Ptr info = i->second;
+ endpointManager->AddConnection(info->Node, info->Service);
+ }
- return (i != info->PublishedMethods.end());
+ return 0;
}
EXPORT_COMPONENT(DiscoveryComponent);
private:
VirtualEndpoint::Ptr m_DiscoveryEndpoint;
map<string, ComponentDiscoveryInfo::Ptr> m_Components;
-
bool m_Broker;
+ Timer::Ptr m_DiscoveryConnectTimer;
int NewEndpointHandler(const NewEndpointEventArgs& neea);
int NewIdentityHandler(const EventArgs& ea);
int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
+ int ReconnectTimerHandler(const TimerEventArgs& tea);
+
bool IsBroker(void) const;
public:
virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
-
- void AddSubscribedMethod(string identity, string method);
- bool IsSubscribedMethod(string identity, string method) const;
-
- void AddPublishedMethod(string identity, string method);
- bool IsPublishedMethod(string identity, string method) const;
};
}
SetPropertyString("identity", value);
}
+ inline bool GetNode(string *value) const
+ {
+ return GetPropertyString("node", value);
+ }
+
+ inline void SetNode(const string& value)
+ {
+ SetPropertyString("node", value);
+ }
+
+ inline bool GetService(string *value) const
+ {
+ return GetPropertyString("service", value);
+ }
+
+ inline void SetService(const string& value)
+ {
+ SetPropertyString("service", value);
+ }
+
inline bool GetSubscribes(Message *value) const
{
return GetPropertyMessage("subscribes", value);
"discovery": { "replicate": "0", "broker": "1" }
},
"rpclistener": {
- "kekslistener": { "replicate": "0", "port": "7777" }
- },
- "host": {
- "localhost": { "ipaddr": "127.0.0.1" }
+ "kekslistener": { "replicate": "0", "service": "7777" }
}
}
\ No newline at end of file
--- /dev/null
+{
+ "icinga": {
+ "icinga": {
+ "privkey": "icinga-c2.key",
+ "pubkey": "icinga-c2.crt",
+ "cakey": "ca.crt",
+ "node": "10.0.10.3",
+ "service": "8888"
+ }
+ },
+ "component": {
+ "configrpc": { "replicate": "0", "configSource": "1" },
+ "demo": { "replicate": "0" },
+ "discovery": { "replicate": "0", "broker": "0" }
+ },
+ "rpclistener": {
+ "kekslistener": { "replicate": "0", "service": "8888" }
+ },
+ "rpcconnection": {
+ "keksclient": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
+ }
+}
\ No newline at end of file
return m_SSLContext;
}
-void EndpointManager::AddListener(unsigned short port)
+void EndpointManager::AddListener(string service)
{
stringstream s;
- s << "Adding new listener: port " << port;
+ s << "Adding new listener: port " << service;
Application::Log(s.str());
JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(m_SSLContext);
RegisterServer(server);
- server->Bind(port, AF_INET6);
+ server->Bind(service, AF_INET6);
server->Listen();
server->Start();
}
-void EndpointManager::AddConnection(string host, unsigned short port)
+void EndpointManager::AddConnection(string node, string service)
{
stringstream s;
- s << "Adding new endpoint: [" << host << "]:" << port;
+ s << "Adding new endpoint: [" << node << "]:" << service;
Application::Log(s.str());
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
- endpoint->Connect(host, port, m_SSLContext);
RegisterEndpoint(endpoint);
+ endpoint->Connect(node, service, m_SSLContext);
}
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
if (!request.GetMethod(&method))
throw InvalidArgumentException("Missing 'method' parameter.");
- if (recipient->IsMethodSink(method))
+ if (recipient->IsMethodSink(method)) {
+ Application::Log(sender->GetAddress() + " -> " + recipient->GetAddress() + ": " + method);
recipient->ProcessRequest(sender, request);
+ }
}
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
callback(neea);
}
}
+
+bool EndpointManager::HasConnectedEndpoint(string identity) const
+{
+ list<Endpoint::Ptr>::const_iterator i;
+ for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
+ if ((*i)->GetIdentity() == identity)
+ return true;
+ }
+
+ return false;
+}
void SetSSLContext(shared_ptr<SSL_CTX> sslContext);
shared_ptr<SSL_CTX> GetSSLContext(void) const;
- void AddListener(unsigned short port);
- void AddConnection(string host, unsigned short port);
+ void AddListener(string service);
+ void AddConnection(string node, string service);
void RegisterEndpoint(Endpoint::Ptr endpoint);
void UnregisterEndpoint(Endpoint::Ptr endpoint);
void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
+ bool HasConnectedEndpoint(string identity) const;
+
Event<NewEndpointEventArgs> OnNewEndpoint;
};
if (object->GetReplicated())
return 0;
- long portValue;
- if (!object->GetPropertyInteger("port", &portValue))
- throw InvalidArgumentException("Parameter 'port' is required for 'rpclistener' objects.");
-
- if (portValue < 0 || portValue > USHRT_MAX)
- throw InvalidArgumentException("Parameter 'port' contains an invalid value.");
-
- unsigned short port = (unsigned short)portValue;
+ string service;
+ if (!object->GetPropertyString("service", &service))
+ throw InvalidArgumentException("Parameter 'service' is required for 'rpclistener' objects.");
- GetEndpointManager()->AddListener(port);
+ GetEndpointManager()->AddListener(service);
return 0;
}
int IcingaApplication::NewRpcConnectionHandler(const EventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
- string hostname;
- long portValue;
- unsigned short port;
/* don't allow replicated config objects */
if (object->GetReplicated())
return 0;
- if (!object->GetPropertyString("hostname", &hostname))
- throw InvalidArgumentException("Parameter 'hostname' is required for 'rpcconnection' objects.");
-
- if (!object->GetPropertyInteger("port", &portValue))
- throw InvalidArgumentException("Parameter 'port' is required for 'rpcconnection' objects.");
-
- if (portValue < 0 || portValue > USHRT_MAX)
- throw InvalidArgumentException("Parameter 'port' contains an invalid value.");
+ string node;
+ if (!object->GetPropertyString("node", &node))
+ throw InvalidArgumentException("Parameter 'node' is required for 'rpcconnection' objects.");
- port = (unsigned short)portValue;
+ string service;
+ if (!object->GetPropertyString("service", &service))
+ throw InvalidArgumentException("Parameter 'service' is required for 'rpcconnection' objects.");
- GetEndpointManager()->AddConnection(hostname, port);
+ GetEndpointManager()->AddConnection(node, service);
return 0;
}
return m_Client;
}
-void JsonRpcEndpoint::Connect(string host, unsigned short port, shared_ptr<SSL_CTX> sslContext)
+void JsonRpcEndpoint::Connect(string node, string service, shared_ptr<SSL_CTX> sslContext)
{
- m_PeerHostname = host;
- m_PeerPort = port;
-
JsonRpcClient::Ptr client = make_shared<JsonRpcClient>(RoleOutbound, sslContext);
- client->Connect(host, port);
- client->Start();
-
SetClient(client);
+ client->Connect(node, service);
+ client->Start();
}
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
m_PendingCalls.clear();
- if (m_PeerHostname != string()) {
- Timer::Ptr timer = make_shared<Timer>();
- timer->SetInterval(30);
- timer->SetUserArgs(ea);
- timer->OnTimerExpired += bind_weak(&JsonRpcEndpoint::ClientReconnectHandler, shared_from_this());
- timer->Start();
- m_ReconnectTimer = timer;
-
- Application::Log("Spawned reconnect timer (30 seconds)");
- }
-
// TODO: _only_ clear non-persistent method sources/sinks
// unregister ourselves if no persistent sources/sinks are left (use a timer for that, once we have a TTL property for the methods)
ClearMethodSinks();
ClearMethodSources();
- if (CountMethodSinks() == 0 && !m_ReconnectTimer)
+ if (CountMethodSinks() == 0)
GetEndpointManager()->UnregisterEndpoint(static_pointer_cast<Endpoint>(shared_from_this()));
m_Client.reset();
return 0;
}
-int JsonRpcEndpoint::ClientReconnectHandler(const TimerEventArgs& ea)
-{
- JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea.UserArgs.Source);
- Timer::Ptr timer = static_pointer_cast<Timer>(ea.Source);
-
- GetEndpointManager()->AddConnection(m_PeerHostname, m_PeerPort);
-
- timer->Stop();
- m_ReconnectTimer.reset();
-
- return 0;
-}
-
int JsonRpcEndpoint::VerifyCertificateHandler(const VerifyCertificateEventArgs& ea)
{
if (ea.Certificate && ea.ValidCertificate) {
string m_Address;
JsonRpcClient::Ptr m_Client;
map<string, Endpoint::Ptr> m_PendingCalls;
- Timer::Ptr m_ReconnectTimer;
-
- string m_PeerHostname;
- unsigned short m_PeerPort;
int NewMessageHandler(const NewMessageEventArgs& nmea);
int ClientClosedHandler(const EventArgs& ea);
int ClientErrorHandler(const SocketErrorEventArgs& ea);
- int ClientReconnectHandler(const TimerEventArgs& ea);
int VerifyCertificateHandler(const VerifyCertificateEventArgs& ea);
public:
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
- void Connect(string host, unsigned short port,
+ void Connect(string node, string service,
shared_ptr<SSL_CTX> sslContext);
JsonRpcClient::Ptr GetClient(void);