From: Gunnar Beutner Date: Mon, 26 Aug 2013 14:53:17 +0000 (+0200) Subject: Clean up JSON-RPC library. X-Git-Tag: v0.0.3~685 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=45b8d9646118d4eb9dd5ec66a1e72797371a5244;p=icinga2 Clean up JSON-RPC library. --- diff --git a/components/notification/notificationcomponent.cpp b/components/notification/notificationcomponent.cpp index c2cf58870..ac1ebe08f 100644 --- a/components/notification/notificationcomponent.cpp +++ b/components/notification/notificationcomponent.cpp @@ -24,6 +24,7 @@ #include "base/logger_fwd.h" #include "base/utility.h" #include +#include #include using namespace icinga; diff --git a/lib/icinga/service.h b/lib/icinga/service.h index 0b3c6fc57..b0347917d 100644 --- a/lib/icinga/service.h +++ b/lib/icinga/service.h @@ -25,8 +25,6 @@ #include "icinga/host.h" #include "icinga/timeperiod.h" #include "icinga/notification.h" -#include "remoting/requestmessage.h" -#include "remoting/endpoint.h" #include "base/i2-base.h" #include "base/dynamicobject.h" #include "base/array.h" diff --git a/lib/remoting/Makefile.am b/lib/remoting/Makefile.am index bbe2c7792..dc15e18d7 100644 --- a/lib/remoting/Makefile.am +++ b/lib/remoting/Makefile.am @@ -15,13 +15,7 @@ libremoting_la_SOURCES = \ i2-remoting.h \ jsonrpc.cpp \ jsonrpc.h \ - messagepart.cpp \ - messagepart.h \ - remoting-type.cpp \ - requestmessage.cpp \ - requestmessage.h \ - responsemessage.cpp \ - responsemessage.h + remoting-type.cpp libremoting_la_CPPFLAGS = \ -DI2_REMOTING_BUILD \ diff --git a/lib/remoting/endpoint.cpp b/lib/remoting/endpoint.cpp index fe6820af0..c6d5c1f5c 100644 --- a/lib/remoting/endpoint.cpp +++ b/lib/remoting/endpoint.cpp @@ -33,37 +33,7 @@ using namespace icinga; REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; - -/** - * Helper function for creating new endpoint objects. - * - * @param name The name of the new endpoint. - * @param replicated Whether replication is enabled for the endpoint object. - * @param local Whether the new endpoint should be local. - * @returns The new endpoint. - */ -Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool replicated, bool local) -{ - ConfigItemBuilder::Ptr endpointConfig = boost::make_shared(); - endpointConfig->SetType("Endpoint"); - endpointConfig->SetName((!replicated && local) ? "local:" + name : name); - //TODO: endpointConfig->SetLocal(!replicated); - endpointConfig->AddExpression("local", OperatorSet, local); - - ConfigItem::Ptr item = endpointConfig->Compile(); - DynamicObject::Ptr object = item->Commit(); - return dynamic_pointer_cast(object); -} - -/** - * Checks whether this is a local endpoint. - * - * @returns true if this is a local endpoint, false otherwise. - */ -bool Endpoint::IsLocalEndpoint(void) const -{ - return m_Local; -} +boost::signals2::signal Endpoint::OnMessageReceived; /** * Checks whether this endpoint is connected. @@ -72,11 +42,7 @@ bool Endpoint::IsLocalEndpoint(void) const */ bool Endpoint::IsConnected(void) const { - if (IsLocalEndpoint()) { - return true; - } else { - return GetClient(); - } + return GetClient(); } Stream::Ptr Endpoint::GetClient(void) const @@ -100,159 +66,28 @@ void Endpoint::SetClient(const Stream::Ptr& client) OnConnected(GetSelf()); } -/** - * Registers a topic subscription for this endpoint. - * - * @param topic The name of the topic. - */ -void Endpoint::RegisterSubscription(const String& topic) -{ - Dictionary::Ptr subscriptions = GetSubscriptions(); - - if (!subscriptions) - subscriptions = boost::make_shared(); - - if (!subscriptions->Contains(topic)) { - Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); - newSubscriptions->Set(topic, topic); - - ObjectLock olock(this); - SetSubscriptions(newSubscriptions); - } -} - -/** - * Removes a topic subscription from this endpoint. - * - * @param topic The name of the topic. - */ -void Endpoint::UnregisterSubscription(const String& topic) -{ - Dictionary::Ptr subscriptions = GetSubscriptions(); - - if (!subscriptions) - return; - - if (subscriptions->Contains(topic)) { - Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); - newSubscriptions->Remove(topic); - SetSubscriptions(newSubscriptions); - } -} - -/** - * Checks whether the endpoint has a subscription for the specified topic. - * - * @param topic The name of the topic. - * @returns true if the endpoint is subscribed to the topic, false otherwise. - */ -bool Endpoint::HasSubscription(const String& topic) const -{ - Dictionary::Ptr subscriptions = GetSubscriptions(); - - return (subscriptions && subscriptions->Contains(topic)); -} - -/** - * Removes all subscriptions for the endpoint. - */ -void Endpoint::ClearSubscriptions(void) -{ - m_Subscriptions = Empty; -} - -Dictionary::Ptr Endpoint::GetSubscriptions(void) const -{ - return m_Subscriptions; -} - -void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions) -{ - subscriptions->Seal(); - m_Subscriptions = subscriptions; -} - -void Endpoint::RegisterTopicHandler(const String& topic, const boost::function& callback) -{ - ObjectLock olock(this); - - std::map > >::iterator it; - it = m_TopicHandlers.find(topic); - - shared_ptr > sig; - - if (it == m_TopicHandlers.end()) { - sig = boost::make_shared >(); - m_TopicHandlers.insert(make_pair(topic, sig)); - } else { - sig = it->second; - } - - sig->connect(callback); - - olock.Unlock(); - - RegisterSubscription(topic); -} - -void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request) +void Endpoint::SendMessage(const Dictionary::Ptr& message) { if (!IsConnected()) { // TODO: persist the message return; } - if (IsLocalEndpoint()) { - ObjectLock olock(this); - - String method; - if (!request.GetMethod(&method)) - return; - - std::map > >::iterator it; - it = m_TopicHandlers.find(method); - - if (it == m_TopicHandlers.end()) - return; - - Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request)); - } else { - try { - JsonRpc::SendMessage(GetClient(), request); - } catch (const std::exception& ex) { - std::ostringstream msgbuf; - msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); - Log(LogWarning, "remoting", msgbuf.str()); - - m_Client.reset(); - } - } -} - -void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response) -{ - if (!IsConnected()) - return; + try { + JsonRpc::SendMessage(GetClient(), message); + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); + Log(LogWarning, "remoting", msgbuf.str()); - if (IsLocalEndpoint()) - EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); - else { - try { - JsonRpc::SendMessage(GetClient(), response); - } catch (const std::exception& ex) { - std::ostringstream msgbuf; - msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); - Log(LogWarning, "remoting", msgbuf.str()); - - m_Client.reset(); - } + m_Client.reset(); } } void Endpoint::MessageThreadProc(const Stream::Ptr& stream) { for (;;) { - MessagePart message; + Dictionary::Ptr message; try { message = JsonRpc::ReadMessage(stream); @@ -262,26 +97,7 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream) m_Client.reset(); } - Endpoint::Ptr sender = GetSelf(); - - if (ResponseMessage::IsResponseMessage(message)) { - /* rather than routing the message to the right virtual - * endpoint we just process it here right away. */ - EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); - return; - } - - RequestMessage request = message; - - String method; - if (!request.GetMethod(&method)) - return; - - String id; - if (request.GetID(&id)) - EndpointManager::GetInstance()->SendAnycastMessage(sender, request); - else - EndpointManager::GetInstance()->SendMulticastMessage(sender, request); + Utility::QueueAsyncCallback(bind(boost::ref(Endpoint::OnMessageReceived), GetSelf(), message)); } } @@ -290,11 +106,11 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream) * * @returns The node address (hostname). */ -String Endpoint::GetNode(void) const +String Endpoint::GetHost(void) const { ObjectLock olock(this); - return m_Node; + return m_Host; } /** @@ -302,11 +118,11 @@ String Endpoint::GetNode(void) const * * @returns The service name (port). */ -String Endpoint::GetService(void) const +String Endpoint::GetPort(void) const { ObjectLock olock(this); - return m_Service; + return m_Port; } void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const @@ -315,11 +131,9 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) if (attributeTypes & Attribute_Config) { bag->Set("local", m_Local); - bag->Set("node", m_Node); - bag->Set("service", m_Service); + bag->Set("host", m_Host); + bag->Set("port", m_Port); } - - bag->Set("subscriptions", m_Subscriptions); } void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes) @@ -328,9 +142,7 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType if (attributeTypes & Attribute_Config) { m_Local = bag->Get("local"); - m_Node = bag->Get("node"); - m_Service = bag->Get("service"); + m_Host = bag->Get("host"); + m_Port = bag->Get("port"); } - - bag->Set("subscriptions", m_Subscriptions); -} \ No newline at end of file +} diff --git a/lib/remoting/endpoint.h b/lib/remoting/endpoint.h index 564515fda..22899ed35 100644 --- a/lib/remoting/endpoint.h +++ b/lib/remoting/endpoint.h @@ -21,8 +21,6 @@ #define ENDPOINT_H #include "remoting/i2-remoting.h" -#include "remoting/requestmessage.h" -#include "remoting/responsemessage.h" #include "base/dynamicobject.h" #include "base/stream.h" #include @@ -43,34 +41,18 @@ public: DECLARE_PTR_TYPEDEFS(Endpoint); DECLARE_TYPENAME(Endpoint); - typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&); + static boost::signals2::signal OnConnected; + static boost::signals2::signal OnMessageReceived; Stream::Ptr GetClient(void) const; void SetClient(const Stream::Ptr& client); - void RegisterSubscription(const String& topic); - void UnregisterSubscription(const String& topic); - bool HasSubscription(const String& topic) const; - - Dictionary::Ptr GetSubscriptions(void) const; - void SetSubscriptions(const Dictionary::Ptr& subscriptions); - - bool IsLocalEndpoint(void) const; bool IsConnected(void) const; - void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message); - void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message); - - void ClearSubscriptions(void); - - void RegisterTopicHandler(const String& topic, const boost::function& callback); + void SendMessage(const Dictionary::Ptr& request); - String GetNode(void) const; - String GetService(void) const; - - static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true); - - static boost::signals2::signal OnConnected; + String GetHost(void) const; + String GetPort(void) const; protected: virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; @@ -79,18 +61,11 @@ protected: private: bool m_Local; Dictionary::Ptr m_Subscriptions; - String m_Node; - String m_Service; + String m_Host; + String m_Port; Stream::Ptr m_Client; - bool m_ReceivedWelcome; /**< Have we received a welcome message - from this endpoint? */ - bool m_SentWelcome; /**< Have we sent a welcome message to this - endpoint? */ - - std::map > > m_TopicHandlers; - void MessageThreadProc(const Stream::Ptr& stream); }; diff --git a/lib/remoting/endpointmanager.cpp b/lib/remoting/endpointmanager.cpp index 3a20d5a6b..b41d974b2 100644 --- a/lib/remoting/endpointmanager.cpp +++ b/lib/remoting/endpointmanager.cpp @@ -34,18 +34,7 @@ using namespace icinga; * Constructor for the EndpointManager class. */ EndpointManager::EndpointManager(void) - : m_NextMessageID(0) { - m_RequestTimer = boost::make_shared(); - m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this)); - m_RequestTimer->SetInterval(5); - m_RequestTimer->Start(); - - m_SubscriptionTimer = boost::make_shared(); - m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this)); - m_SubscriptionTimer->SetInterval(5); - m_SubscriptionTimer->Start(); - m_ReconnectTimer = boost::make_shared(); m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this)); m_ReconnectTimer->SetInterval(5); @@ -87,16 +76,6 @@ void EndpointManager::SetIdentity(const String& identity) ObjectLock olock(this); m_Identity = identity; - - if (m_Endpoint) - m_Endpoint->Stop(); - - Endpoint::Ptr endpoint = DynamicObject::GetObject(identity); - - if (endpoint) - m_Endpoint = endpoint; - else - m_Endpoint = Endpoint::MakeEndpoint(identity, true, true); } /** @@ -200,175 +179,173 @@ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) Endpoint::Ptr endpoint = Endpoint::GetByName(identity); - if (!endpoint) - endpoint = Endpoint::MakeEndpoint(identity, true); - - endpoint->SetClient(tlsStream); -} - -/** - * Sends an anonymous unicast message to the specified recipient. - * - * @param recipient The recipient of the message. - * @param message The message. - */ -void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient, - const MessagePart& message) -{ - SendUnicastMessage(Endpoint::Ptr(), recipient, message); -} - -/** - * Sends a unicast message to the specified recipient. - * - * @param sender The sender of the message. - * @param recipient The recipient of the message. - * @param message The message. - */ -void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender, - const Endpoint::Ptr& recipient, const MessagePart& message) -{ - /* don't forward messages between non-local endpoints, assume that - * anonymous senders (sender == null) are local */ -// if ((sender && !sender->IsLocal()) && !recipient->IsLocal()) -// return; - - if (ResponseMessage::IsResponseMessage(message)) - recipient->ProcessResponse(sender, message); - else - recipient->ProcessRequest(sender, message); -} - -/** - * Sends a message to exactly one recipient out of all recipients who have a - * subscription for the message's topic. - * - * @param sender The sender of the message. - * @param message The message. - */ -void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender, - const RequestMessage& message) -{ - String method; - if (!message.GetMethod(&method)) - BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property.")); - - std::vector candidates; - - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - /* don't forward messages between non-local endpoints */ -// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal()) -// continue; - - if (endpoint->HasSubscription(method)) - candidates.push_back(endpoint); - } - - if (candidates.empty()) + if (!endpoint) { + Log(LogInformation, "remoting", "Closing endpoint '" + identity + "': No configuration available."); return; - - Endpoint::Ptr recipient = candidates[rand() % candidates.size()]; - SendUnicastMessage(sender, recipient, message); -} - -/** - * Sends an anonymous message to all recipients who have a subscription for the - * message's topic. - * - * @param message The message. - */ -void EndpointManager::SendMulticastMessage(const RequestMessage& message) -{ - SendMulticastMessage(Endpoint::Ptr(), message); -} - -/** - * Sends a message to all recipients who have a subscription for the - * message's topic. - * - * @param sender The sender of the message. - * @param message The message. - */ -void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender, - const RequestMessage& message) -{ - String id; - if (message.GetID(&id)) - BOOST_THROW_EXCEPTION(std::invalid_argument("Multicast requests must not have an ID.")); - - String method; - if (!message.GetMethod(&method)) - BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property.")); - - BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects()) { - /* don't forward messages back to the sender */ - if (sender == recipient) - continue; - - Log(LogDebug, "remoting", "Send multicast message using method " + method); - if (recipient->HasSubscription(method)) - SendUnicastMessage(sender, recipient, message); } -} - -void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, - RequestMessage& message, - const EndpointManager::APICallback& callback, double timeout) -{ - ObjectLock olock(this); - - m_NextMessageID++; - - String id = Convert::ToString(m_NextMessageID); - message.SetID(id); - - PendingRequest pr; - pr.Request = message; - pr.Callback = callback; - pr.Timeout = Utility::GetTime() + timeout; - - m_Requests[id] = pr; - - if (!recipient) - SendAnycastMessage(sender, message); - else - SendUnicastMessage(sender, recipient, message); -} -bool EndpointManager::RequestTimeoutLessComparer(const std::pair& a, - const std::pair& b) -{ - return a.second.Timeout < b.second.Timeout; + endpoint->SetClient(tlsStream); } -void EndpointManager::SubscriptionTimerHandler(void) -{ - Dictionary::Ptr subscriptions = boost::make_shared(); - - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - /* don't copy subscriptions from non-local endpoints or the identity endpoint */ -// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint) +///** +// * Sends an anonymous unicast message to the specified recipient. +// * +// * @param recipient The recipient of the message. +// * @param message The message. +// */ +//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient, +// const MessagePart& message) +//{ +// SendUnicastMessage(Endpoint::Ptr(), recipient, message); +//} + +///** +// * Sends a unicast message to the specified recipient. +// * +// * @param sender The sender of the message. +// * @param recipient The recipient of the message. +// * @param message The message. +// */ +//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender, +// const Endpoint::Ptr& recipient, const MessagePart& message) +//{ +// /* don't forward messages between non-local endpoints, assume that +// * anonymous senders (sender == null) are local */ +//// if ((sender && !sender->IsLocal()) && !recipient->IsLocal()) +//// return; +// +// if (ResponseMessage::IsResponseMessage(message)) +// recipient->ProcessResponse(sender, message); +// else +// recipient->ProcessRequest(sender, message); +//} + +///** +// * Sends a message to exactly one recipient out of all recipients who have a +// * subscription for the message's topic. +// * +// * @param sender The sender of the message. +// * @param message The message. +// */ +//void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender, +// const RequestMessage& message) +//{ +// String method; +// if (!message.GetMethod(&method)) +// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property.")); +// +// std::vector candidates; +// +// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { +// /* don't forward messages between non-local endpoints */ +//// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal()) +//// continue; +// +// if (endpoint->HasSubscription(method)) +// candidates.push_back(endpoint); +// } +// +// if (candidates.empty()) +// return; +// +// Endpoint::Ptr recipient = candidates[rand() % candidates.size()]; +// SendUnicastMessage(sender, recipient, message); +//} + +///** +// * Sends an anonymous message to all recipients who have a subscription for the +// * message's topic. +// * +// * @param message The message. +// */ +//void EndpointManager::SendMulticastMessage(const RequestMessage& message) +//{ +// SendMulticastMessage(Endpoint::Ptr(), message); +//} + +///** +// * Sends a message to all recipients who have a subscription for the +// * message's topic. +// * +// * @param sender The sender of the message. +// * @param message The message. +// */ +//void EndpointManager::SendBroadcastMessage(const Endpoint::Ptr& sender, +// const RequestMessage& message) +//{ +// String method; +// if (!message.GetMethod(&method)) +// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property.")); +// +// BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects()) { +// /* don't forward messages back to the sender */ +// if (sender == recipient) // continue; - - Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions(); - - if (endpointSubscriptions) { - ObjectLock olock(endpointSubscriptions); - - String topic; - BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) { - subscriptions->Set(topic, topic); - } - } - } - - subscriptions->Seal(); - - if (m_Endpoint) { - ObjectLock olock(m_Endpoint); - m_Endpoint->SetSubscriptions(subscriptions); - } -} +// +// Log(LogDebug, "remoting", "Send multicast message using method " + method); +// if (recipient->HasSubscription(method)) +// SendUnicastMessage(sender, recipient, message); +// } +//} + +//void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, +// RequestMessage& message, +// const EndpointManager::APICallback& callback, double timeout) +//{ +// ObjectLock olock(this); +// +// m_NextMessageID++; +// +// String id = Convert::ToString(m_NextMessageID); +// message.SetID(id); +// +// PendingRequest pr; +// pr.Request = message; +// pr.Callback = callback; +// pr.Timeout = Utility::GetTime() + timeout; +// +// m_Requests[id] = pr; +// +// if (!recipient) +// SendAnycastMessage(sender, message); +// else +// SendUnicastMessage(sender, recipient, message); +//} +// +//bool EndpointManager::RequestTimeoutLessComparer(const std::pair& a, +// const std::pair& b) +//{ +// return a.second.Timeout < b.second.Timeout; +//} +// +//void EndpointManager::SubscriptionTimerHandler(void) +//{ +// Dictionary::Ptr subscriptions = boost::make_shared(); +// +// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { +// /* don't copy subscriptions from non-local endpoints or the identity endpoint */ +//// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint) +//// continue; +// +// Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions(); +// +// if (endpointSubscriptions) { +// ObjectLock olock(endpointSubscriptions); +// +// String topic; +// BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) { +// subscriptions->Set(topic, topic); +// } +// } +// } +// +// subscriptions->Seal(); +// +// if (m_Endpoint) { +// ObjectLock olock(m_Endpoint); +// m_Endpoint->SetSubscriptions(subscriptions); +// } +//} void EndpointManager::ReconnectTimerHandler(void) { @@ -376,57 +353,57 @@ void EndpointManager::ReconnectTimerHandler(void) if (endpoint->IsConnected() || endpoint == m_Endpoint) continue; - String node, service; - node = endpoint->GetNode(); - service = endpoint->GetService(); + String host, port; + host = endpoint->GetHost(); + port = endpoint->GetPort(); - if (node.IsEmpty() || service.IsEmpty()) { + if (host.IsEmpty() || port.IsEmpty()) { Log(LogWarning, "icinga", "Can't reconnect " "to endpoint '" + endpoint->GetName() + "': No " - "node/service information."); + "host/port information."); continue; } - AddConnection(node, service); + AddConnection(host, port); } } -void EndpointManager::RequestTimerHandler(void) -{ - ObjectLock olock(this); - - std::map::iterator it; - for (it = m_Requests.begin(); it != m_Requests.end(); ++it) { - if (it->second.HasTimedOut()) { - it->second.Callback(Endpoint::Ptr(), it->second.Request, - ResponseMessage(), true); - - m_Requests.erase(it); - - break; - } - } -} - -void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, - const ResponseMessage& message) -{ - ObjectLock olock(this); - - String id; - if (!message.GetID(&id)) - BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID.")); - - std::map::iterator it; - it = m_Requests.find(id); - - if (it == m_Requests.end()) - return; - - it->second.Callback(sender, it->second.Request, message, false); - - m_Requests.erase(it); -} +//void EndpointManager::RequestTimerHandler(void) +//{ +// ObjectLock olock(this); +// +// std::map::iterator it; +// for (it = m_Requests.begin(); it != m_Requests.end(); ++it) { +// if (it->second.HasTimedOut()) { +// it->second.Callback(Endpoint::Ptr(), it->second.Request, +// ResponseMessage(), true); +// +// m_Requests.erase(it); +// +// break; +// } +// } +//} + +//void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, +// const ResponseMessage& message) +//{ +// ObjectLock olock(this); +// +// String id; +// if (!message.GetID(&id)) +// BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID.")); +// +// std::map::iterator it; +// it = m_Requests.find(id); +// +// if (it == m_Requests.end()) +// return; +// +// it->second.Callback(sender, it->second.Request, message, false); +// +// m_Requests.erase(it); +//} EndpointManager *EndpointManager::GetInstance(void) { diff --git a/lib/remoting/endpointmanager.h b/lib/remoting/endpointmanager.h index 7de738541..7fa5d6653 100644 --- a/lib/remoting/endpointmanager.h +++ b/lib/remoting/endpointmanager.h @@ -54,18 +54,11 @@ public: void AddListener(const String& service); void AddConnection(const String& node, const String& service); - void SendUnicastMessage(const Endpoint::Ptr& recipient, const MessagePart& message); - void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message); - void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); - void SendMulticastMessage(const RequestMessage& message); - void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); - - typedef boost::function APICallback; - - void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message, - const APICallback& callback, double timeout = 30); - - void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); + //void SendUnicastMessage(const Endpoint::Ptr& recipient, const Dictionary::Ptr& message); + //void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message); + //void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); + //void SendMulticastMessage(const RequestMessage& message); + //void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); boost::signals2::signal OnNewEndpoint; @@ -75,38 +68,10 @@ private: shared_ptr m_SSLContext; - Timer::Ptr m_SubscriptionTimer; - Timer::Ptr m_ReconnectTimer; std::set m_Servers; - /** - * Information about a pending API request. - * - * @ingroup remoting - */ - struct I2_REMOTING_API PendingRequest - { - double Timeout; - RequestMessage Request; - boost::function Callback; - - bool HasTimedOut(void) const - { - return Utility::GetTime() > Timeout; - } - }; - - long m_NextMessageID; - std::map m_Requests; - Timer::Ptr m_RequestTimer; - - static bool RequestTimeoutLessComparer(const std::pair& a, const std::pair& b); - void RequestTimerHandler(void); - - void SubscriptionTimerHandler(void); - void ReconnectTimerHandler(void); void NewClientHandler(const Socket::Ptr& client, TlsRole role); diff --git a/lib/remoting/jsonrpc.cpp b/lib/remoting/jsonrpc.cpp index 47d8d8535..73e9f1003 100644 --- a/lib/remoting/jsonrpc.cpp +++ b/lib/remoting/jsonrpc.cpp @@ -31,15 +31,14 @@ using namespace icinga; * * @param message The message. */ -void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message) +void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message) { - Value value = message.GetDictionary(); - String json = value.Serialize(); + String json = Value(message).Serialize(); //std::cerr << ">> " << json << std::endl; NetString::WriteStringToStream(stream, json); } -MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream) +Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream) { String jsonString; if (!NetString::ReadStringFromStream(stream, &jsonString)) @@ -53,5 +52,5 @@ MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream) " message must be a dictionary.")); } - return MessagePart(value); + return value; } diff --git a/lib/remoting/jsonrpc.h b/lib/remoting/jsonrpc.h index 31cf1ed49..ece2c096a 100644 --- a/lib/remoting/jsonrpc.h +++ b/lib/remoting/jsonrpc.h @@ -21,8 +21,8 @@ #define JSONRPC_H #include "remoting/i2-remoting.h" -#include "remoting/messagepart.h" #include "base/stream.h" +#include "base/dictionary.h" namespace icinga { @@ -35,8 +35,8 @@ namespace icinga class I2_REMOTING_API JsonRpc { public: - static void SendMessage(const Stream::Ptr& stream, const MessagePart& message); - static MessagePart ReadMessage(const Stream::Ptr& stream); + static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message); + static Dictionary::Ptr ReadMessage(const Stream::Ptr& stream); private: JsonRpc(void); diff --git a/lib/remoting/messagepart.cpp b/lib/remoting/messagepart.cpp deleted file mode 100644 index 69a1f5fc3..000000000 --- a/lib/remoting/messagepart.cpp +++ /dev/null @@ -1,127 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "remoting/messagepart.h" -#include - -using namespace icinga; - -/** - * Constructor for the MessagePart class. - */ -MessagePart::MessagePart(void) -{ - m_Dictionary = boost::make_shared(); -} - -/** - * Constructor for the MessagePart class. - * - * @param dictionary The dictionary that this MessagePart object should wrap. - */ -MessagePart::MessagePart(const Dictionary::Ptr& dictionary) -{ - m_Dictionary = dictionary; -} - -/** - * Copy-constructor for the MessagePart class. - * - * @param message The message that should be copied. - */ -MessagePart::MessagePart(const MessagePart& message) -{ - m_Dictionary = message.GetDictionary(); -} - -/** - * Retrieves the underlying dictionary for this message. - * - * @returns A dictionary. - */ -Dictionary::Ptr MessagePart::GetDictionary(void) const -{ - return m_Dictionary; -} - -/** - * Retrieves a property's value. - * - * @param key The name of the property. - * @param[out] value The value. - * @returns true if the value was retrieved, false otherwise. - */ -bool MessagePart::Get(String key, MessagePart *value) const -{ - Value v; - v = GetDictionary()->Get(key); - - if (!v.IsObjectType()) - return false; - - Dictionary::Ptr dictionary = v; - - MessagePart mp(dictionary); - *value = mp; - return true; -} - -/** - * Sets a property's value. - * - * @param key The name of the property. - * @param value The value. - */ -void MessagePart::Set(String key, const MessagePart& value) -{ - GetDictionary()->Set(key, value.GetDictionary()); -} - -/** - * Returns an iterator that points to the first element of the dictionary - * which holds the properties for the message. - * - * @returns An iterator. - */ -Dictionary::Iterator MessagePart::Begin(void) -{ - return GetDictionary()->Begin(); -} - -/** - * Returns an iterator that points past the last element of the dictionary - * which holds the properties for the message. - * - * @returns An iterator. - */ -Dictionary::Iterator MessagePart::End(void) -{ - return GetDictionary()->End(); -} - -/** - * Checks whether the message contains the specified element. - * - * @param key The name of the element. - * @returns true if the message contains the element, false otherwise. - */ -bool MessagePart::Contains(const String& key) const -{ - return GetDictionary()->Contains(key); -} diff --git a/lib/remoting/messagepart.h b/lib/remoting/messagepart.h deleted file mode 100644 index 1468f895d..000000000 --- a/lib/remoting/messagepart.h +++ /dev/null @@ -1,88 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef MESSAGEPART_H -#define MESSAGEPART_H - -#include "remoting/i2-remoting.h" -#include "base/dictionary.h" - -namespace icinga -{ - -/** - * A part of an RPC message. - * - * @ingroup remoting - */ -class I2_REMOTING_API MessagePart -{ -public: - MessagePart(void); - MessagePart(const MessagePart& message); - explicit MessagePart(const Dictionary::Ptr& dictionary); - - Dictionary::Ptr GetDictionary(void) const; - - /** - * Retrieves a property's value. - * - * @param key The name of the property. - * @param[out] value The value. - * @returns true if the value was retrieved, false otherwise. - */ - template - bool Get(String key, T *value) const - { - Value v = GetDictionary()->Get(key); - - if (v.IsEmpty()) - return false; - - *value = static_cast(v); - return true; - } - - /** - * Sets a property's value. - * - * @param key The name of the property. - * @param value The value. - */ - template - void Set(String key, const T& value) - { - GetDictionary()->Set(key, value); - } - - bool Get(String key, MessagePart *value) const; - void Set(String key, const MessagePart& value); - - bool Contains(const String& key) const; - - Dictionary::Iterator Begin(void); - Dictionary::Iterator End(void); - -private: - Dictionary::Ptr m_Dictionary; -}; - -} - -#endif /* MESSAGEPART_H */ diff --git a/lib/remoting/requestmessage.cpp b/lib/remoting/requestmessage.cpp deleted file mode 100644 index a530317ba..000000000 --- a/lib/remoting/requestmessage.cpp +++ /dev/null @@ -1,22 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "remoting/requestmessage.h" - -using namespace icinga; diff --git a/lib/remoting/requestmessage.h b/lib/remoting/requestmessage.h deleted file mode 100644 index eb75752cb..000000000 --- a/lib/remoting/requestmessage.h +++ /dev/null @@ -1,138 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef REQUESTMESSAGE_H -#define REQUESTMESSAGE_H - -#include "remoting/i2-remoting.h" -#include "remoting/messagepart.h" - -namespace icinga -{ - -/** - * A JSON-RPC request message. - * - * @ingroup remoting - */ -class I2_REMOTING_API RequestMessage : public MessagePart -{ -public: - /** - * Constructor for the RequestMessage class. - */ - RequestMessage(void) : MessagePart() { - SetVersion("2.0"); - } - - /** - * Copy-constructor for the RequestMessage class. - * - * @param message The message that is to be copied. - */ - RequestMessage(const MessagePart& message) : MessagePart(message) { } - - /** - * Retrieves the version of the JSON-RPC protocol. - * - * @param[out] value The value. - * @returns true if the value was retrieved, false otherwise. - */ - inline bool GetVersion(String *value) const - { - return Get("jsonrpc", value); - } - - /** - * Sets the version of the JSON-RPC protocol that should be used. - * - * @param value The version. - */ - inline void SetVersion(const String& value) - { - Set("jsonrpc", value); - } - - /** - * Retrieves the method of the JSON-RPC call. - * - * @param[out] value The method. - * @returns true if the value was retrieved, false otherwise. - */ - inline bool GetMethod(String *value) const - { - return Get("method", value); - } - - /** - * Sets the method for the JSON-RPC call. - * - * @param value The method. - */ - inline void SetMethod(const String& value) - { - Set("method", value); - } - - /** - * Retrieves the parameters of the JSON-RPC call. - * - * @param[out] value The parameters. - * @returns true if the value was retrieved, false otherwise. - */ - inline bool GetParams(MessagePart *value) const - { - return Get("params", value); - } - - /** - * Sets the parameters for the JSON-RPC call. - * - * @param value The parameters. - */ - inline void SetParams(const MessagePart& value) - { - Set("params", value); - } - - /** - * Retrieves the ID of the JSON-RPC call. - * - * @param[out] value The ID. - * @return true if the value was retrieved, false otherwise. - */ - inline bool GetID(String *value) const - { - return Get("id", value); - } - - /** - * Sets the ID for the JSON-RPC call. - * - * @param value The ID. - */ - inline void SetID(const String& value) - { - Set("id", value); - } -}; - -} - -#endif /* REQUESTMESSAGE_H */ diff --git a/lib/remoting/responsemessage.cpp b/lib/remoting/responsemessage.cpp deleted file mode 100644 index 5461015e9..000000000 --- a/lib/remoting/responsemessage.cpp +++ /dev/null @@ -1,22 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "remoting/responsemessage.h" - -using namespace icinga; diff --git a/lib/remoting/responsemessage.h b/lib/remoting/responsemessage.h deleted file mode 100644 index a3cd9b9e6..000000000 --- a/lib/remoting/responsemessage.h +++ /dev/null @@ -1,149 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef RESPONSEMESSAGE_H -#define RESPONSEMESSAGE_H - -#include "remoting/i2-remoting.h" -#include "remoting/messagepart.h" - -namespace icinga -{ - -/** - * A JSON-RPC response message. - * - * @ingroup remoting - */ -class I2_REMOTING_API ResponseMessage : public MessagePart -{ -public: - /** - * Constructor for the ResponseMessage class. - */ - ResponseMessage(void) : MessagePart() { - SetVersion("2.0"); - } - - /** - * Copy-constructor for the ResponseMessage class. - * - * @param message The message that should be copied. - */ - ResponseMessage(const MessagePart& message) : MessagePart(message) { } - - /** - * Retrieves the version of the JSON-RPC protocol. - * - * @param[out] value The value. - * @returns true if the value was retrieved, false otherwise. - */ - inline bool GetVersion(String *value) const - { - return Get("jsonrpc", value); - } - - /** - * Sets the version of the JSON-RPC protocol that should be used. - * - * @param value The version. - */ - inline void SetVersion(const String& value) - { - Set("jsonrpc", value); - } - - /** - * Retrieves the result of the JSON-RPC call. - * - * @param[out] value The result. - * @returns true if the value was retrieved, false otherwise. - */ - bool GetResult(MessagePart *value) const - { - return Get("result", value); - } - - /** - * Sets the result for the JSON-RPC call. - * - * @param value The result. - */ - void SetResult(const MessagePart& value) - { - Set("result", value); - } - - /** - * Retrieves the error message of the JSON-RPC call. - * - * @param[out] value The error message. - * @returns true if the value was retrieved, false otherwise. - */ - bool GetError(String *value) const - { - return Get("error", value); - } - - /** - * Sets the error message for the JSON-RPC call. - * - * @param value The error message. - */ - void SetError(const String& value) - { - Set("error", value); - } - - /** - * Retrieves the ID of the JSON-RPC call. - * - * @param[out] value The ID. - * @return true if the value was retrieved, false otherwise. - */ - bool GetID(String *value) const - { - return Get("id", value); - } - - /** - * Sets the ID for the JSON-RPC call. - * - * @param value The ID. - */ - void SetID(const String& value) - { - Set("id", value); - } - - /** - * Checks whether a message is a response message. - * - * @param message The message. - * @returns true if the message is a response message, false otherwise. - */ - static bool IsResponseMessage(const MessagePart& message) - { - return (message.Contains("result")); - } -}; - -} - -#endif /* RESPONSEMESSAGE_H */