m_ConfigRpcEndpoint->RegisterMethodSource("config::PropertyChanged");
}
+ m_ConfigRpcEndpoint->RegisterMethodHandler("message::Welcome",
+ bind_weak(&ConfigRpcComponent::WelcomeMessageHandler, shared_from_this()));
+
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCreated",
bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved",
bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
-
- endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
- endpointManager->ForeachEndpoint(bind(&ConfigRpcComponent::NewEndpointHandler, this, _1));
}
void ConfigRpcComponent::Stop(void)
// TODO: implement
}
-int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea)
-{
- ea.Endpoint->OnSessionEstablished += bind_weak(&ConfigRpcComponent::SessionEstablishedHandler, shared_from_this());
-
- return 0;
-}
-
-int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea)
+int ConfigRpcComponent::WelcomeMessageHandler(const NewRequestEventArgs& ea)
{
JsonRpcRequest request;
request.SetMethod("config::FetchObjects");
- Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
- GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request);
+ GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, ea.Sender, request);
return 0;
}
private:
VirtualEndpoint::Ptr m_ConfigRpcEndpoint;
- int NewEndpointHandler(const NewEndpointEventArgs& ea);
- int SessionEstablishedHandler(const EventArgs& ea);
+ int WelcomeMessageHandler(const NewRequestEventArgs& ea);
int LocalObjectCreatedHandler(const EventArgs& ea);
int LocalObjectRemovedHandler(const EventArgs& ea);
"demo": { "replicate": "0" }
},
"rpcconnection": {
- "kekslistener": { "replicate": "0", "hostname": "10.0.10.14", "port": "7777" }
+ "kekslistener": { "replicate": "0", "hostname": "127.0.0.1", "port": "7777" }
+ },
+ "rpclistener": {
+ "kekslistener": { "replicate": "0", "port": "7777" }
},
"host": {
"localhost": { "ipaddr": "127.0.0.1" }
void DiscoveryComponent::Start(void)
{
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
+ m_DiscoveryEndpoint->RegisterMethodHandler("message::Welcome",
+ bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
+
m_DiscoveryEndpoint->RegisterMethodSource("discovery::PeerAvailable");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::GetPeers",
bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
}
-int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
-{
- neea.Endpoint->OnSessionEstablished += bind_weak(&DiscoveryComponent::SessionEstablishedHandler, shared_from_this());
-
- /* TODO: register handler for new sink/source */
-
- return 0;
-}
-
-int DiscoveryComponent::SessionEstablishedHandler(const EventArgs& neea)
+int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& neea)
{
JsonRpcRequest request;
request.SetMethod("discovery::GetPeers");
-
- Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(neea.Source);
- GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
+ GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, neea.Sender, request);
/* TODO: send information about this client to all other clients */
IcingaApplication::Ptr GetIcingaApplication(void) const;
- int NewEndpointHandler(const NewEndpointEventArgs& neea);
- int SessionEstablishedHandler(const EventArgs& neea);
+ int WelcomeMessageHandler(const NewRequestEventArgs& neea);
int GetPeersMessageHandler(const NewRequestEventArgs& nrea);
public:
EventArgs ea;
ea.Source = shared_from_this();
OnIdentityChanged(ea);
-
- OnSessionEstablished(ea);
}
bool Endpoint::HasIdentity(void) const
int CountMethodSources(void) const;
Event<EventArgs> OnIdentityChanged;
- Event<EventArgs> OnSessionEstablished;
};
}
m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
+ m_SubscriptionEndpoint->RegisterMethodSource("message::Welcome");
m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
SubscriptionMessage subscriptionMessage;
subscriptionMessage.SetMethod(nmea.Method);
request.SetParams(subscriptionMessage);
-
GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, target, request);
return 0;
neea.Endpoint->AddAllowedMethodSinkPrefix("message::");
neea.Endpoint->AddAllowedMethodSourcePrefix("message::");
+ /* we just assume the peer wants those messages */
+ neea.Endpoint->RegisterMethodSink("message::Welcome");
neea.Endpoint->RegisterMethodSink("message::Subscribe");
neea.Endpoint->RegisterMethodSink("message::Provide");
GetEndpointManager()->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1));
+ /* signal the peer that we're done syncing subscriptions and are now
+ * ready to accept messages. */
+ JsonRpcRequest request;
+ request.SetMethod("message::Welcome");
+ GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, neea.Endpoint, request);
+
return 0;
}