return buffer;
}
+
+shared_ptr<X509> Utility::GetX509Certificate(string pemfile)
+{
+ X509 *cert;
+ BIO *fpcert = BIO_new(BIO_s_file());
+
+ if (fpcert == NULL)
+ throw OpenSSLException("BIO_new failed", ERR_get_error());
+
+ if (BIO_read_filename(fpcert, pemfile.c_str()) < 0)
+ throw OpenSSLException("BIO_read_filename failed", ERR_get_error());
+
+ cert = PEM_read_bio_X509_AUX(fpcert, NULL, NULL, NULL);
+ if (cert == NULL)
+ throw OpenSSLException("PEM_read_bio_X509_AUX failed", ERR_get_error());
+
+ BIO_free(fpcert);
+
+ return shared_ptr<X509>(cert, X509_free);
+}
static shared_ptr<SSL_CTX> MakeSSLContext(string pubkey, string privkey, string cakey);
static string GetCertificateCN(const shared_ptr<X509>& certificate);
+ static shared_ptr<X509> GetX509Certificate(string pemfile);
};
}
"demo": { "replicate": "0" }
},
"rpcconnection": {
- "kekslistener": { "replicate": "0", "hostname": "127.0.0.1", "port": "7777" }
+ "kekslistener": { "replicate": "0", "hostname": "::1", "port": "7777" }
},
"rpclistener": {
"kekslistener": { "replicate": "0", "port": "7777" }
{
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
m_DiscoveryEndpoint->RegisterMethodHandler("message::Welcome",
- bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
+ bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
m_DiscoveryEndpoint->RegisterMethodSource("discovery::PeerAvailable");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::GetPeers",
mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
}
+int DiscoveryComponent::CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea)
+{
+ if (endpoint == neea.Endpoint)
+ return 0;
+
+ if (endpoint->GetIdentity() == neea.Endpoint->GetIdentity()) {
+ Application::Log("Detected duplicate identity (" + endpoint->GetIdentity() + " - Disconnecting endpoint.");
+
+ endpoint->Stop();
+ GetEndpointManager()->UnregisterEndpoint(endpoint);
+ }
+
+ return 0;
+}
+
int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& neea)
{
+ if (neea.Sender->GetIdentity() == GetEndpointManager()->GetIdentity()) {
+ Application::Log("Detected loop-back connection - Disconnecting endpoint.");
+
+ neea.Sender->Stop();
+ GetEndpointManager()->UnregisterEndpoint(neea.Sender);
+
+ return 0;
+ }
+
+ GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, neea.Sender, _1));
+
JsonRpcRequest request;
request.SetMethod("discovery::GetPeers");
GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, neea.Sender, request);
int WelcomeMessageHandler(const NewRequestEventArgs& neea);
int GetPeersMessageHandler(const NewRequestEventArgs& nrea);
+ int CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea);
+
public:
virtual string GetName(void) const;
virtual void Start(void);
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) = 0;
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) = 0;
+ virtual void Stop(void) = 0;
+
Event<NewMethodEventArgs> OnNewMethodSink;
Event<NewMethodEventArgs> OnNewMethodSource;
using namespace icinga;
-EndpointManager::EndpointManager(shared_ptr<SSL_CTX> sslContext)
+EndpointManager::EndpointManager(string identity, shared_ptr<SSL_CTX> sslContext)
{
+ m_Identity = identity;
m_SSLContext = sslContext;
}
+string EndpointManager::GetIdentity(void) const
+{
+ return m_Identity;
+}
+
void EndpointManager::AddListener(unsigned short port)
{
stringstream s;
{
NewEndpointEventArgs neea;
neea.Source = shared_from_this();
- for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
- neea.Endpoint = *i;
+
+ list<Endpoint::Ptr>::iterator prev, i;
+ for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
+ prev = i;
+ i++;
+
+ neea.Endpoint = *prev;
callback(neea);
}
}
class I2_ICINGA_API EndpointManager : public Object
{
+ string m_Identity;
shared_ptr<SSL_CTX> m_SSLContext;
+
list<JsonRpcServer::Ptr> m_Servers;
list<Endpoint::Ptr> m_Endpoints;
typedef shared_ptr<EndpointManager> Ptr;
typedef weak_ptr<EndpointManager> WeakPtr;
- EndpointManager(shared_ptr<SSL_CTX> sslContext);
+ EndpointManager(string identity, shared_ptr<SSL_CTX> sslContext);
+
+ string GetIdentity(void) const;
void AddListener(unsigned short port);
void AddConnection(string host, unsigned short port);
return EXIT_FAILURE;
}
+ shared_ptr<X509> cert = Utility::GetX509Certificate("icinga-c1.crt");
+ string identity = Utility::GetCertificateCN(cert);
+
+ Application::Log("My identity: " + identity);
+
shared_ptr<SSL_CTX> sslContext = Utility::MakeSSLContext("icinga-c1.crt", "icinga-c1.key", "ca.crt");
- m_EndpointManager = make_shared<EndpointManager>(sslContext);
+ m_EndpointManager = make_shared<EndpointManager>(identity, sslContext);
string componentDirectory = GetExeDirectory() + "/../lib/icinga";
AddComponentSearchDir(componentDirectory);
return 0;
}
+
+void JsonRpcEndpoint::Stop(void)
+{
+ if (m_Client) {
+ m_Client->Close();
+ m_Client = JsonRpcClient::Ptr();
+ }
+}
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
+
+ virtual void Stop(void);
};
}
{
return true;
}
+
+void VirtualEndpoint::Stop(void)
+{
+ /* Nothing to do here. */
+}
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
+
+ virtual void Stop(void);
};
}