m_ConfigRpcEndpoint->RegisterMethodSource("config::PropertyChanged");
}
+ m_ConfigRpcEndpoint->RegisterMethodHandler("message::Welcome", bind_weak(&ConfigRpcComponent::WelcomeMessageHandler, shared_from_this()));
+
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCreated", bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved", bind_weak(&ConfigRpcComponent::RemoteObjectRemovedHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::PropertyChanged", bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
+
+ endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
+ endpointManager->ForeachEndpoint(bind(&ConfigRpcComponent::NewEndpointHandler, this, _1));
}
void ConfigRpcComponent::Stop(void)
// TODO: implement
}
+int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea)
+{
+ if (ea.Endpoint->HasIdentity()) {
+ JsonRpcRequest request;
+ request.SetVersion("2.0");
+ request.SetMethod("config::FetchObjects");
+ ea.Endpoint->ProcessRequest(m_ConfigRpcEndpoint, request);
+ }
+
+ return 0;
+}
+
+int ConfigRpcComponent::WelcomeMessageHandler(const NewRequestEventArgs& ea)
+{
+ NewEndpointEventArgs neea;
+ neea.Source = shared_from_this();
+ neea.Endpoint = ea.Sender;
+ NewEndpointHandler(neea);
+
+ return 0;
+}
+
JsonRpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
{
JsonRpcRequest msg;
IcingaApplication::Ptr GetIcingaApplication(void);
+ int NewEndpointHandler(const NewEndpointEventArgs& ea);
+ int WelcomeMessageHandler(const NewRequestEventArgs& ea);
+
int LocalObjectCreatedHandler(const ConfigObjectEventArgs& ea);
int LocalObjectRemovedHandler(const ConfigObjectEventArgs& ea);
int LocalPropertyChangedHandler(const ConfigObjectEventArgs& ea);
using namespace icinga;
+string Endpoint::GetIdentity(void) const
+{
+ return m_Identity;
+}
+
+void Endpoint::SetIdentity(string identity)
+{
+ m_Identity = identity;
+}
+
+bool Endpoint::HasIdentity(void) const
+{
+ return !m_Identity.empty();
+}
+
EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
{
return m_EndpointManager.lock();
class I2_ICINGA_API Endpoint : public Object
{
private:
+ string m_Identity;
set<string> m_MethodSinks;
set<string> m_MethodSources;
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
+ string GetIdentity(void) const;
+ void SetIdentity(string identity);
+ bool HasIdentity(void) const;
+
shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(weak_ptr<EndpointManager> manager);
#include "endpointmanager.h"
#include "icingaapplication.h"
#include "subscriptioncomponent.h"
+#include "subscriptionmessage.h"
+#include "identitymessage.h"
#endif /* I2ICINGA_H */
<ClCompile Include="endpoint.cpp" />
<ClCompile Include="endpointmanager.cpp" />
<ClCompile Include="icingaapplication.cpp" />
+ <ClCompile Include="identitymessage.cpp" />
<ClCompile Include="jsonrpcendpoint.cpp" />
<ClCompile Include="subscriptioncomponent.cpp" />
+ <ClCompile Include="subscriptionmessage.cpp" />
<ClCompile Include="virtualendpoint.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="endpointmanager.h" />
<ClInclude Include="i2-icinga.h" />
<ClInclude Include="icingaapplication.h" />
+ <ClInclude Include="identitymessage.h" />
<ClInclude Include="jsonrpcendpoint.h" />
<ClInclude Include="subscriptioncomponent.h" />
+ <ClInclude Include="subscriptionmessage.h" />
<ClInclude Include="virtualendpoint.h" />
</ItemGroup>
<PropertyGroup Label="Globals">
--- /dev/null
+#include "i2-icinga.h"
+
+using namespace icinga;
--- /dev/null
+#ifndef IDENTITYMESSAGE_H
+#define IDENTITYMESSAGE_H
+
+namespace icinga
+{
+
+class I2_ICINGA_API IdentityMessage : public Message
+{
+
+public:
+ IdentityMessage(void) : Message() { }
+ IdentityMessage(const Message& message) : Message(message) { }
+
+ inline bool GetIdentity(string *value) const
+ {
+ return GetDictionary()->GetValueString("identity", value);
+ }
+
+ inline void SetIdentity(const string& value)
+ {
+ GetDictionary()->SetValueString("identity", value);
+ }
+};
+
+}
+
+#endif /* IDENTITYMESSAGE_H */
client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this());
client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this());
+
+ JsonRpcRequest request;
+ request.SetVersion("2.0");
+ request.SetMethod("message::SetIdentity");
+
+ IdentityMessage params;
+ params.SetIdentity("keks");
+ request.SetParams(params);
+
+ client->SendMessage(request);
}
bool JsonRpcEndpoint::IsLocal(void) const
string method;
if (message.GetDictionary()->GetValueString("method", &method)) {
JsonRpcRequest request = message;
- Message params;
- string method;
-
- if (request.GetMethod(&method) && request.GetParams(¶ms) &&
- (method == "message::Subscribe" || method == "message::Provide")) {
- string sub_method;
- if (params.GetDictionary()->GetValueString("method", &sub_method)) {
- if (method == "message::Subscribe")
- RegisterMethodSink(sub_method);
- else
- RegisterMethodSource(sub_method);
- }
-
- return 0;
- }
string id;
if (request.GetID(&id))
else
GetEndpointManager()->SendMulticastRequest(sender, request, false);
} else {
+ JsonRpcResponse response = message;
+
// TODO: deal with response messages
throw NotImplementedException();
}
m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
+ m_SubscriptionEndpoint->RegisterMethodHandler("message::SetIdentity", bind_weak(&SubscriptionComponent::IdentityMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
+ m_SubscriptionEndpoint->RegisterMethodSource("message::Welcome");
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->OnNewEndpoint += bind_weak(&SubscriptionComponent::NewEndpointHandler, shared_from_this());
if (!nrea.Request.GetParams(¶ms))
return 0;
+ SubscriptionMessage subscriptionMessage = params;
+
string method;
- if (!params.GetDictionary()->GetValueString("method", &method))
+ if (!subscriptionMessage.GetMethod(&method))
return 0;
nrea.Sender->RegisterMethodSink(method);
if (!nrea.Request.GetParams(¶ms))
return 0;
+ SubscriptionMessage subscriptionMessage = params;
+
string method;
- if (!params.GetDictionary()->GetValueString("method", &method))
+ if (!subscriptionMessage.GetMethod(&method))
return 0;
nrea.Sender->RegisterMethodSource(method);
return 0;
}
+
+int SubscriptionComponent::IdentityMessageHandler(const NewRequestEventArgs& nrea)
+{
+ Message params;
+ if (!nrea.Request.GetParams(¶ms))
+ return 0;
+
+ IdentityMessage identityMessage = params;
+
+ string identity;
+ if (!identityMessage.GetIdentity(&identity))
+ return 0;
+
+ nrea.Sender->SetIdentity(identity);
+
+ /* there's no authentication for now, just tell them it's ok to send messages */
+ JsonRpcRequest request;
+ request.SetVersion("2.0");
+ request.SetMethod("message::Welcome");
+ nrea.Sender->ProcessRequest(m_SubscriptionEndpoint, request);
+
+ return 0;
+}
int NewEndpointHandler(const NewEndpointEventArgs& neea);
int SubscribeMessageHandler(const NewRequestEventArgs& nrea);
int ProvideMessageHandler(const NewRequestEventArgs& nrea);
+ int IdentityMessageHandler(const NewRequestEventArgs& nrea);
int SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea);
int SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea);
--- /dev/null
+#include "i2-icinga.h"
+
+using namespace icinga;
--- /dev/null
+#ifndef SUBSCRIPTIONMESSAGE_H
+#define SUBSCRIPTIONMESSAGE_H
+
+namespace icinga
+{
+
+class I2_ICINGA_API SubscriptionMessage : public Message
+{
+
+public:
+ SubscriptionMessage(void) : Message() { }
+ SubscriptionMessage(const Message& message) : Message(message) { }
+
+ inline bool GetMethod(string *value) const
+ {
+ return GetDictionary()->GetValueString("method", value);
+ }
+
+ inline void SetMethod(const string& value)
+ {
+ GetDictionary()->SetValueString("method", value);
+ }
+};
+
+}
+
+#endif /* SUBSCRIPTIONMESSAGE_H */