From: Gunnar Beutner Date: Mon, 7 May 2012 11:48:17 +0000 (+0200) Subject: Cleaned up JSON-RPC client code. X-Git-Tag: v0.0.1~562 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=28bdbe1ffe3d3e7156313aac6197b3bdd13eaa16;p=icinga2 Cleaned up JSON-RPC client code. --- diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp index 93e3229d9..f5f35df9c 100644 --- a/base/tcpclient.cpp +++ b/base/tcpclient.cpp @@ -23,14 +23,10 @@ void TCPClient::Start(void) OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this()); } -void TCPClient::Connect(const string& hostname, unsigned short port) +void TCPClient::Connect(const string& node, const string& service) { m_Role = RoleOutbound; - stringstream s; - s << port; - string strPort = s.str(); - addrinfo hints; addrinfo *result; @@ -39,7 +35,7 @@ void TCPClient::Connect(const string& hostname, unsigned short port) hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; - int rc = getaddrinfo(hostname.c_str(), strPort.c_str(), &hints, &result); + int rc = getaddrinfo(node.c_str(), service.c_str(), &hints, &result); if (rc < 0) { HandleSocketError(); diff --git a/base/tcpclient.h b/base/tcpclient.h index d2b383251..2ad9eec30 100644 --- a/base/tcpclient.h +++ b/base/tcpclient.h @@ -31,7 +31,7 @@ public: virtual void Start(void); - void Connect(const string& hostname, unsigned short port); + void Connect(const string& node, const string& service); FIFO::Ptr GetSendQueue(void); FIFO::Ptr GetRecvQueue(void); diff --git a/base/tcpserver.cpp b/base/tcpserver.cpp index b35817034..332374eb9 100644 --- a/base/tcpserver.cpp +++ b/base/tcpserver.cpp @@ -29,7 +29,7 @@ void TCPServer::Listen(void) int rc = listen(GetFD(), SOMAXCONN); if (rc < 0) { - Close(); + HandleSocketError(); return; } } diff --git a/base/tcpsocket.cpp b/base/tcpsocket.cpp index 0ab19cc14..d242ab8e6 100644 --- a/base/tcpsocket.cpp +++ b/base/tcpsocket.cpp @@ -17,17 +17,13 @@ void TCPSocket::MakeSocket(int family) SetFD(fd); } -void TCPSocket::Bind(unsigned short port, int family) +void TCPSocket::Bind(string service, int family) { - Bind(NULL, port, family); + Bind(string(), service, family); } -void TCPSocket::Bind(const char *hostname, unsigned short port, int family) +void TCPSocket::Bind(string node, string service, int family) { - stringstream s; - s << port; - string strPort = s.str(); - addrinfo hints; addrinfo *result; @@ -37,7 +33,7 @@ void TCPSocket::Bind(const char *hostname, unsigned short port, int family) hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE; - if (getaddrinfo(hostname, strPort.c_str(), &hints, &result) < 0) { + if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) < 0) { HandleSocketError(); return; diff --git a/base/tcpsocket.h b/base/tcpsocket.h index 16ae14851..1313efbeb 100644 --- a/base/tcpsocket.h +++ b/base/tcpsocket.h @@ -13,8 +13,8 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void Bind(unsigned short port, int family); - void Bind(const char *hostname, unsigned short port, int family); + void Bind(string service, int family); + void Bind(string node, string service, int family); }; } diff --git a/components/discovery/discoverycomponent.cpp b/components/discovery/discoverycomponent.cpp index d49ca05e9..ce88f2028 100644 --- a/components/discovery/discoverycomponent.cpp +++ b/components/discovery/discoverycomponent.cpp @@ -29,6 +29,11 @@ void DiscoveryComponent::Start(void) GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this()); GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint); + + m_DiscoveryConnectTimer = make_shared(); + m_DiscoveryConnectTimer->SetInterval(30); + m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this()); + m_DiscoveryConnectTimer->Start(); } void DiscoveryComponent::Stop(void) @@ -61,8 +66,15 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea) { neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this()); - /* accept discovery::RegisterComponent messages from any endpoint */ - neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent"); + if (IsBroker()) { + /* accept discovery::RegisterComponent messages from any endpoint */ + neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent"); + } + + /* TODO: implement message broker authorisation */ + neea.Endpoint->RegisterMethodSource("discovery::NewComponent"); + + /* TODO: register handler to unregister this endpoint when it's closed */ return 0; } @@ -120,16 +132,18 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea) Endpoint::Ptr endpoint = static_pointer_cast(ea.Source); string identity = endpoint->GetIdentity(); - if (identity == GetEndpointManager()->GetIdentity()) { - Application::Log("Detected loop-back connection - Disconnecting endpoint."); + if (!GetIcingaApplication()->IsDebugging()) { + if (identity == GetEndpointManager()->GetIdentity()) { + Application::Log("Detected loop-back connection - Disconnecting endpoint."); - endpoint->Stop(); - GetEndpointManager()->UnregisterEndpoint(endpoint); + endpoint->Stop(); + GetEndpointManager()->UnregisterEndpoint(endpoint); - return 0; - } + return 0; + } - GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1)); + GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1)); + } // we assume the other component _always_ wants // discovery::RegisterComponent messages from us @@ -194,8 +208,8 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En return; if (!info->Node.empty() && !info->Service.empty()) { - params.SetPropertyString("node", info->Node); - params.SetPropertyString("service", info->Service); + params.SetNode(info->Node); + params.SetService(info->Service); } set::iterator i; @@ -214,74 +228,73 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message) { ComponentDiscoveryInfo::Ptr info = make_shared(); -} - -int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea) -{ - /*Message message; - nrea.Request.GetParams(&message); - ProcessDiscoveryMessage(message.GetPropertyString(, DiscoveryMessage(message));*/ - return 0; -} - -int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea) -{ - Message message; - nrea.Request.GetParams(&message); - ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), DiscoveryMessage(message)); - return 0; -} -void DiscoveryComponent::AddSubscribedMethod(string identity, string method) -{ - ComponentDiscoveryInfo::Ptr info; + message.GetNode(&info->Node); + message.GetService(&info->Service); - if (!GetComponentDiscoveryInfo(identity, &info)) - return; + Message provides; + if (message.GetProvides(&provides)) { + DictionaryIterator i; + for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) { + info->PublishedMethods.insert(i->second); + } + } - info->SubscribedMethods.insert(method); -} + Message subscribes; + if (message.GetSubscribes(&subscribes)) { + DictionaryIterator i; + for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) { + info->SubscribedMethods.insert(i->second); + } + } -bool DiscoveryComponent::IsSubscribedMethod(string identity, string method) const -{ - if (GetEndpointManager()->GetIdentity() == identity) - return true; + map::iterator i; - ComponentDiscoveryInfo::Ptr info; + i = m_Components.find(identity); - if (!GetComponentDiscoveryInfo(identity, &info)) - return false; + if (i != m_Components.end()) + m_Components.erase(i); - set::const_iterator i; - i = info->SubscribedMethods.find(method); + m_Components[identity] = info; - return (i != info->SubscribedMethods.end()); + SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr()); } -void DiscoveryComponent::AddPublishedMethod(string identity, string method) +int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea) { - ComponentDiscoveryInfo::Ptr info; + DiscoveryMessage message; + nrea.Request.GetParams(&message); - if (!GetComponentDiscoveryInfo(identity, &info)) - return; + string identity; + if (!message.GetIdentity(&identity)) + return 0; - info->PublishedMethods.insert(method); + ProcessDiscoveryMessage(identity, message); + return 0; } -bool DiscoveryComponent::IsPublishedMethod(string identity, string method) const +int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea) { - if (GetEndpointManager()->GetIdentity() == identity) - return true; + DiscoveryMessage message; + nrea.Request.GetParams(&message); + ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message); + return 0; +} - ComponentDiscoveryInfo::Ptr info; +int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea) +{ + EndpointManager::Ptr endpointManager = GetEndpointManager(); - if (!GetComponentDiscoveryInfo(identity, &info)) - return false; + map::iterator i; + for (i = m_Components.begin(); i != m_Components.end(); i++) { + if (endpointManager->HasConnectedEndpoint(i->first)) + continue; - set::const_iterator i; - i = info->PublishedMethods.find(method); + ComponentDiscoveryInfo::Ptr info = i->second; + endpointManager->AddConnection(info->Node, info->Service); + } - return (i != info->PublishedMethods.end()); + return 0; } EXPORT_COMPONENT(DiscoveryComponent); diff --git a/components/discovery/discoverycomponent.h b/components/discovery/discoverycomponent.h index 52e450560..2a20c863b 100644 --- a/components/discovery/discoverycomponent.h +++ b/components/discovery/discoverycomponent.h @@ -22,8 +22,8 @@ class DiscoveryComponent : public IcingaComponent private: VirtualEndpoint::Ptr m_DiscoveryEndpoint; map m_Components; - bool m_Broker; + Timer::Ptr m_DiscoveryConnectTimer; int NewEndpointHandler(const NewEndpointEventArgs& neea); int NewIdentityHandler(const EventArgs& ea); @@ -41,18 +41,14 @@ private: int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const; int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const; + int ReconnectTimerHandler(const TimerEventArgs& tea); + bool IsBroker(void) const; public: virtual string GetName(void) const; virtual void Start(void); virtual void Stop(void); - - void AddSubscribedMethod(string identity, string method); - bool IsSubscribedMethod(string identity, string method) const; - - void AddPublishedMethod(string identity, string method); - bool IsPublishedMethod(string identity, string method) const; }; } diff --git a/components/discovery/discoverymessage.h b/components/discovery/discoverymessage.h index 0d7d75d43..ad88e3b93 100644 --- a/components/discovery/discoverymessage.h +++ b/components/discovery/discoverymessage.h @@ -21,6 +21,26 @@ public: SetPropertyString("identity", value); } + inline bool GetNode(string *value) const + { + return GetPropertyString("node", value); + } + + inline void SetNode(const string& value) + { + SetPropertyString("node", value); + } + + inline bool GetService(string *value) const + { + return GetPropertyString("service", value); + } + + inline void SetService(const string& value) + { + SetPropertyString("service", value); + } + inline bool GetSubscribes(Message *value) const { return GetPropertyMessage("subscribes", value); diff --git a/icinga-app/icinga.conf b/icinga-app/icinga1.conf similarity index 75% rename from icinga-app/icinga.conf rename to icinga-app/icinga1.conf index 506429da3..4af7a7192 100644 --- a/icinga-app/icinga.conf +++ b/icinga-app/icinga1.conf @@ -14,9 +14,6 @@ "discovery": { "replicate": "0", "broker": "1" } }, "rpclistener": { - "kekslistener": { "replicate": "0", "port": "7777" } - }, - "host": { - "localhost": { "ipaddr": "127.0.0.1" } + "kekslistener": { "replicate": "0", "service": "7777" } } } \ No newline at end of file diff --git a/icinga-app/icinga2.conf b/icinga-app/icinga2.conf new file mode 100644 index 000000000..031dbbd81 --- /dev/null +++ b/icinga-app/icinga2.conf @@ -0,0 +1,22 @@ +{ + "icinga": { + "icinga": { + "privkey": "icinga-c2.key", + "pubkey": "icinga-c2.crt", + "cakey": "ca.crt", + "node": "10.0.10.3", + "service": "8888" + } + }, + "component": { + "configrpc": { "replicate": "0", "configSource": "1" }, + "demo": { "replicate": "0" }, + "discovery": { "replicate": "0", "broker": "0" } + }, + "rpclistener": { + "kekslistener": { "replicate": "0", "service": "8888" } + }, + "rpcconnection": { + "keksclient": { "replicate": "0", "node": "127.0.0.1", "service": "7777" } + } +} \ No newline at end of file diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 3f7399885..19e8d9dc1 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -22,29 +22,29 @@ shared_ptr EndpointManager::GetSSLContext(void) const return m_SSLContext; } -void EndpointManager::AddListener(unsigned short port) +void EndpointManager::AddListener(string service) { stringstream s; - s << "Adding new listener: port " << port; + s << "Adding new listener: port " << service; Application::Log(s.str()); JsonRpcServer::Ptr server = make_shared(m_SSLContext); RegisterServer(server); - server->Bind(port, AF_INET6); + server->Bind(service, AF_INET6); server->Listen(); server->Start(); } -void EndpointManager::AddConnection(string host, unsigned short port) +void EndpointManager::AddConnection(string node, string service) { stringstream s; - s << "Adding new endpoint: [" << host << "]:" << port; + s << "Adding new endpoint: [" << node << "]:" << service; Application::Log(s.str()); JsonRpcEndpoint::Ptr endpoint = make_shared(); - endpoint->Connect(host, port, m_SSLContext); RegisterEndpoint(endpoint); + endpoint->Connect(node, service, m_SSLContext); } void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) @@ -103,8 +103,10 @@ void EndpointManager::SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr rec if (!request.GetMethod(&method)) throw InvalidArgumentException("Missing 'method' parameter."); - if (recipient->IsMethodSink(method)) + if (recipient->IsMethodSink(method)) { + Application::Log(sender->GetAddress() + " -> " + recipient->GetAddress() + ": " + method); recipient->ProcessRequest(sender, request); + } } void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal) @@ -144,3 +146,14 @@ void EndpointManager::ForeachEndpoint(function::const_iterator i; + for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) { + if ((*i)->GetIdentity() == identity) + return true; + } + + return false; +} diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 510189185..2061e36c2 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -32,8 +32,8 @@ public: void SetSSLContext(shared_ptr sslContext); shared_ptr GetSSLContext(void) const; - void AddListener(unsigned short port); - void AddConnection(string host, unsigned short port); + void AddListener(string service); + void AddConnection(string node, string service); void RegisterEndpoint(Endpoint::Ptr endpoint); void UnregisterEndpoint(Endpoint::Ptr endpoint); @@ -44,6 +44,8 @@ public: void ForeachEndpoint(function callback); + bool HasConnectedEndpoint(string identity) const; + Event OnNewEndpoint; }; diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index 2134e6e69..09fe759dd 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -172,16 +172,11 @@ int IcingaApplication::NewRpcListenerHandler(const EventArgs& ea) if (object->GetReplicated()) return 0; - long portValue; - if (!object->GetPropertyInteger("port", &portValue)) - throw InvalidArgumentException("Parameter 'port' is required for 'rpclistener' objects."); - - if (portValue < 0 || portValue > USHRT_MAX) - throw InvalidArgumentException("Parameter 'port' contains an invalid value."); - - unsigned short port = (unsigned short)portValue; + string service; + if (!object->GetPropertyString("service", &service)) + throw InvalidArgumentException("Parameter 'service' is required for 'rpclistener' objects."); - GetEndpointManager()->AddListener(port); + GetEndpointManager()->AddListener(service); return 0; } @@ -196,26 +191,20 @@ int IcingaApplication::DeletedRpcListenerHandler(const EventArgs& ea) int IcingaApplication::NewRpcConnectionHandler(const EventArgs& ea) { ConfigObject::Ptr object = static_pointer_cast(ea.Source); - string hostname; - long portValue; - unsigned short port; /* don't allow replicated config objects */ if (object->GetReplicated()) return 0; - if (!object->GetPropertyString("hostname", &hostname)) - throw InvalidArgumentException("Parameter 'hostname' is required for 'rpcconnection' objects."); - - if (!object->GetPropertyInteger("port", &portValue)) - throw InvalidArgumentException("Parameter 'port' is required for 'rpcconnection' objects."); - - if (portValue < 0 || portValue > USHRT_MAX) - throw InvalidArgumentException("Parameter 'port' contains an invalid value."); + string node; + if (!object->GetPropertyString("node", &node)) + throw InvalidArgumentException("Parameter 'node' is required for 'rpcconnection' objects."); - port = (unsigned short)portValue; + string service; + if (!object->GetPropertyString("service", &service)) + throw InvalidArgumentException("Parameter 'service' is required for 'rpcconnection' objects."); - GetEndpointManager()->AddConnection(hostname, port); + GetEndpointManager()->AddConnection(node, service); return 0; } diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index efaf685c9..e5b093e7c 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -15,16 +15,12 @@ JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void) return m_Client; } -void JsonRpcEndpoint::Connect(string host, unsigned short port, shared_ptr sslContext) +void JsonRpcEndpoint::Connect(string node, string service, shared_ptr sslContext) { - m_PeerHostname = host; - m_PeerPort = port; - JsonRpcClient::Ptr client = make_shared(RoleOutbound, sslContext); - client->Connect(host, port); - client->Start(); - SetClient(client); + client->Connect(node, service); + client->Start(); } void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) @@ -99,23 +95,12 @@ int JsonRpcEndpoint::ClientClosedHandler(const EventArgs& ea) m_PendingCalls.clear(); - if (m_PeerHostname != string()) { - Timer::Ptr timer = make_shared(); - timer->SetInterval(30); - timer->SetUserArgs(ea); - timer->OnTimerExpired += bind_weak(&JsonRpcEndpoint::ClientReconnectHandler, shared_from_this()); - timer->Start(); - m_ReconnectTimer = timer; - - Application::Log("Spawned reconnect timer (30 seconds)"); - } - // TODO: _only_ clear non-persistent method sources/sinks // unregister ourselves if no persistent sources/sinks are left (use a timer for that, once we have a TTL property for the methods) ClearMethodSinks(); ClearMethodSources(); - if (CountMethodSinks() == 0 && !m_ReconnectTimer) + if (CountMethodSinks() == 0) GetEndpointManager()->UnregisterEndpoint(static_pointer_cast(shared_from_this())); m_Client.reset(); @@ -132,19 +117,6 @@ int JsonRpcEndpoint::ClientErrorHandler(const SocketErrorEventArgs& ea) return 0; } -int JsonRpcEndpoint::ClientReconnectHandler(const TimerEventArgs& ea) -{ - JsonRpcClient::Ptr client = static_pointer_cast(ea.UserArgs.Source); - Timer::Ptr timer = static_pointer_cast(ea.Source); - - GetEndpointManager()->AddConnection(m_PeerHostname, m_PeerPort); - - timer->Stop(); - m_ReconnectTimer.reset(); - - return 0; -} - int JsonRpcEndpoint::VerifyCertificateHandler(const VerifyCertificateEventArgs& ea) { if (ea.Certificate && ea.ValidCertificate) { diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h index dc81023be..96ce56dc6 100644 --- a/icinga/jsonrpcendpoint.h +++ b/icinga/jsonrpcendpoint.h @@ -11,22 +11,17 @@ private: string m_Address; JsonRpcClient::Ptr m_Client; map m_PendingCalls; - Timer::Ptr m_ReconnectTimer; - - string m_PeerHostname; - unsigned short m_PeerPort; int NewMessageHandler(const NewMessageEventArgs& nmea); int ClientClosedHandler(const EventArgs& ea); int ClientErrorHandler(const SocketErrorEventArgs& ea); - int ClientReconnectHandler(const TimerEventArgs& ea); int VerifyCertificateHandler(const VerifyCertificateEventArgs& ea); public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void Connect(string host, unsigned short port, + void Connect(string node, string service, shared_ptr sslContext); JsonRpcClient::Ptr GetClient(void);