#include "base/logger_fwd.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
+#include <boost/exception/diagnostic_information.hpp>
#include <boost/foreach.hpp>
using namespace icinga;
#include "icinga/host.h"
#include "icinga/timeperiod.h"
#include "icinga/notification.h"
-#include "remoting/requestmessage.h"
-#include "remoting/endpoint.h"
#include "base/i2-base.h"
#include "base/dynamicobject.h"
#include "base/array.h"
i2-remoting.h \
jsonrpc.cpp \
jsonrpc.h \
- messagepart.cpp \
- messagepart.h \
- remoting-type.cpp \
- requestmessage.cpp \
- requestmessage.h \
- responsemessage.cpp \
- responsemessage.h
+ remoting-type.cpp
libremoting_la_CPPFLAGS = \
-DI2_REMOTING_BUILD \
REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
-
-/**
- * Helper function for creating new endpoint objects.
- *
- * @param name The name of the new endpoint.
- * @param replicated Whether replication is enabled for the endpoint object.
- * @param local Whether the new endpoint should be local.
- * @returns The new endpoint.
- */
-Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool replicated, bool local)
-{
- ConfigItemBuilder::Ptr endpointConfig = boost::make_shared<ConfigItemBuilder>();
- endpointConfig->SetType("Endpoint");
- endpointConfig->SetName((!replicated && local) ? "local:" + name : name);
- //TODO: endpointConfig->SetLocal(!replicated);
- endpointConfig->AddExpression("local", OperatorSet, local);
-
- ConfigItem::Ptr item = endpointConfig->Compile();
- DynamicObject::Ptr object = item->Commit();
- return dynamic_pointer_cast<Endpoint>(object);
-}
-
-/**
- * Checks whether this is a local endpoint.
- *
- * @returns true if this is a local endpoint, false otherwise.
- */
-bool Endpoint::IsLocalEndpoint(void) const
-{
- return m_Local;
-}
+boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
/**
* Checks whether this endpoint is connected.
*/
bool Endpoint::IsConnected(void) const
{
- if (IsLocalEndpoint()) {
- return true;
- } else {
- return GetClient();
- }
+ return GetClient();
}
Stream::Ptr Endpoint::GetClient(void) const
OnConnected(GetSelf());
}
-/**
- * Registers a topic subscription for this endpoint.
- *
- * @param topic The name of the topic.
- */
-void Endpoint::RegisterSubscription(const String& topic)
-{
- Dictionary::Ptr subscriptions = GetSubscriptions();
-
- if (!subscriptions)
- subscriptions = boost::make_shared<Dictionary>();
-
- if (!subscriptions->Contains(topic)) {
- Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
- newSubscriptions->Set(topic, topic);
-
- ObjectLock olock(this);
- SetSubscriptions(newSubscriptions);
- }
-}
-
-/**
- * Removes a topic subscription from this endpoint.
- *
- * @param topic The name of the topic.
- */
-void Endpoint::UnregisterSubscription(const String& topic)
-{
- Dictionary::Ptr subscriptions = GetSubscriptions();
-
- if (!subscriptions)
- return;
-
- if (subscriptions->Contains(topic)) {
- Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
- newSubscriptions->Remove(topic);
- SetSubscriptions(newSubscriptions);
- }
-}
-
-/**
- * Checks whether the endpoint has a subscription for the specified topic.
- *
- * @param topic The name of the topic.
- * @returns true if the endpoint is subscribed to the topic, false otherwise.
- */
-bool Endpoint::HasSubscription(const String& topic) const
-{
- Dictionary::Ptr subscriptions = GetSubscriptions();
-
- return (subscriptions && subscriptions->Contains(topic));
-}
-
-/**
- * Removes all subscriptions for the endpoint.
- */
-void Endpoint::ClearSubscriptions(void)
-{
- m_Subscriptions = Empty;
-}
-
-Dictionary::Ptr Endpoint::GetSubscriptions(void) const
-{
- return m_Subscriptions;
-}
-
-void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
-{
- subscriptions->Seal();
- m_Subscriptions = subscriptions;
-}
-
-void Endpoint::RegisterTopicHandler(const String& topic, const boost::function<Endpoint::Callback>& callback)
-{
- ObjectLock olock(this);
-
- std::map<String, shared_ptr<boost::signals2::signal<Endpoint::Callback> > >::iterator it;
- it = m_TopicHandlers.find(topic);
-
- shared_ptr<boost::signals2::signal<Endpoint::Callback> > sig;
-
- if (it == m_TopicHandlers.end()) {
- sig = boost::make_shared<boost::signals2::signal<Endpoint::Callback> >();
- m_TopicHandlers.insert(make_pair(topic, sig));
- } else {
- sig = it->second;
- }
-
- sig->connect(callback);
-
- olock.Unlock();
-
- RegisterSubscription(topic);
-}
-
-void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
+void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
if (!IsConnected()) {
// TODO: persist the message
return;
}
- if (IsLocalEndpoint()) {
- ObjectLock olock(this);
-
- String method;
- if (!request.GetMethod(&method))
- return;
-
- std::map<String, shared_ptr<boost::signals2::signal<Endpoint::Callback> > >::iterator it;
- it = m_TopicHandlers.find(method);
-
- if (it == m_TopicHandlers.end())
- return;
-
- Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
- } else {
- try {
- JsonRpc::SendMessage(GetClient(), request);
- } catch (const std::exception& ex) {
- std::ostringstream msgbuf;
- msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
- Log(LogWarning, "remoting", msgbuf.str());
-
- m_Client.reset();
- }
- }
-}
-
-void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response)
-{
- if (!IsConnected())
- return;
+ try {
+ JsonRpc::SendMessage(GetClient(), message);
+ } catch (const std::exception& ex) {
+ std::ostringstream msgbuf;
+ msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
+ Log(LogWarning, "remoting", msgbuf.str());
- if (IsLocalEndpoint())
- EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
- else {
- try {
- JsonRpc::SendMessage(GetClient(), response);
- } catch (const std::exception& ex) {
- std::ostringstream msgbuf;
- msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
- Log(LogWarning, "remoting", msgbuf.str());
-
- m_Client.reset();
- }
+ m_Client.reset();
}
}
void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{
for (;;) {
- MessagePart message;
+ Dictionary::Ptr message;
try {
message = JsonRpc::ReadMessage(stream);
m_Client.reset();
}
- Endpoint::Ptr sender = GetSelf();
-
- if (ResponseMessage::IsResponseMessage(message)) {
- /* rather than routing the message to the right virtual
- * endpoint we just process it here right away. */
- EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
- return;
- }
-
- RequestMessage request = message;
-
- String method;
- if (!request.GetMethod(&method))
- return;
-
- String id;
- if (request.GetID(&id))
- EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
- else
- EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
+ Utility::QueueAsyncCallback(bind(boost::ref(Endpoint::OnMessageReceived), GetSelf(), message));
}
}
*
* @returns The node address (hostname).
*/
-String Endpoint::GetNode(void) const
+String Endpoint::GetHost(void) const
{
ObjectLock olock(this);
- return m_Node;
+ return m_Host;
}
/**
*
* @returns The service name (port).
*/
-String Endpoint::GetService(void) const
+String Endpoint::GetPort(void) const
{
ObjectLock olock(this);
- return m_Service;
+ return m_Port;
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
if (attributeTypes & Attribute_Config) {
bag->Set("local", m_Local);
- bag->Set("node", m_Node);
- bag->Set("service", m_Service);
+ bag->Set("host", m_Host);
+ bag->Set("port", m_Port);
}
-
- bag->Set("subscriptions", m_Subscriptions);
}
void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
if (attributeTypes & Attribute_Config) {
m_Local = bag->Get("local");
- m_Node = bag->Get("node");
- m_Service = bag->Get("service");
+ m_Host = bag->Get("host");
+ m_Port = bag->Get("port");
}
-
- bag->Set("subscriptions", m_Subscriptions);
-}
\ No newline at end of file
+}
#define ENDPOINT_H
#include "remoting/i2-remoting.h"
-#include "remoting/requestmessage.h"
-#include "remoting/responsemessage.h"
#include "base/dynamicobject.h"
#include "base/stream.h"
#include <boost/signals2.hpp>
DECLARE_PTR_TYPEDEFS(Endpoint);
DECLARE_TYPENAME(Endpoint);
- typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&);
+ static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
+ static boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> OnMessageReceived;
Stream::Ptr GetClient(void) const;
void SetClient(const Stream::Ptr& client);
- void RegisterSubscription(const String& topic);
- void UnregisterSubscription(const String& topic);
- bool HasSubscription(const String& topic) const;
-
- Dictionary::Ptr GetSubscriptions(void) const;
- void SetSubscriptions(const Dictionary::Ptr& subscriptions);
-
- bool IsLocalEndpoint(void) const;
bool IsConnected(void) const;
- void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message);
- void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message);
-
- void ClearSubscriptions(void);
-
- void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
+ void SendMessage(const Dictionary::Ptr& request);
- String GetNode(void) const;
- String GetService(void) const;
-
- static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
-
- static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
+ String GetHost(void) const;
+ String GetPort(void) const;
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
private:
bool m_Local;
Dictionary::Ptr m_Subscriptions;
- String m_Node;
- String m_Service;
+ String m_Host;
+ String m_Port;
Stream::Ptr m_Client;
- bool m_ReceivedWelcome; /**< Have we received a welcome message
- from this endpoint? */
- bool m_SentWelcome; /**< Have we sent a welcome message to this
- endpoint? */
-
- std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers;
-
void MessageThreadProc(const Stream::Ptr& stream);
};
* Constructor for the EndpointManager class.
*/
EndpointManager::EndpointManager(void)
- : m_NextMessageID(0)
{
- m_RequestTimer = boost::make_shared<Timer>();
- m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
- m_RequestTimer->SetInterval(5);
- m_RequestTimer->Start();
-
- m_SubscriptionTimer = boost::make_shared<Timer>();
- m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
- m_SubscriptionTimer->SetInterval(5);
- m_SubscriptionTimer->Start();
-
m_ReconnectTimer = boost::make_shared<Timer>();
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
m_ReconnectTimer->SetInterval(5);
ObjectLock olock(this);
m_Identity = identity;
-
- if (m_Endpoint)
- m_Endpoint->Stop();
-
- Endpoint::Ptr endpoint = DynamicObject::GetObject<Endpoint>(identity);
-
- if (endpoint)
- m_Endpoint = endpoint;
- else
- m_Endpoint = Endpoint::MakeEndpoint(identity, true, true);
}
/**
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
- if (!endpoint)
- endpoint = Endpoint::MakeEndpoint(identity, true);
-
- endpoint->SetClient(tlsStream);
-}
-
-/**
- * Sends an anonymous unicast message to the specified recipient.
- *
- * @param recipient The recipient of the message.
- * @param message The message.
- */
-void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient,
- const MessagePart& message)
-{
- SendUnicastMessage(Endpoint::Ptr(), recipient, message);
-}
-
-/**
- * Sends a unicast message to the specified recipient.
- *
- * @param sender The sender of the message.
- * @param recipient The recipient of the message.
- * @param message The message.
- */
-void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
- const Endpoint::Ptr& recipient, const MessagePart& message)
-{
- /* don't forward messages between non-local endpoints, assume that
- * anonymous senders (sender == null) are local */
-// if ((sender && !sender->IsLocal()) && !recipient->IsLocal())
-// return;
-
- if (ResponseMessage::IsResponseMessage(message))
- recipient->ProcessResponse(sender, message);
- else
- recipient->ProcessRequest(sender, message);
-}
-
-/**
- * Sends a message to exactly one recipient out of all recipients who have a
- * subscription for the message's topic.
- *
- * @param sender The sender of the message.
- * @param message The message.
- */
-void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
- const RequestMessage& message)
-{
- String method;
- if (!message.GetMethod(&method))
- BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
-
- std::vector<Endpoint::Ptr> candidates;
-
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- /* don't forward messages between non-local endpoints */
-// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal())
-// continue;
-
- if (endpoint->HasSubscription(method))
- candidates.push_back(endpoint);
- }
-
- if (candidates.empty())
+ if (!endpoint) {
+ Log(LogInformation, "remoting", "Closing endpoint '" + identity + "': No configuration available.");
return;
-
- Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
- SendUnicastMessage(sender, recipient, message);
-}
-
-/**
- * Sends an anonymous message to all recipients who have a subscription for the
- * message's topic.
- *
- * @param message The message.
- */
-void EndpointManager::SendMulticastMessage(const RequestMessage& message)
-{
- SendMulticastMessage(Endpoint::Ptr(), message);
-}
-
-/**
- * Sends a message to all recipients who have a subscription for the
- * message's topic.
- *
- * @param sender The sender of the message.
- * @param message The message.
- */
-void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
- const RequestMessage& message)
-{
- String id;
- if (message.GetID(&id))
- BOOST_THROW_EXCEPTION(std::invalid_argument("Multicast requests must not have an ID."));
-
- String method;
- if (!message.GetMethod(&method))
- BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
-
- BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects<Endpoint>()) {
- /* don't forward messages back to the sender */
- if (sender == recipient)
- continue;
-
- Log(LogDebug, "remoting", "Send multicast message using method " + method);
- if (recipient->HasSubscription(method))
- SendUnicastMessage(sender, recipient, message);
}
-}
-
-void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
- RequestMessage& message,
- const EndpointManager::APICallback& callback, double timeout)
-{
- ObjectLock olock(this);
-
- m_NextMessageID++;
-
- String id = Convert::ToString(m_NextMessageID);
- message.SetID(id);
-
- PendingRequest pr;
- pr.Request = message;
- pr.Callback = callback;
- pr.Timeout = Utility::GetTime() + timeout;
-
- m_Requests[id] = pr;
-
- if (!recipient)
- SendAnycastMessage(sender, message);
- else
- SendUnicastMessage(sender, recipient, message);
-}
-bool EndpointManager::RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a,
- const std::pair<String, PendingRequest>& b)
-{
- return a.second.Timeout < b.second.Timeout;
+ endpoint->SetClient(tlsStream);
}
-void EndpointManager::SubscriptionTimerHandler(void)
-{
- Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
-
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- /* don't copy subscriptions from non-local endpoints or the identity endpoint */
-// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint)
+///**
+// * Sends an anonymous unicast message to the specified recipient.
+// *
+// * @param recipient The recipient of the message.
+// * @param message The message.
+// */
+//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient,
+// const MessagePart& message)
+//{
+// SendUnicastMessage(Endpoint::Ptr(), recipient, message);
+//}
+
+///**
+// * Sends a unicast message to the specified recipient.
+// *
+// * @param sender The sender of the message.
+// * @param recipient The recipient of the message.
+// * @param message The message.
+// */
+//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
+// const Endpoint::Ptr& recipient, const MessagePart& message)
+//{
+// /* don't forward messages between non-local endpoints, assume that
+// * anonymous senders (sender == null) are local */
+//// if ((sender && !sender->IsLocal()) && !recipient->IsLocal())
+//// return;
+//
+// if (ResponseMessage::IsResponseMessage(message))
+// recipient->ProcessResponse(sender, message);
+// else
+// recipient->ProcessRequest(sender, message);
+//}
+
+///**
+// * Sends a message to exactly one recipient out of all recipients who have a
+// * subscription for the message's topic.
+// *
+// * @param sender The sender of the message.
+// * @param message The message.
+// */
+//void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
+// const RequestMessage& message)
+//{
+// String method;
+// if (!message.GetMethod(&method))
+// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
+//
+// std::vector<Endpoint::Ptr> candidates;
+//
+// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+// /* don't forward messages between non-local endpoints */
+//// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal())
+//// continue;
+//
+// if (endpoint->HasSubscription(method))
+// candidates.push_back(endpoint);
+// }
+//
+// if (candidates.empty())
+// return;
+//
+// Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
+// SendUnicastMessage(sender, recipient, message);
+//}
+
+///**
+// * Sends an anonymous message to all recipients who have a subscription for the
+// * message's topic.
+// *
+// * @param message The message.
+// */
+//void EndpointManager::SendMulticastMessage(const RequestMessage& message)
+//{
+// SendMulticastMessage(Endpoint::Ptr(), message);
+//}
+
+///**
+// * Sends a message to all recipients who have a subscription for the
+// * message's topic.
+// *
+// * @param sender The sender of the message.
+// * @param message The message.
+// */
+//void EndpointManager::SendBroadcastMessage(const Endpoint::Ptr& sender,
+// const RequestMessage& message)
+//{
+// String method;
+// if (!message.GetMethod(&method))
+// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
+//
+// BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects<Endpoint>()) {
+// /* don't forward messages back to the sender */
+// if (sender == recipient)
// continue;
-
- Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions();
-
- if (endpointSubscriptions) {
- ObjectLock olock(endpointSubscriptions);
-
- String topic;
- BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) {
- subscriptions->Set(topic, topic);
- }
- }
- }
-
- subscriptions->Seal();
-
- if (m_Endpoint) {
- ObjectLock olock(m_Endpoint);
- m_Endpoint->SetSubscriptions(subscriptions);
- }
-}
+//
+// Log(LogDebug, "remoting", "Send multicast message using method " + method);
+// if (recipient->HasSubscription(method))
+// SendUnicastMessage(sender, recipient, message);
+// }
+//}
+
+//void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
+// RequestMessage& message,
+// const EndpointManager::APICallback& callback, double timeout)
+//{
+// ObjectLock olock(this);
+//
+// m_NextMessageID++;
+//
+// String id = Convert::ToString(m_NextMessageID);
+// message.SetID(id);
+//
+// PendingRequest pr;
+// pr.Request = message;
+// pr.Callback = callback;
+// pr.Timeout = Utility::GetTime() + timeout;
+//
+// m_Requests[id] = pr;
+//
+// if (!recipient)
+// SendAnycastMessage(sender, message);
+// else
+// SendUnicastMessage(sender, recipient, message);
+//}
+//
+//bool EndpointManager::RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a,
+// const std::pair<String, PendingRequest>& b)
+//{
+// return a.second.Timeout < b.second.Timeout;
+//}
+//
+//void EndpointManager::SubscriptionTimerHandler(void)
+//{
+// Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
+//
+// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+// /* don't copy subscriptions from non-local endpoints or the identity endpoint */
+//// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint)
+//// continue;
+//
+// Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions();
+//
+// if (endpointSubscriptions) {
+// ObjectLock olock(endpointSubscriptions);
+//
+// String topic;
+// BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) {
+// subscriptions->Set(topic, topic);
+// }
+// }
+// }
+//
+// subscriptions->Seal();
+//
+// if (m_Endpoint) {
+// ObjectLock olock(m_Endpoint);
+// m_Endpoint->SetSubscriptions(subscriptions);
+// }
+//}
void EndpointManager::ReconnectTimerHandler(void)
{
if (endpoint->IsConnected() || endpoint == m_Endpoint)
continue;
- String node, service;
- node = endpoint->GetNode();
- service = endpoint->GetService();
+ String host, port;
+ host = endpoint->GetHost();
+ port = endpoint->GetPort();
- if (node.IsEmpty() || service.IsEmpty()) {
+ if (host.IsEmpty() || port.IsEmpty()) {
Log(LogWarning, "icinga", "Can't reconnect "
"to endpoint '" + endpoint->GetName() + "': No "
- "node/service information.");
+ "host/port information.");
continue;
}
- AddConnection(node, service);
+ AddConnection(host, port);
}
}
-void EndpointManager::RequestTimerHandler(void)
-{
- ObjectLock olock(this);
-
- std::map<String, PendingRequest>::iterator it;
- for (it = m_Requests.begin(); it != m_Requests.end(); ++it) {
- if (it->second.HasTimedOut()) {
- it->second.Callback(Endpoint::Ptr(), it->second.Request,
- ResponseMessage(), true);
-
- m_Requests.erase(it);
-
- break;
- }
- }
-}
-
-void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender,
- const ResponseMessage& message)
-{
- ObjectLock olock(this);
-
- String id;
- if (!message.GetID(&id))
- BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID."));
-
- std::map<String, PendingRequest>::iterator it;
- it = m_Requests.find(id);
-
- if (it == m_Requests.end())
- return;
-
- it->second.Callback(sender, it->second.Request, message, false);
-
- m_Requests.erase(it);
-}
+//void EndpointManager::RequestTimerHandler(void)
+//{
+// ObjectLock olock(this);
+//
+// std::map<String, PendingRequest>::iterator it;
+// for (it = m_Requests.begin(); it != m_Requests.end(); ++it) {
+// if (it->second.HasTimedOut()) {
+// it->second.Callback(Endpoint::Ptr(), it->second.Request,
+// ResponseMessage(), true);
+//
+// m_Requests.erase(it);
+//
+// break;
+// }
+// }
+//}
+
+//void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender,
+// const ResponseMessage& message)
+//{
+// ObjectLock olock(this);
+//
+// String id;
+// if (!message.GetID(&id))
+// BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID."));
+//
+// std::map<String, PendingRequest>::iterator it;
+// it = m_Requests.find(id);
+//
+// if (it == m_Requests.end())
+// return;
+//
+// it->second.Callback(sender, it->second.Request, message, false);
+//
+// m_Requests.erase(it);
+//}
EndpointManager *EndpointManager::GetInstance(void)
{
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
- void SendUnicastMessage(const Endpoint::Ptr& recipient, const MessagePart& message);
- void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
- void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
- void SendMulticastMessage(const RequestMessage& message);
- void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
-
- typedef boost::function<void(const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> APICallback;
-
- void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message,
- const APICallback& callback, double timeout = 30);
-
- void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
+ //void SendUnicastMessage(const Endpoint::Ptr& recipient, const Dictionary::Ptr& message);
+ //void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
+ //void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
+ //void SendMulticastMessage(const RequestMessage& message);
+ //void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
boost::signals2::signal<void (const Endpoint::Ptr&)> OnNewEndpoint;
shared_ptr<SSL_CTX> m_SSLContext;
- Timer::Ptr m_SubscriptionTimer;
-
Timer::Ptr m_ReconnectTimer;
std::set<TcpSocket::Ptr> m_Servers;
- /**
- * Information about a pending API request.
- *
- * @ingroup remoting
- */
- struct I2_REMOTING_API PendingRequest
- {
- double Timeout;
- RequestMessage Request;
- boost::function<void(const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> Callback;
-
- bool HasTimedOut(void) const
- {
- return Utility::GetTime() > Timeout;
- }
- };
-
- long m_NextMessageID;
- std::map<String, PendingRequest> m_Requests;
- Timer::Ptr m_RequestTimer;
-
- static bool RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a, const std::pair<String, PendingRequest>& b);
- void RequestTimerHandler(void);
-
- void SubscriptionTimerHandler(void);
-
void ReconnectTimerHandler(void);
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
*
* @param message The message.
*/
-void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message)
+void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
{
- Value value = message.GetDictionary();
- String json = value.Serialize();
+ String json = Value(message).Serialize();
//std::cerr << ">> " << json << std::endl;
NetString::WriteStringToStream(stream, json);
}
-MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
+Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
{
String jsonString;
if (!NetString::ReadStringFromStream(stream, &jsonString))
" message must be a dictionary."));
}
- return MessagePart(value);
+ return value;
}
#define JSONRPC_H
#include "remoting/i2-remoting.h"
-#include "remoting/messagepart.h"
#include "base/stream.h"
+#include "base/dictionary.h"
namespace icinga
{
class I2_REMOTING_API JsonRpc
{
public:
- static void SendMessage(const Stream::Ptr& stream, const MessagePart& message);
- static MessagePart ReadMessage(const Stream::Ptr& stream);
+ static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
+ static Dictionary::Ptr ReadMessage(const Stream::Ptr& stream);
private:
JsonRpc(void);
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "remoting/messagepart.h"
-#include <boost/smart_ptr/make_shared.hpp>
-
-using namespace icinga;
-
-/**
- * Constructor for the MessagePart class.
- */
-MessagePart::MessagePart(void)
-{
- m_Dictionary = boost::make_shared<Dictionary>();
-}
-
-/**
- * Constructor for the MessagePart class.
- *
- * @param dictionary The dictionary that this MessagePart object should wrap.
- */
-MessagePart::MessagePart(const Dictionary::Ptr& dictionary)
-{
- m_Dictionary = dictionary;
-}
-
-/**
- * Copy-constructor for the MessagePart class.
- *
- * @param message The message that should be copied.
- */
-MessagePart::MessagePart(const MessagePart& message)
-{
- m_Dictionary = message.GetDictionary();
-}
-
-/**
- * Retrieves the underlying dictionary for this message.
- *
- * @returns A dictionary.
- */
-Dictionary::Ptr MessagePart::GetDictionary(void) const
-{
- return m_Dictionary;
-}
-
-/**
- * Retrieves a property's value.
- *
- * @param key The name of the property.
- * @param[out] value The value.
- * @returns true if the value was retrieved, false otherwise.
- */
-bool MessagePart::Get(String key, MessagePart *value) const
-{
- Value v;
- v = GetDictionary()->Get(key);
-
- if (!v.IsObjectType<Dictionary>())
- return false;
-
- Dictionary::Ptr dictionary = v;
-
- MessagePart mp(dictionary);
- *value = mp;
- return true;
-}
-
-/**
- * Sets a property's value.
- *
- * @param key The name of the property.
- * @param value The value.
- */
-void MessagePart::Set(String key, const MessagePart& value)
-{
- GetDictionary()->Set(key, value.GetDictionary());
-}
-
-/**
- * Returns an iterator that points to the first element of the dictionary
- * which holds the properties for the message.
- *
- * @returns An iterator.
- */
-Dictionary::Iterator MessagePart::Begin(void)
-{
- return GetDictionary()->Begin();
-}
-
-/**
- * Returns an iterator that points past the last element of the dictionary
- * which holds the properties for the message.
- *
- * @returns An iterator.
- */
-Dictionary::Iterator MessagePart::End(void)
-{
- return GetDictionary()->End();
-}
-
-/**
- * Checks whether the message contains the specified element.
- *
- * @param key The name of the element.
- * @returns true if the message contains the element, false otherwise.
- */
-bool MessagePart::Contains(const String& key) const
-{
- return GetDictionary()->Contains(key);
-}
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef MESSAGEPART_H
-#define MESSAGEPART_H
-
-#include "remoting/i2-remoting.h"
-#include "base/dictionary.h"
-
-namespace icinga
-{
-
-/**
- * A part of an RPC message.
- *
- * @ingroup remoting
- */
-class I2_REMOTING_API MessagePart
-{
-public:
- MessagePart(void);
- MessagePart(const MessagePart& message);
- explicit MessagePart(const Dictionary::Ptr& dictionary);
-
- Dictionary::Ptr GetDictionary(void) const;
-
- /**
- * Retrieves a property's value.
- *
- * @param key The name of the property.
- * @param[out] value The value.
- * @returns true if the value was retrieved, false otherwise.
- */
- template<typename T>
- bool Get(String key, T *value) const
- {
- Value v = GetDictionary()->Get(key);
-
- if (v.IsEmpty())
- return false;
-
- *value = static_cast<T>(v);
- return true;
- }
-
- /**
- * Sets a property's value.
- *
- * @param key The name of the property.
- * @param value The value.
- */
- template<typename T>
- void Set(String key, const T& value)
- {
- GetDictionary()->Set(key, value);
- }
-
- bool Get(String key, MessagePart *value) const;
- void Set(String key, const MessagePart& value);
-
- bool Contains(const String& key) const;
-
- Dictionary::Iterator Begin(void);
- Dictionary::Iterator End(void);
-
-private:
- Dictionary::Ptr m_Dictionary;
-};
-
-}
-
-#endif /* MESSAGEPART_H */
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "remoting/requestmessage.h"
-
-using namespace icinga;
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef REQUESTMESSAGE_H
-#define REQUESTMESSAGE_H
-
-#include "remoting/i2-remoting.h"
-#include "remoting/messagepart.h"
-
-namespace icinga
-{
-
-/**
- * A JSON-RPC request message.
- *
- * @ingroup remoting
- */
-class I2_REMOTING_API RequestMessage : public MessagePart
-{
-public:
- /**
- * Constructor for the RequestMessage class.
- */
- RequestMessage(void) : MessagePart() {
- SetVersion("2.0");
- }
-
- /**
- * Copy-constructor for the RequestMessage class.
- *
- * @param message The message that is to be copied.
- */
- RequestMessage(const MessagePart& message) : MessagePart(message) { }
-
- /**
- * Retrieves the version of the JSON-RPC protocol.
- *
- * @param[out] value The value.
- * @returns true if the value was retrieved, false otherwise.
- */
- inline bool GetVersion(String *value) const
- {
- return Get("jsonrpc", value);
- }
-
- /**
- * Sets the version of the JSON-RPC protocol that should be used.
- *
- * @param value The version.
- */
- inline void SetVersion(const String& value)
- {
- Set("jsonrpc", value);
- }
-
- /**
- * Retrieves the method of the JSON-RPC call.
- *
- * @param[out] value The method.
- * @returns true if the value was retrieved, false otherwise.
- */
- inline bool GetMethod(String *value) const
- {
- return Get("method", value);
- }
-
- /**
- * Sets the method for the JSON-RPC call.
- *
- * @param value The method.
- */
- inline void SetMethod(const String& value)
- {
- Set("method", value);
- }
-
- /**
- * Retrieves the parameters of the JSON-RPC call.
- *
- * @param[out] value The parameters.
- * @returns true if the value was retrieved, false otherwise.
- */
- inline bool GetParams(MessagePart *value) const
- {
- return Get("params", value);
- }
-
- /**
- * Sets the parameters for the JSON-RPC call.
- *
- * @param value The parameters.
- */
- inline void SetParams(const MessagePart& value)
- {
- Set("params", value);
- }
-
- /**
- * Retrieves the ID of the JSON-RPC call.
- *
- * @param[out] value The ID.
- * @return true if the value was retrieved, false otherwise.
- */
- inline bool GetID(String *value) const
- {
- return Get("id", value);
- }
-
- /**
- * Sets the ID for the JSON-RPC call.
- *
- * @param value The ID.
- */
- inline void SetID(const String& value)
- {
- Set("id", value);
- }
-};
-
-}
-
-#endif /* REQUESTMESSAGE_H */
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "remoting/responsemessage.h"
-
-using namespace icinga;
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#ifndef RESPONSEMESSAGE_H
-#define RESPONSEMESSAGE_H
-
-#include "remoting/i2-remoting.h"
-#include "remoting/messagepart.h"
-
-namespace icinga
-{
-
-/**
- * A JSON-RPC response message.
- *
- * @ingroup remoting
- */
-class I2_REMOTING_API ResponseMessage : public MessagePart
-{
-public:
- /**
- * Constructor for the ResponseMessage class.
- */
- ResponseMessage(void) : MessagePart() {
- SetVersion("2.0");
- }
-
- /**
- * Copy-constructor for the ResponseMessage class.
- *
- * @param message The message that should be copied.
- */
- ResponseMessage(const MessagePart& message) : MessagePart(message) { }
-
- /**
- * Retrieves the version of the JSON-RPC protocol.
- *
- * @param[out] value The value.
- * @returns true if the value was retrieved, false otherwise.
- */
- inline bool GetVersion(String *value) const
- {
- return Get("jsonrpc", value);
- }
-
- /**
- * Sets the version of the JSON-RPC protocol that should be used.
- *
- * @param value The version.
- */
- inline void SetVersion(const String& value)
- {
- Set("jsonrpc", value);
- }
-
- /**
- * Retrieves the result of the JSON-RPC call.
- *
- * @param[out] value The result.
- * @returns true if the value was retrieved, false otherwise.
- */
- bool GetResult(MessagePart *value) const
- {
- return Get("result", value);
- }
-
- /**
- * Sets the result for the JSON-RPC call.
- *
- * @param value The result.
- */
- void SetResult(const MessagePart& value)
- {
- Set("result", value);
- }
-
- /**
- * Retrieves the error message of the JSON-RPC call.
- *
- * @param[out] value The error message.
- * @returns true if the value was retrieved, false otherwise.
- */
- bool GetError(String *value) const
- {
- return Get("error", value);
- }
-
- /**
- * Sets the error message for the JSON-RPC call.
- *
- * @param value The error message.
- */
- void SetError(const String& value)
- {
- Set("error", value);
- }
-
- /**
- * Retrieves the ID of the JSON-RPC call.
- *
- * @param[out] value The ID.
- * @return true if the value was retrieved, false otherwise.
- */
- bool GetID(String *value) const
- {
- return Get("id", value);
- }
-
- /**
- * Sets the ID for the JSON-RPC call.
- *
- * @param value The ID.
- */
- void SetID(const String& value)
- {
- Set("id", value);
- }
-
- /**
- * Checks whether a message is a response message.
- *
- * @param message The message.
- * @returns true if the message is a response message, false otherwise.
- */
- static bool IsResponseMessage(const MessagePart& message)
- {
- return (message.Contains("result"));
- }
-};
-
-}
-
-#endif /* RESPONSEMESSAGE_H */