Updated documentation.
cout << "[" << timestamp << "]: " << message << endl;
}
-/**
- * Sets the application's command line arguments.
- *
- * @param arguments The arguments.
- */
-void Application::SetArguments(const vector<string>& arguments)
-{
- m_Arguments = arguments;
-}
-
-/**
- * Retrieves the application's command line arguments.
- *
- * @returns The arguments.
- */
-const vector<string>& Application::GetArguments(void) const
-{
- return m_Arguments;
-}
-
/**
* Retrieves the directory the application's binary is contained in.
*
sigaction(SIGPIPE, &sa, NULL);
#endif /* _WIN32 */
- vector<string> args;
-
+ m_Arguments.clear();
for (int i = 0; i < argc; i++)
- args.push_back(string(argv[i]));
-
- SetArguments(args);
+ m_Arguments.push_back(string(argv[i]));
if (IsDebugging()) {
- result = Main(args);
+ result = Main(m_Arguments);
Application::Instance.reset();
} else {
try {
- result = Main(args);
+ result = Main(m_Arguments);
} catch (const Exception& ex) {
Application::Instance.reset();
virtual int Main(const vector<string>& args) = 0;
- void SetArguments(const vector<string>& arguments);
- const vector<string>& GetArguments(void) const;
-
void Shutdown(void);
static void Log(string message);
long configSource;
if (GetConfig()->GetPropertyInteger("configSource", &configSource) && configSource != 0) {
- m_ConfigRpcEndpoint->RegisterMethodHandler("config::FetchObjects",
+ m_ConfigRpcEndpoint->RegisterTopicHandler("config::FetchObjects",
bind_weak(&ConfigRpcComponent::FetchObjectsHandler, shared_from_this()));
configHive->OnObjectCommitted += bind_weak(&ConfigRpcComponent::LocalObjectCommittedHandler, shared_from_this());
configHive->OnObjectRemoved += bind_weak(&ConfigRpcComponent::LocalObjectRemovedHandler, shared_from_this());
- m_ConfigRpcEndpoint->RegisterMethodSource("config::ObjectCommitted");
- m_ConfigRpcEndpoint->RegisterMethodSource("config::ObjectRemoved");
+ m_ConfigRpcEndpoint->RegisterPublication("config::ObjectCommitted");
+ m_ConfigRpcEndpoint->RegisterPublication("config::ObjectRemoved");
}
endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
- m_ConfigRpcEndpoint->RegisterMethodSource("config::FetchObjects");
- m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCommitted",
+ m_ConfigRpcEndpoint->RegisterPublication("config::FetchObjects");
+ m_ConfigRpcEndpoint->RegisterTopicHandler("config::ObjectCommitted",
bind_weak(&ConfigRpcComponent::RemoteObjectCommittedHandler, shared_from_this()));
- m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved",
+ m_ConfigRpcEndpoint->RegisterTopicHandler("config::ObjectRemoved",
bind_weak(&ConfigRpcComponent::RemoteObjectRemovedHandler, shared_from_this()));
endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea)
{
- JsonRpcRequest request;
+ RpcRequest request;
request.SetMethod("config::FetchObjects");
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
- GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request);
+ GetEndpointManager()->SendUnicastMessage(m_ConfigRpcEndpoint, endpoint, request);
return 0;
}
-JsonRpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
+RpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
{
- JsonRpcRequest msg;
+ RpcRequest msg;
msg.SetMethod(method);
Message params;
if (!ShouldReplicateObject(object))
continue;
- JsonRpcRequest request = MakeObjectMessage(object, "config::ObjectCreated", true);
+ RpcRequest request = MakeObjectMessage(object, "config::ObjectCreated", true);
- GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, client, request);
+ GetEndpointManager()->SendUnicastMessage(m_ConfigRpcEndpoint, client, request);
}
}
if (!ShouldReplicateObject(object))
return 0;
- GetEndpointManager()->SendMulticastRequest(m_ConfigRpcEndpoint,
+ GetEndpointManager()->SendMulticastMessage(m_ConfigRpcEndpoint,
MakeObjectMessage(object, "config::ObjectCreated", true));
return 0;
if (!ShouldReplicateObject(object))
return 0;
- GetEndpointManager()->SendMulticastRequest(m_ConfigRpcEndpoint,
+ GetEndpointManager()->SendMulticastMessage(m_ConfigRpcEndpoint,
MakeObjectMessage(object, "config::ObjectRemoved", false));
return 0;
int ConfigRpcComponent::RemoteObjectCommittedHandler(const NewRequestEventArgs& ea)
{
- JsonRpcRequest message = ea.Request;
+ RpcRequest message = ea.Request;
bool was_null = false;
Message params;
int ConfigRpcComponent::RemoteObjectRemovedHandler(const NewRequestEventArgs& ea)
{
- JsonRpcRequest message = ea.Request;
+ RpcRequest message = ea.Request;
Message params;
if (!message.GetParams(¶ms))
int RemoteObjectCommittedHandler(const NewRequestEventArgs& ea);
int RemoteObjectRemovedHandler(const NewRequestEventArgs& ea);
- static JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
+ static RpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
static bool ShouldReplicateObject(const ConfigObject::Ptr& object);
public:
void DemoComponent::Start(void)
{
m_DemoEndpoint = make_shared<VirtualEndpoint>();
- m_DemoEndpoint->RegisterMethodHandler("demo::HelloWorld",
+ m_DemoEndpoint->RegisterTopicHandler("demo::HelloWorld",
bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this()));
- m_DemoEndpoint->RegisterMethodSource("demo::HelloWorld");
+ m_DemoEndpoint->RegisterPublication("demo::HelloWorld");
EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
endpointManager->RegisterEndpoint(m_DemoEndpoint);
{
Application::Log("Sending multicast 'hello world' message.");
- JsonRpcRequest request;
+ RpcRequest request;
request.SetMethod("demo::HelloWorld");
EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
- endpointManager->SendMulticastRequest(m_DemoEndpoint, request);
+ endpointManager->SendMulticastMessage(m_DemoEndpoint, request);
return 0;
}
{
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
- m_DiscoveryEndpoint->RegisterMethodSource("discovery::RegisterComponent");
- m_DiscoveryEndpoint->RegisterMethodHandler("discovery::RegisterComponent",
+ m_DiscoveryEndpoint->RegisterPublication("discovery::RegisterComponent");
+ m_DiscoveryEndpoint->RegisterTopicHandler("discovery::RegisterComponent",
bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this()));
- m_DiscoveryEndpoint->RegisterMethodSource("discovery::NewComponent");
- m_DiscoveryEndpoint->RegisterMethodHandler("discovery::NewComponent",
+ m_DiscoveryEndpoint->RegisterPublication("discovery::NewComponent");
+ m_DiscoveryEndpoint->RegisterTopicHandler("discovery::NewComponent",
bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
- m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
+ m_DiscoveryEndpoint->RegisterTopicHandler("discovery::Welcome",
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
/* accept discovery::RegisterComponent messages from any endpoint */
- neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
+ neea.Endpoint->RegisterPublication("discovery::RegisterComponent");
/* accept discovery::Welcome messages from any endpoint */
- neea.Endpoint->RegisterMethodSource("discovery::Welcome");
+ neea.Endpoint->RegisterPublication("discovery::Welcome");
return 0;
}
/**
- * Registers a new message sink for a component.
- *
- * @param nmea Event args for the new message sink.
- * @param info The component registration information.
- * @returns 0
- */
-int DiscoveryComponent::DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
-{
- info->SubscribedMethods.insert(nmea.Method);
- return 0;
-}
-
-/**
- * Registers a new message source for a component.
- *
- * @param nmea Event args for the new message source.
- * @param info The component registration information.
- * @returns 0
- */
-int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
-{
- info->PublishedMethods.insert(nmea.Method);
- return 0;
-}
-
-/**
- * Registers message sinks/sources in the specified component information object.
+ * Registers message Subscriptions/sources in the specified component information object.
*
* @param neea Event arguments for the endpoint.
* @param info Component information object.
*/
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
{
- neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
- neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
+ Endpoint::ConstTopicIterator i;
+
+ for (i = neea.Endpoint->BeginSubscriptions(); i != neea.Endpoint->EndSubscriptions(); i++) {
+ info->Subscriptions.insert(*i);
+ }
+
+ for (i = neea.Endpoint->BeginPublications(); i != neea.Endpoint->EndPublications(); i++) {
+ info->Publications.insert(*i);
+ }
+
return 0;
}
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
- endpoint->RegisterMethodSink("discovery::RegisterComponent");
+ endpoint->RegisterSubscription("discovery::RegisterComponent");
// send a discovery::RegisterComponent message, if the
// other component is a broker this makes sure
// we assume the other component _always_ wants
// discovery::NewComponent messages from us
- endpoint->RegisterMethodSink("discovery::NewComponent");
+ endpoint->RegisterSubscription("discovery::NewComponent");
// send discovery::NewComponent message for ourselves
SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
return 0;
}
- // register published/subscribed methods for this endpoint
+ // register published/subscribed topics for this endpoint
ComponentDiscoveryInfo::Ptr info = ic->second;
set<string>::iterator it;
- for (it = info->PublishedMethods.begin(); it != info->PublishedMethods.end(); it++)
- endpoint->RegisterMethodSource(*it);
+ for (it = info->Publications.begin(); it != info->Publications.end(); it++)
+ endpoint->RegisterPublication(*it);
- for (it = info->SubscribedMethods.begin(); it != info->SubscribedMethods.end(); it++)
- endpoint->RegisterMethodSink(*it);
+ for (it = info->Subscriptions.begin(); it != info->Subscriptions.end(); it++)
+ endpoint->RegisterSubscription(*it);
FinishDiscoverySetup(endpoint);
/**
* Finishes the welcome handshake for a new component
- * by registering message sinks/sources for the component
+ * by registering message Subscriptions/sources for the component
* and sending a welcome message if necessary.
*
* @param endpoint The endpoint to set up.
// we assume the other component _always_ wants
// discovery::Welcome messages from us
- endpoint->RegisterMethodSink("discovery::Welcome");
- JsonRpcRequest request;
+ endpoint->RegisterSubscription("discovery::Welcome");
+ RpcRequest request;
request.SetMethod("discovery::Welcome");
- GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
+ GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, endpoint, request);
endpoint->SetSentWelcome(true);
*/
void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
{
- JsonRpcRequest request;
+ RpcRequest request;
request.SetMethod(method);
DiscoveryMessage params;
params.SetIdentity(identity);
Message subscriptions;
- params.SetSubscribes(subscriptions);
+ params.SetSubscriptions(subscriptions);
Message publications;
- params.SetProvides(publications);
+ params.SetPublications(publications);
ComponentDiscoveryInfo::Ptr info;
}
set<string>::iterator i;
- for (i = info->PublishedMethods.begin(); i != info->PublishedMethods.end(); i++)
+ for (i = info->Publications.begin(); i != info->Publications.end(); i++)
publications.AddUnnamedPropertyString(*i);
- for (i = info->SubscribedMethods.begin(); i != info->SubscribedMethods.end(); i++)
+ for (i = info->Subscriptions.begin(); i != info->Subscriptions.end(); i++)
subscriptions.AddUnnamedPropertyString(*i);
if (recipient)
- GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, recipient, request);
+ GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, recipient, request);
else
- GetEndpointManager()->SendMulticastRequest(m_DiscoveryEndpoint, request);
+ GetEndpointManager()->SendMulticastMessage(m_DiscoveryEndpoint, request);
}
bool DiscoveryComponent::HasMessagePermission(Dictionary::Ptr roles, string messageType, string message)
Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
- Message provides;
- if (message.GetProvides(&provides)) {
+ Message publications;
+ if (message.GetPublications(&publications)) {
DictionaryIterator i;
- for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
- if (trusted || HasMessagePermission(roles, "publish", i->second)) {
- info->PublishedMethods.insert(i->second);
+ for (i = publications.GetDictionary()->Begin(); i != publications.GetDictionary()->End(); i++) {
+ if (trusted || HasMessagePermission(roles, "publications", i->second)) {
+ info->Publications.insert(i->second);
if (endpoint)
- endpoint->RegisterMethodSource(i->second);
+ endpoint->RegisterPublication(i->second);
}
}
}
- Message subscribes;
- if (message.GetSubscribes(&subscribes)) {
+ Message subscriptions;
+ if (message.GetSubscriptions(&subscriptions)) {
DictionaryIterator i;
- for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
- if (trusted || HasMessagePermission(roles, "subscribe", i->second)) {
- info->SubscribedMethods.insert(i->second);
+ for (i = subscriptions.GetDictionary()->Begin(); i != subscriptions.GetDictionary()->End(); i++) {
+ if (trusted || HasMessagePermission(roles, "subscriptions", i->second)) {
+ info->Subscriptions.insert(i->second);
if (endpoint)
- endpoint->RegisterMethodSink(i->second);
+ endpoint->RegisterSubscription(i->second);
}
}
}
string Node;
string Service;
- set<string> SubscribedMethods;
- set<string> PublishedMethods;
+ set<string> Subscriptions;
+ set<string> Publications;
time_t LastSeen;
};
int CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea);
int DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const;
- int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
- int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoveryTimerHandler(const TimerEventArgs& tea);
SetPropertyString("service", value);
}
- inline bool GetSubscribes(Message *value) const
+ inline bool GetSubscriptions(Message *value) const
{
- return GetPropertyMessage("subscribes", value);
+ return GetPropertyMessage("subscriptions", value);
}
- inline void SetSubscribes(Message value)
+ inline void SetSubscriptions(Message value)
{
- SetPropertyMessage("subscribes", value);
+ SetPropertyMessage("subscriptions", value);
}
- inline bool GetProvides(Message *value) const
+ inline bool GetPublications(Message *value) const
{
- return GetPropertyMessage("provides", value);
+ return GetPropertyMessage("publications", value);
}
- inline void SetProvides(Message value)
+ inline void SetPublications(Message value)
{
- SetPropertyMessage("provides", value);
+ SetPropertyMessage("publications", value);
}
};
},
"role": {
"broker": {
- "publish": [ "discovery::NewComponent" ]
+ "publications": [ "discovery::NewComponent" ]
},
"demo": {
- "publish": [ "demo::*" ],
- "subscribe": [ "demo::*" ]
+ "publications": [ "demo::*" ],
+ "subscriptions": [ "demo::*" ]
}
}
}
\ No newline at end of file
},
"role": {
"broker": {
- "publish": [ "discovery::NewComponent" ]
+ "publications": [ "discovery::NewComponent" ]
},
"demo": {
- "publish": [ "demo::*" ],
- "subscribe": [ "demo::*" ]
+ "publications": [ "demo::*" ],
+ "subscriptions": [ "demo::*" ]
}
}
}
\ No newline at end of file
},
"role": {
"broker": {
- "publish": [ "discovery::NewComponent" ]
+ "publications": [ "discovery::NewComponent" ]
},
"demo": {
- "publish": [ "demo::*" ],
- "subscribe": [ "demo::*" ]
+ "publications": [ "demo::*" ],
+ "subscriptions": [ "demo::*" ]
}
}
}
\ No newline at end of file
using namespace icinga;
+/**
+ * Constructor for the Endpoint class.
+ */
Endpoint::Endpoint(void)
{
m_ReceivedWelcome = false;
m_SentWelcome = false;
}
+/**
+ * Retrieves the identity of this endpoint.
+ *
+ * @returns The identity of the endpoint.
+ */
string Endpoint::GetIdentity(void) const
{
return m_Identity;
}
+/**
+ * Sets the identity of this endpoint.
+ *
+ * @param identity The new identity of the endpoint.
+ */
void Endpoint::SetIdentity(string identity)
{
m_Identity = identity;
OnIdentityChanged(ea);
}
-bool Endpoint::HasIdentity(void) const
-{
- return !m_Identity.empty();
-}
-
+/**
+ * Retrieves the endpoint manager this endpoint is registered with.
+ *
+ * @returns The EndpointManager object.
+ */
EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
{
return m_EndpointManager.lock();
}
+/**
+ * Sets the endpoint manager this endpoint is registered with.
+ *
+ * @param manager The EndpointManager object.
+ */
void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
{
m_EndpointManager = manager;
}
-void Endpoint::RegisterMethodSink(string method)
-{
- m_MethodSinks.insert(method);
-}
-
-void Endpoint::UnregisterMethodSink(string method)
-{
- m_MethodSinks.erase(method);
-}
-
-bool Endpoint::IsMethodSink(string method) const
-{
- return (m_MethodSinks.find(method) != m_MethodSinks.end());
-}
-
-void Endpoint::ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback)
-{
- for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) {
- NewMethodEventArgs nmea;
- nmea.Source = shared_from_this();
- nmea.Method = *i;
- callback(nmea);
- }
-}
-
-void Endpoint::RegisterMethodSource(string method)
+/**
+ * Registers a topic subscription for this endpoint.
+ *
+ * @param topic The name of the topic.
+ */
+void Endpoint::RegisterSubscription(string topic)
{
- m_MethodSources.insert(method);
+ m_Subscriptions.insert(topic);
}
-void Endpoint::UnregisterMethodSource(string method)
+/**
+ * Removes a topic subscription from this endpoint.
+ *
+ * @param topic The name of the topic.
+ */
+void Endpoint::UnregisterSubscription(string topic)
{
- m_MethodSources.erase(method);
+ m_Subscriptions.erase(topic);
}
-bool Endpoint::IsMethodSource(string method) const
+/**
+ * Checks whether the endpoint has a subscription for the specified topic.
+ *
+ * @param topic The name of the topic.
+ * @returns true if the endpoint is subscribed to the topic, false otherwise.
+ */
+bool Endpoint::HasSubscription(string topic) const
{
- return (m_MethodSources.find(method) != m_MethodSources.end());
+ return (m_Subscriptions.find(topic) != m_Subscriptions.end());
}
-void Endpoint::ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback)
+/**
+ * Registers a topic publication for this endpoint.
+ *
+ * @param topic The name of the topic.
+ */
+void Endpoint::RegisterPublication(string topic)
{
- for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) {
- NewMethodEventArgs nmea;
- nmea.Source = shared_from_this();
- nmea.Method = *i;
- callback(nmea);
- }
+ m_Publications.insert(topic);
}
-void Endpoint::ClearMethodSinks(void)
+/**
+ * Removes a topic publication from this endpoint.
+ *
+ * @param topic The name of the topic.
+ */
+void Endpoint::UnregisterPublication(string topic)
{
- m_MethodSinks.clear();
+ m_Publications.erase(topic);
}
-void Endpoint::ClearMethodSources(void)
+/**
+ * Checks whether the endpoint has a publication for the specified topic.
+ *
+ * @param topic The name of the topic.
+ * @returns true if the endpoint is publishing this topic, false otherwise.
+ */
+bool Endpoint::HasPublication(string topic) const
{
- m_MethodSources.clear();
+ return (m_Publications.find(topic) != m_Publications.end());
}
-int Endpoint::CountMethodSinks(void) const
+/**
+ * Removes all subscriptions for the endpoint.
+ */
+void Endpoint::ClearSubscriptions(void)
{
- return m_MethodSinks.size();
+ m_Subscriptions.clear();
}
-int Endpoint::CountMethodSources(void) const
+/**
+ * Removes all publications for the endpoint.
+ */
+void Endpoint::ClearPublications(void)
{
- return m_MethodSources.size();
+ m_Publications.clear();
}
-set<string>::const_iterator Endpoint::BeginSinks(void) const
+/**
+ * Returns the beginning of the subscriptions list.
+ *
+ * @returns An iterator that points to the first subscription.
+ */
+Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const
{
- return m_MethodSinks.begin();
+ return m_Subscriptions.begin();
}
-set<string>::const_iterator Endpoint::EndSinks(void) const
+/**
+ * Returns the end of the subscriptions list.
+ *
+ * @returns An iterator that points past the last subscription.
+ */
+Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
{
- return m_MethodSinks.end();
+ return m_Subscriptions.end();
}
-set<string>::const_iterator Endpoint::BeginSources(void) const
+/**
+ * Returns the beginning of the publications list.
+ *
+ * @returns An iterator that points to the first publication.
+ */
+Endpoint::ConstTopicIterator Endpoint::BeginPublications(void) const
{
- return m_MethodSources.begin();
+ return m_Publications.begin();
}
-set<string>::const_iterator Endpoint::EndSources(void) const
+/**
+ * Returns the end of the publications list.
+ *
+ * @returns An iterator that points past the last publication.
+ */
+Endpoint::ConstTopicIterator Endpoint::EndPublications(void) const
{
- return m_MethodSources.end();
+ return m_Publications.end();
}
+/**
+ * Sets whether a welcome message has been received from this endpoint.
+ *
+ * @param value Whether we've received a welcome message.
+ */
void Endpoint::SetReceivedWelcome(bool value)
{
m_ReceivedWelcome = value;
}
+/**
+ * Retrieves whether a welcome message has been received from this endpoint.
+ *
+ * @returns Whether we've received a welcome message.
+ */
bool Endpoint::GetReceivedWelcome(void) const
{
return m_ReceivedWelcome;
}
+/**
+ * Sets whether a welcome message has been sent to this endpoint.
+ *
+ * @param value Whether we've sent a welcome message.
+ */
void Endpoint::SetSentWelcome(bool value)
{
m_SentWelcome = value;
}
+/**
+ * Retrieves whether a welcome message has been sent to this endpoint.
+ *
+ * @returns Whether we've sent a welcome message.
+ */
bool Endpoint::GetSentWelcome(void) const
{
return m_SentWelcome;
class EndpointManager;
-struct I2_ICINGA_API NewMethodEventArgs : public EventArgs
-{
- string Method;
-};
-
+/**
+ * An endpoint that can be used to send and receive messages.
+ */
class I2_ICINGA_API Endpoint : public Object
{
private:
- string m_Identity;
- set<string> m_MethodSinks;
- set<string> m_MethodSources;
- bool m_ReceivedWelcome;
- bool m_SentWelcome;
-
- weak_ptr<EndpointManager> m_EndpointManager;
+ string m_Identity; /**< The identity of this endpoint. */
+ set<string> m_Subscriptions; /**< The topics this endpoint is
+ subscribed to. */
+ set<string> m_Publications; /**< The topics this endpoint is
+ publishing. */
+ bool m_ReceivedWelcome; /**< Have we received a welcome message
+ from this endpoint? */
+ bool m_SentWelcome; /**< Have we sent a welcome message to this
+ endpoint? */
+
+ weak_ptr<EndpointManager> m_EndpointManager; /**< The endpoint manager
+ this endpoint is
+ registered with. */
public:
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
+ typedef set<string>::const_iterator ConstTopicIterator;
+
Endpoint(void);
virtual string GetAddress(void) const = 0;
string GetIdentity(void) const;
void SetIdentity(string identity);
- bool HasIdentity(void) const;
void SetReceivedWelcome(bool value);
bool GetReceivedWelcome(void) const;
shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(weak_ptr<EndpointManager> manager);
- void RegisterMethodSink(string method);
- void UnregisterMethodSink(string method);
- bool IsMethodSink(string method) const;
+ void RegisterSubscription(string topic);
+ void UnregisterSubscription(string topic);
+ bool HasSubscription(string topic) const;
- void RegisterMethodSource(string method);
- void UnregisterMethodSource(string method);
- bool IsMethodSource(string method) const;
+ void RegisterPublication(string topic);
+ void UnregisterPublication(string topic);
+ bool HasPublication(string topic) const;
virtual bool IsLocal(void) const = 0;
virtual bool IsConnected(void) const = 0;
- virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) = 0;
- virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) = 0;
+ virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message) = 0;
+ virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message) = 0;
virtual void Stop(void) = 0;
- Event<NewMethodEventArgs> OnNewMethodSink;
- Event<NewMethodEventArgs> OnNewMethodSource;
-
- void ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback);
- void ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback);
-
- void ClearMethodSinks(void);
- void ClearMethodSources(void);
-
- int CountMethodSinks(void) const;
- int CountMethodSources(void) const;
+ void ClearSubscriptions(void);
+ void ClearPublications(void);
- set<string>::const_iterator BeginSinks(void) const;
- set<string>::const_iterator EndSinks(void) const;
+ ConstTopicIterator BeginSubscriptions(void) const;
+ ConstTopicIterator EndSubscriptions(void) const;
- set<string>::const_iterator BeginSources(void) const;
- set<string>::const_iterator EndSources(void) const;
+ ConstTopicIterator BeginPublications(void) const;
+ ConstTopicIterator EndPublications(void) const;
Event<EventArgs> OnIdentityChanged;
Event<EventArgs> OnSessionEstablished;
using namespace icinga;
+/**
+ * Sets the identity of the endpoint manager. This identity is used when
+ * connecting to remote peers.
+ *
+ * @param identity The new identity.
+ */
void EndpointManager::SetIdentity(string identity)
{
m_Identity = identity;
}
+/**
+ * Retrieves the identity for the endpoint manager.
+ *
+ * @returns The identity.
+ */
string EndpointManager::GetIdentity(void) const
{
return m_Identity;
}
+/**
+ * Sets the SSL context that is used for remote connections.
+ *
+ * @param sslContext The new SSL context.
+ */
void EndpointManager::SetSSLContext(shared_ptr<SSL_CTX> sslContext)
{
m_SSLContext = sslContext;
}
+/**
+ * Retrieves the SSL context that is used for remote connections.
+ *
+ * @returns The SSL context.
+ */
shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
{
return m_SSLContext;
}
+/**
+ * Creates a new JSON-RPC listener on the specified port.
+ *
+ * @param service The name of the service to listen on (@see getaddrinfo).
+ */
void EndpointManager::AddListener(string service)
{
if (!GetSSLContext())
server->Start();
}
+/**
+ * Creates a new JSON-RPC client and connects to the specified host and port.
+ *
+ * @param node The remote host (@see getaddrinfo).
+ * @param service The remote port (@see getaddrinfo).
+ */
void EndpointManager::AddConnection(string node, string service)
{
stringstream s;
endpoint->Connect(node, service, m_SSLContext);
}
+/**
+ * Registers a new JSON-RPC server with this endpoint manager.
+ *
+ * @param server The JSON-RPC server.
+ */
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
{
m_Servers.push_back(server);
server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler, shared_from_this());
}
+/**
+ * Processes a new client connection.
+ *
+ * @param ncea Event arguments.
+ */
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{
string address = ncea.Client->GetPeerAddress();
return 0;
}
+/**
+ * Unregisters a JSON-RPC server.
+ *
+ * @param server The JSON-RPC server.
+ */
void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
{
m_Servers.erase(
// TODO: unbind event
}
+/**
+ * Registers a new endpoint with this endpoint manager.
+ *
+ * @param endpoint The new endpoint.
+ */
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{
if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
OnNewEndpoint(neea);
}
+/**
+ * Unregisters an endpoint.
+ *
+ * @param endpoint The endpoint.
+ */
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
{
m_Endpoints.erase(
m_Endpoints.end());
}
-void EndpointManager::SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr recipient, const JsonRpcRequest& request, bool fromLocal)
+/**
+ * Sends a unicast message to the specified recipient.
+ *
+ * @param sender The sender of the message.
+ * @param recipient The recipient of the message.
+ * @param message The request.
+ */
+void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender, Endpoint::Ptr recipient, const Message& message)
{
+ /* don't forward messages back to the sender */
if (sender == recipient)
return;
/* don't forward messages between non-local endpoints */
- if (!fromLocal && !recipient->IsLocal())
+ if (!sender->IsLocal() && !recipient->IsLocal())
return;
- string method;
- if (!request.GetMethod(&method))
- throw InvalidArgumentException("Missing 'method' parameter.");
-
- if (recipient->IsMethodSink(method)) {
- //Application::Log(sender->GetAddress() + " -> " + recipient->GetAddress() + ": " + method);
- recipient->ProcessRequest(sender, request);
- }
+ recipient->ProcessRequest(sender, message);
}
-void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
+/**
+ * Sends a message to exactly one recipient out of all recipients who have a
+ * subscription for the message's topic.
+ *
+ * @param sender The sender of the message.
+ * @param message The message.
+ */
+void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender, const RpcRequest& message)
{
throw NotImplementedException();
}
-void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
+/**
+ * Sends a message to all recipients who have a subscription for the
+ * message's topic.
+ *
+ * @param sender The sender of the message.
+ * @param message The message.
+ */
+void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender, const RpcRequest& message)
{
-#ifdef _DEBUG
string id;
- if (request.GetID(&id))
+ if (message.GetID(&id))
throw InvalidArgumentException("Multicast requests must not have an ID.");
-#endif /* _DEBUG */
string method;
- if (!request.GetMethod(&method))
+ if (!message.GetMethod(&method))
throw InvalidArgumentException("Message is missing the 'method' property.");
for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
{
- SendUnicastRequest(sender, *i, request, fromLocal);
+ Endpoint::Ptr recipient = *i;
+ if (recipient->HasSubscription(method))
+ SendUnicastMessage(sender, recipient, message);
}
}
+/**
+ * Calls the specified callback function for each registered endpoint.
+ *
+ * @param callback The callback function.
+ */
void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
{
NewEndpointEventArgs neea;
}
}
+/**
+ * Retrieves an endpoint that has the specified identity.
+ *
+ * @param identity The identity of the endpoint.
+ */
Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
{
vector<Endpoint::Ptr>::const_iterator i;
namespace icinga
{
+/**
+ * Event arguments for the "new endpoint registered" event.
+ */
struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
{
- icinga::Endpoint::Ptr Endpoint;
+ icinga::Endpoint::Ptr Endpoint; /**< The new endpoint. */
};
+/**
+ * Forwards messages between endpoints.
+ */
class I2_ICINGA_API EndpointManager : public Object
{
string m_Identity;
void RegisterEndpoint(Endpoint::Ptr endpoint);
void UnregisterEndpoint(Endpoint::Ptr endpoint);
- void SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr recipient, const JsonRpcRequest& request, bool fromLocal = true);
- void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
- void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
+ void SendUnicastMessage(Endpoint::Ptr sender, Endpoint::Ptr recipient, const Message& message);
+ void SendAnycastMessage(Endpoint::Ptr sender, const RpcRequest& message);
+ void SendMulticastMessage(Endpoint::Ptr sender, const RpcRequest& message);
void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
using namespace icinga;
+/**
+ * The entry point for the Icinga application.
+ *
+ * @param args Command-line arguments.
+ * @returns An exit status.
+ */
int IcingaApplication::Main(const vector<string>& args)
{
#ifdef _WIN32
#endif /* _WIN32 */
if (args.size() < 2) {
- PrintUsage(args[0]);
+ cout << "Syntax: " << args[0] << " <config-file>" << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
-void IcingaApplication::PrintUsage(const string& programPath)
-{
- cout << "Syntax: " << programPath << " <config-file>" << endl;
-}
-
+/**
+ * Retrieves Icinga's endpoint manager.
+ *
+ * @returns The endpoint manager.
+ */
EndpointManager::Ptr IcingaApplication::GetEndpointManager(void)
{
return m_EndpointManager;
namespace icinga
{
+/**
+ * The Icinga application.
+ */
class I2_ICINGA_API IcingaApplication : public Application
{
private:
int Main(const vector<string>& args);
- void PrintUsage(const string& programPath);
-
EndpointManager::Ptr GetEndpointManager(void);
void SetPrivateKeyFile(string privkey);
namespace icinga
{
+/**
+ * A component that can be loaded into the Icinga application at run-time.
+ */
class I2_ICINGA_API IcingaComponent : public Component
{
protected:
return (bool)m_Client;
}
-void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message)
+void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message)
{
if (IsConnected()) {
string id;
}
}
-void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message)
+void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message)
{
if (IsConnected())
m_Client->SendMessage(message);
string method;
if (message.GetPropertyString("method", &method)) {
- if (!IsMethodSource(method))
+ if (!HasPublication(method))
return 0;
- JsonRpcRequest request = message;
+ RpcRequest request = message;
string id;
if (request.GetID(&id))
- GetEndpointManager()->SendAnycastRequest(sender, request, false);
+ GetEndpointManager()->SendAnycastMessage(sender, request);
else
- GetEndpointManager()->SendMulticastRequest(sender, request, false);
+ GetEndpointManager()->SendMulticastMessage(sender, request);
} else {
- JsonRpcResponse response = message;
+ RpcResponse response = message;
// TODO: deal with response messages
throw NotImplementedException();
m_PendingCalls.clear();
- // TODO: _only_ clear non-persistent method sources/sinks
- // unregister ourselves if no persistent sources/sinks are left (use a timer for that, once we have a TTL property for the methods)
- ClearMethodSinks();
- ClearMethodSources();
+ // TODO: _only_ clear non-persistent publications/subscriptions
+ // unregister ourselves if no persistent publications/subscriptions are left (use a timer for that, once we have a TTL property for the topics)
+ ClearSubscriptions();
+ ClearPublications();
- if (CountMethodSinks() == 0)
+ // remove the endpoint if there are no more subscriptions */
+ if (BeginSubscriptions() == EndSubscriptions())
GetEndpointManager()->UnregisterEndpoint(static_pointer_cast<Endpoint>(shared_from_this()));
m_Client.reset();
namespace icinga
{
+/**
+ * A JSON-RPC endpoint that can be used to communicate with a remote
+ * Icinga instance. */
class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
{
private:
virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const;
- virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
- virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
+ virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message);
+ virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message);
virtual void Stop(void);
};
return true;
}
-void VirtualEndpoint::RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback)
+void VirtualEndpoint::RegisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback)
{
- m_MethodHandlers[method] += callback;
+ m_TopicHandlers[topic] += callback;
- RegisterMethodSink(method);
+ RegisterSubscription(topic);
}
-void VirtualEndpoint::UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback)
+void VirtualEndpoint::UnregisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback)
{
// TODO: implement
- //m_MethodHandlers[method] -= callback;
- //UnregisterMethodSink(method);
+ //m_TopicHandlers[method] -= callback;
+ //UnregisterMethodSubscription(method);
throw NotImplementedException();
}
-void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& request)
+void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RpcRequest& request)
{
string method;
if (!request.GetMethod(&method))
return;
- map<string, Event<NewRequestEventArgs> >::iterator i = m_MethodHandlers.find(method);
+ map<string, Event<NewRequestEventArgs> >::iterator i = m_TopicHandlers.find(method);
- if (i == m_MethodHandlers.end())
+ if (i == m_TopicHandlers.end())
return;
NewRequestEventArgs nrea;
i->second(nrea);
}
-void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& response)
+void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const RpcResponse& response)
{
// TODO: figure out which request this response belongs to and notify the caller
throw NotImplementedException();
typedef weak_ptr<NewRequestEventArgs> WeakPtr;
Endpoint::Ptr Sender;
- JsonRpcRequest Request;
+ RpcRequest Request;
};
+/**
+ * A local endpoint.
+ */
class I2_ICINGA_API VirtualEndpoint : public Endpoint
{
private:
- map< string, Event<NewRequestEventArgs> > m_MethodHandlers;
+ map< string, Event<NewRequestEventArgs> > m_TopicHandlers;
public:
typedef shared_ptr<VirtualEndpoint> Ptr;
typedef weak_ptr<VirtualEndpoint> WeakPtr;
- void RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback);
- void UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback);
+ void RegisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback);
+ void UnregisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback);
virtual string GetAddress(void) const;
virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const;
- virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
- virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
+ virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message);
+ virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message);
virtual void Stop(void);
};
i2-jsonrpc.h \
jsonrpcclient.cpp \
jsonrpcclient.h \
- jsonrpcrequest.cpp \
- jsonrpcrequest.h \
- jsonrpcresponse.cpp \
- jsonrpcresponse.h \
jsonrpcserver.cpp \
jsonrpcserver.h \
message.cpp \
message.h \
netstring.cpp \
- netstring.h
+ netstring.h \
+ rpcrequest.cpp \
+ rpcrequest.h \
+ rpcresponse.cpp \
+ rpcresponse.h
libjsonrpc_la_CXXFLAGS = \
-DI2_JSONRPC_BUILD \
#include "variant.h"
#include "dictionary.h"
#include "message.h"
+#include "rpcrequest.h"
+#include "rpcresponse.h"
#include "netstring.h"
-#include "jsonrpcrequest.h"
-#include "jsonrpcresponse.h"
#include "jsonrpcclient.h"
#include "jsonrpcserver.h"
<ItemGroup>
<ClInclude Include="i2-jsonrpc.h" />
<ClInclude Include="jsonrpcclient.h" />
- <ClInclude Include="jsonrpcrequest.h" />
- <ClInclude Include="jsonrpcresponse.h" />
+ <ClInclude Include="rpcrequest.h" />
+ <ClInclude Include="rpcresponse.h" />
<ClInclude Include="jsonrpcserver.h" />
<ClInclude Include="message.h" />
<ClInclude Include="netstring.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="jsonrpcclient.cpp" />
- <ClCompile Include="jsonrpcrequest.cpp" />
- <ClCompile Include="jsonrpcresponse.cpp" />
+ <ClCompile Include="rpcrequest.cpp" />
+ <ClCompile Include="rpcresponse.cpp" />
<ClCompile Include="jsonrpcserver.cpp" />
<ClCompile Include="message.cpp" />
<ClCompile Include="netstring.cpp" />
<ClCompile Include="jsonrpcserver.cpp" />
<ClCompile Include="message.cpp" />
<ClCompile Include="netstring.cpp" />
- <ClCompile Include="jsonrpcrequest.cpp" />
- <ClCompile Include="jsonrpcresponse.cpp" />
+ <ClCompile Include="rpcrequest.cpp" />
+ <ClCompile Include="rpcresponse.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="i2-jsonrpc.h" />
<ClInclude Include="jsonrpcserver.h" />
<ClInclude Include="message.h" />
<ClInclude Include="netstring.h" />
- <ClInclude Include="jsonrpcrequest.h" />
- <ClInclude Include="jsonrpcresponse.h" />
+ <ClInclude Include="rpcrequest.h" />
+ <ClInclude Include="rpcresponse.h" />
</ItemGroup>
</Project>
\ No newline at end of file
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#ifndef JSONRPCREQUEST_H
-#define JSONRPCREQUEST_H
+#ifndef RpcRequest_H
+#define RpcRequest_H
namespace icinga
{
-class I2_JSONRPC_API JsonRpcRequest : public Message
+class I2_JSONRPC_API RpcRequest : public Message
{
public:
- JsonRpcRequest(void) : Message() {
+ RpcRequest(void) : Message() {
SetVersion("2.0");
}
- JsonRpcRequest(const Message& message) : Message(message) { }
+ RpcRequest(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const
{
}
-#endif /* JSONRPCREQUEST_H */
+#endif /* RpcRequest_H */
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#ifndef JSONRPCRESPONSE_H
-#define JSONRPCRESPONSE_H
+#ifndef RpcResponse_H
+#define RpcResponse_H
namespace icinga
{
-class I2_JSONRPC_API JsonRpcResponse : public Message
+class I2_JSONRPC_API RpcResponse : public Message
{
public:
- JsonRpcResponse(void) : Message() {
+ RpcResponse(void) : Message() {
SetVersion("2.0");
}
- JsonRpcResponse(const Message& message) : Message(message) { }
+ RpcResponse(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const
{
}
-#endif /* JSONRPCRESPONSE_H */
+#endif /* RpcResponse_H */