From: Gunnar Beutner Date: Thu, 14 Jun 2012 09:18:20 +0000 (+0200) Subject: Implemented asynchronous (internal) API calls. X-Git-Tag: v0.0.1~433 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=739d0c1c29d4134f69ddc300480e83d9dc59d944;p=icinga2 Implemented asynchronous (internal) API calls. --- diff --git a/base/dictionary.cpp b/base/dictionary.cpp index 90d6c7c06..81da81251 100644 --- a/base/dictionary.cpp +++ b/base/dictionary.cpp @@ -50,3 +50,8 @@ long Dictionary::GetLength(void) const { return m_Data.size(); } + +bool Dictionary::Contains(const string& key) const +{ + return (m_Data.find(key) != m_Data.end()); +} diff --git a/base/dictionary.h b/base/dictionary.h index 549d4c74a..3515b2f4e 100644 --- a/base/dictionary.h +++ b/base/dictionary.h @@ -116,6 +116,8 @@ public: SetProperty(key, value); } + bool Contains(const string& key) const; + DictionaryIterator Begin(void); DictionaryIterator End(void); diff --git a/base/timer.cpp b/base/timer.cpp index 9143314c7..592c068b6 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -151,6 +151,8 @@ EventArgs Timer::GetUserArgs(void) const */ void Timer::Start(void) { + Stop(); + Timers.push_back(static_pointer_cast(shared_from_this())); Reschedule(time(NULL) + m_Interval); diff --git a/components/Makefile.am b/components/Makefile.am index 7d7618781..4a6cb77a7 100644 --- a/components/Makefile.am +++ b/components/Makefile.am @@ -1,7 +1,9 @@ ## Process this file with automake to produce Makefile.in ## Created by Anjuta -SUBDIRS = configfile \ +SUBDIRS = \ + checker \ + configfile \ configrpc \ demo \ discovery diff --git a/components/demo/democomponent.cpp b/components/demo/democomponent.cpp index 2d9ff2385..5bf073cc5 100644 --- a/components/demo/democomponent.cpp +++ b/components/demo/democomponent.cpp @@ -40,9 +40,7 @@ void DemoComponent::Start(void) m_DemoEndpoint->RegisterTopicHandler("demo::HelloWorld", bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this())); m_DemoEndpoint->RegisterPublication("demo::HelloWorld"); - - EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager(); - endpointManager->RegisterEndpoint(m_DemoEndpoint); + GetEndpointManager()->RegisterEndpoint(m_DemoEndpoint); m_DemoTimer = make_shared(); m_DemoTimer->SetInterval(5); diff --git a/configure.ac b/configure.ac index 010e6a4da..11ad8707f 100644 --- a/configure.ac +++ b/configure.ac @@ -69,6 +69,7 @@ Makefile compat/Makefile base/Makefile components/Makefile +components/checker/Makefile components/configfile/Makefile components/configrpc/Makefile components/demo/Makefile diff --git a/icinga-app/icinga-standalone.conf b/icinga-app/icinga-standalone.conf index d5e71f27b..c4d42e4dd 100644 --- a/icinga-app/icinga-standalone.conf +++ b/icinga-app/icinga-standalone.conf @@ -2,7 +2,7 @@ local object application "icinga" { } -local object component "demo" { +local object component "checker" { } @@ -20,7 +20,8 @@ abstract object service "nagios-service" { abstract object service "ping" inherits "nagios-service" { check_type = "nagios", - check_command = "$plugindir$/check_ping -H $address$" + check_command = "$plugindir$/check_ping -H $address$", + check_interval = 30 } object service "localhost-ping" inherits "ping" { diff --git a/icinga.sln b/icinga.sln index 1cfbaaa5f..8d3824492 100644 --- a/icinga.sln +++ b/icinga.sln @@ -24,6 +24,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "icinga-app", "icinga-app\ic {2E6C1133-730F-4875-A72C-B455B1DD4C5C} = {2E6C1133-730F-4875-A72C-B455B1DD4C5C} {697C6D7E-3109-484C-A7AF-384D28711610} = {697C6D7E-3109-484C-A7AF-384D28711610} {E58F1DA7-B723-412B-B2B7-7FF58E2A944E} = {E58F1DA7-B723-412B-B2B7-7FF58E2A944E} + {38CE81CC-2660-4EF0-A936-4A337591DA3E} = {38CE81CC-2660-4EF0-A936-4A337591DA3E} {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} EndProjectSection EndProject @@ -62,6 +63,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "dyntest", "dyntest\dyntest. {B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7} = {B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "checker", "components\checker\checker.vcxproj", "{38CE81CC-2660-4EF0-A936-4A337591DA3E}" + ProjectSection(ProjectDependencies) = postProject + {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Win32 = Debug|Win32 @@ -116,6 +122,10 @@ Global {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Debug|Win32.Build.0 = Debug|Win32 {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.ActiveCfg = Release|Win32 {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.Build.0 = Release|Win32 + {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.ActiveCfg = Debug|Win32 + {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.Build.0 = Debug|Win32 + {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.ActiveCfg = Release|Win32 + {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/icinga/checktask.h b/icinga/checktask.h index 622e018aa..00fc29096 100644 --- a/icinga/checktask.h +++ b/icinga/checktask.h @@ -24,7 +24,7 @@ struct CheckResult Dictionary::Ptr PerformanceData; }; -class CheckTask : public Object +class I2_ICINGA_API CheckTask : public Object { public: typedef shared_ptr Ptr; diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 728285b11..94856bc1d 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -184,15 +184,14 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint) void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender, Endpoint::Ptr recipient, const MessagePart& message) { - /* don't forward messages back to the sender */ - if (sender == recipient) - return; - /* don't forward messages between non-local endpoints */ if (!sender->IsLocal() && !recipient->IsLocal()) return; - recipient->ProcessRequest(sender, message); + if (ResponseMessage::IsResponseMessage(message)) + recipient->ProcessResponse(sender, message); + else + recipient->ProcessRequest(sender, message); } /** @@ -205,7 +204,23 @@ void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender, void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message) { - throw NotImplementedException(); + string method; + if (!message.GetMethod(&method)) + throw invalid_argument("Message is missing the 'method' property."); + + vector candidates; + for (vector::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) + { + Endpoint::Ptr endpoint = *i; + if (endpoint->HasSubscription(method)) + candidates.push_back(endpoint); + } + + if (candidates.size() == 0) + return; + + Endpoint::Ptr recipient = candidates[rand() % candidates.size()]; + SendUnicastMessage(sender, recipient, message); } /** @@ -229,6 +244,11 @@ void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender, for (vector::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) { Endpoint::Ptr recipient = *i; + + /* don't forward messages back to the sender */ + if (sender == recipient) + continue; + if (recipient->HasSubscription(method)) SendUnicastMessage(sender, recipient, message); } @@ -269,3 +289,103 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const return Endpoint::Ptr(); } + +void EndpointManager::SendAPIMessage(Endpoint::Ptr sender, + RequestMessage& message, + function callback, time_t timeout) +{ + m_NextMessageID++; + + stringstream idstream; + idstream << m_NextMessageID; + + string id = idstream.str(); + message.SetID(id); + + PendingRequest pr; + pr.Request = message; + pr.Callback = callback; + pr.Timeout = time(NULL) + timeout; + + m_Requests[id] = pr; + RescheduleRequestTimer(); + + SendAnycastMessage(sender, message); +} + +bool EndpointManager::RequestTimeoutLessComparer(const pair& a, + const pair& b) +{ + return a.second.Timeout < b.second.Timeout; +} + +void EndpointManager::RescheduleRequestTimer(void) +{ + map::iterator it; + it = min_element(m_Requests.begin(), m_Requests.end(), + &EndpointManager::RequestTimeoutLessComparer); + + if (!m_RequestTimer) { + m_RequestTimer = make_shared(); + m_RequestTimer->OnTimerExpired += bind_weak(&EndpointManager::RequestTimerHandler, shared_from_this()); + } + + if (it != m_Requests.end()) { + time_t now; + time(&now); + + time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout; + m_RequestTimer->SetInterval(next_timeout - now); + m_RequestTimer->Start(); + } else { + m_RequestTimer->Stop(); + } +} + +int EndpointManager::RequestTimerHandler(const TimerEventArgs& ea) +{ + map::iterator it; + for (it = m_Requests.begin(); it != m_Requests.end(); it++) { + if (it->second.HasTimedOut()) { + NewResponseEventArgs nrea; + nrea.Request = it->second.Request; + nrea.Source = shared_from_this(); + nrea.TimedOut = true; + + it->second.Callback(nrea); + + m_Requests.erase(it); + + break; + } + } + + RescheduleRequestTimer(); + + return 0; +} + +void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message) +{ + string id; + if (!message.GetID(&id)) + throw invalid_argument("Response message must have a message ID."); + + map::iterator it; + it = m_Requests.find(id); + + if (it == m_Requests.end()) + return; + + NewResponseEventArgs nrea; + nrea.Sender = sender; + nrea.Request = it->second.Request; + nrea.Response = message; + nrea.Source = shared_from_this(); + nrea.TimedOut = false; + + it->second.Callback(nrea); + + m_Requests.erase(it); + RescheduleRequestTimer(); +} diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index ed2413a1d..699efd590 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -33,6 +33,38 @@ struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs icinga::Endpoint::Ptr Endpoint; /**< The new endpoint. */ }; +struct NewResponseEventArgs; + +/** + * Information about a pending API request. + * + * @ingroup icinga + */ +struct I2_ICINGA_API PendingRequest +{ + time_t Timeout; + RequestMessage Request; + function Callback; + + bool HasTimedOut(void) const + { + return time(NULL) > Timeout; + } +}; + +/** + * Event arguments for the "new response" event. + * + * @ingroup icinga + */ +struct I2_ICINGA_API NewResponseEventArgs : public EventArgs +{ + Endpoint::Ptr Sender; + RequestMessage Request; + ResponseMessage Response; + bool TimedOut; +}; + /** * Forwards messages between endpoints. * @@ -44,6 +76,10 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; + EndpointManager(void) + : m_NextMessageID(0) + { } + void SetIdentity(string identity); string GetIdentity(void) const; @@ -60,6 +96,11 @@ public: void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message); void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message); + void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message, + function callback, time_t timeout = 10); + + void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); + void ForEachEndpoint(function callback); Endpoint::Ptr GetEndpointByIdentity(string identity) const; @@ -73,9 +114,17 @@ private: vector m_Servers; vector m_Endpoints; + long m_NextMessageID; + map m_Requests; + Timer::Ptr m_RequestTimer; + void RegisterServer(JsonRpcServer::Ptr server); void UnregisterServer(JsonRpcServer::Ptr server); + static bool RequestTimeoutLessComparer(const pair& a, const pair& b); + void RescheduleRequestTimer(void); + int RequestTimerHandler(const TimerEventArgs& ea); + int NewClientHandler(const NewClientEventArgs& ncea); }; diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index bb907df5e..4cdfb5f6e 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -91,16 +91,6 @@ int IcingaApplication::Main(const vector& args) m_EndpointManager->SetSSLContext(sslContext); } - CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask); - - ConfigObject::TMap::Range range = ConfigObject::GetObjects("service"); - - for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) { - Service svc(it->second); - CheckTask::Ptr ct = CheckTask::CreateTask(svc); - CheckResult cr = ct->Execute(); - } - /* create the primary RPC listener */ string service = GetService(); if (!service.empty()) diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index 6386269c0..2d89a7fb5 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -77,8 +77,7 @@ void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) { - if (IsConnected()) - m_Client->SendMessage(message); + m_Client->SendMessage(message); } int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea) @@ -86,24 +85,27 @@ int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea) const MessagePart& message = nmea.Message; Endpoint::Ptr sender = static_pointer_cast(shared_from_this()); + if (ResponseMessage::IsResponseMessage(message)) { + /* rather than routing the message to the right virtual + * endpoint we just process it here right away. */ + GetEndpointManager()->ProcessResponseMessage(sender, message); + return 0; + } + string method; - if (message.GetProperty("method", &method)) { - if (!HasPublication(method)) - return 0; + if (!message.GetProperty("method", &method)) + return 0; - RequestMessage request = message; + if (!HasPublication(method)) + return 0; - string id; - if (request.GetID(&id)) - GetEndpointManager()->SendAnycastMessage(sender, request); - else - GetEndpointManager()->SendMulticastMessage(sender, request); - } else { - ResponseMessage response = message; + RequestMessage request = message; - // TODO: deal with response messages - throw NotImplementedException(); - } + string id; + if (request.GetID(&id)) + GetEndpointManager()->SendAnycastMessage(sender, request); + else + GetEndpointManager()->SendMulticastMessage(sender, request); return 0; } diff --git a/icinga/macroprocessor.h b/icinga/macroprocessor.h index 3f756c097..3af0c1555 100644 --- a/icinga/macroprocessor.h +++ b/icinga/macroprocessor.h @@ -4,7 +4,7 @@ namespace icinga { -class MacroProcessor +class I2_ICINGA_API MacroProcessor { public: static string ResolveMacros(string str, Dictionary::Ptr macros); diff --git a/icinga/nagioschecktask.h b/icinga/nagioschecktask.h index 60c146414..99cdd7092 100644 --- a/icinga/nagioschecktask.h +++ b/icinga/nagioschecktask.h @@ -4,7 +4,7 @@ namespace icinga { -class NagiosCheckTask : public CheckTask +class I2_ICINGA_API NagiosCheckTask : public CheckTask { public: NagiosCheckTask(const Service& service); diff --git a/icinga/service.cpp b/icinga/service.cpp index 3e38f9e30..6e8b606d7 100644 --- a/icinga/service.cpp +++ b/icinga/service.cpp @@ -51,14 +51,26 @@ long Service::GetMaxCheckAttempts(void) const long Service::GetCheckInterval(void) const { - long value = 1; + long value = 300; GetConfigObject()->GetProperty("check_interval", &value); return value; } long Service::GetRetryInterval(void) const { - long value = 1; + long value = 15; GetConfigObject()->GetProperty("retry_interval", &value); return value; } + +void Service::SetNextCheck(time_t nextCheck) +{ + GetConfigObject()->SetTag("next_check", static_cast(nextCheck)); +} + +time_t Service::GetNextCheck(void) const +{ + long value = 0; + GetConfigObject()->GetTag("next_check", &value); + return value; +} diff --git a/icinga/service.h b/icinga/service.h index ca47ae3e1..4257ec764 100644 --- a/icinga/service.h +++ b/icinga/service.h @@ -19,6 +19,9 @@ public: long GetMaxCheckAttempts(void) const; long GetCheckInterval(void) const; long GetRetryInterval(void) const; + + void SetNextCheck(time_t nextCheck); + time_t GetNextCheck(void) const; }; } diff --git a/icinga/virtualendpoint.cpp b/icinga/virtualendpoint.cpp index be8b0c28b..fbb8b76a6 100644 --- a/icinga/virtualendpoint.cpp +++ b/icinga/virtualendpoint.cpp @@ -74,8 +74,7 @@ void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response) { - // TODO: figure out which request this response belongs to and notify the caller - throw NotImplementedException(); + GetEndpointManager()->ProcessResponseMessage(sender, response); } void VirtualEndpoint::Stop(void) diff --git a/jsonrpc/messagepart.cpp b/jsonrpc/messagepart.cpp index 19d1d1c88..ee405c081 100644 --- a/jsonrpc/messagepart.cpp +++ b/jsonrpc/messagepart.cpp @@ -232,3 +232,14 @@ DictionaryIterator MessagePart::End(void) { return GetDictionary()->End(); } + +/** + * Checks whether the message contains the specified element. + * + * @param key The name of the element. + * @returns true if the message contains the element, false otherwise. + */ +bool MessagePart::Contains(const string& key) const +{ + return GetDictionary()->Contains(key); +} diff --git a/jsonrpc/messagepart.h b/jsonrpc/messagepart.h index 19ae4b161..82d76d330 100644 --- a/jsonrpc/messagepart.h +++ b/jsonrpc/messagepart.h @@ -85,6 +85,8 @@ public: void AddUnnamedProperty(const MessagePart& value); + bool Contains(const string& key) const; + DictionaryIterator Begin(void); DictionaryIterator End(void); diff --git a/jsonrpc/responsemessage.h b/jsonrpc/responsemessage.h index 785c5c0bf..8a6431085 100644 --- a/jsonrpc/responsemessage.h +++ b/jsonrpc/responsemessage.h @@ -128,6 +128,17 @@ public: { SetProperty("id", value); } + + /** + * Checks whether a message is a response message. + * + * @param message The message. + * @returns true if the message is a response message, false otherwise. + */ + static bool IsResponseMessage(const MessagePart& message) + { + return (message.Contains("result")); + } }; }