]> granicus.if.org Git - icinga2/commitdiff
Implemented asynchronous (internal) API calls.
authorGunnar Beutner <gunnar@beutner.name>
Thu, 14 Jun 2012 09:18:20 +0000 (11:18 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Thu, 14 Jun 2012 09:18:20 +0000 (11:18 +0200)
21 files changed:
base/dictionary.cpp
base/dictionary.h
base/timer.cpp
components/Makefile.am
components/demo/democomponent.cpp
configure.ac
icinga-app/icinga-standalone.conf
icinga.sln
icinga/checktask.h
icinga/endpointmanager.cpp
icinga/endpointmanager.h
icinga/icingaapplication.cpp
icinga/jsonrpcendpoint.cpp
icinga/macroprocessor.h
icinga/nagioschecktask.h
icinga/service.cpp
icinga/service.h
icinga/virtualendpoint.cpp
jsonrpc/messagepart.cpp
jsonrpc/messagepart.h
jsonrpc/responsemessage.h

index 90d6c7c0608eaad36ab415e7ae0e81f8d7e9755c..81da81251153c6ae7e340ae134454ed389352f4d 100644 (file)
@@ -50,3 +50,8 @@ long Dictionary::GetLength(void) const
 {
        return m_Data.size();
 }
+
+bool Dictionary::Contains(const string& key) const
+{
+       return (m_Data.find(key) != m_Data.end());
+}
index 549d4c74a1620c1b5bb1552670fdfde67240beb1..3515b2f4e2681c68ba4ad14599d101f1f8bdbdf5 100644 (file)
@@ -116,6 +116,8 @@ public:
                SetProperty(key, value);
        }
 
+       bool Contains(const string& key) const;
+
        DictionaryIterator Begin(void);
        DictionaryIterator End(void);
 
index 9143314c71e8610b120f8b52f12cb4d05874b1df..592c068b6c1dc503bf9c8f4620cec11fcb3fd311 100644 (file)
@@ -151,6 +151,8 @@ EventArgs Timer::GetUserArgs(void) const
  */
 void Timer::Start(void)
 {
+       Stop();
+
        Timers.push_back(static_pointer_cast<Timer>(shared_from_this()));
 
        Reschedule(time(NULL) + m_Interval);
index 7d7618781217fbbff6e161819c5298aa071eb994..4a6cb77a7caec48244b4e15e6931504ee7c2d3f6 100644 (file)
@@ -1,7 +1,9 @@
 ## Process this file with automake to produce Makefile.in
 ## Created by Anjuta
 
-SUBDIRS = configfile \
+SUBDIRS = \
+       checker \
+       configfile \
        configrpc \
        demo \
        discovery
index 2d9ff238579e11d48d328e47fcfae9d8c4fd937e..5bf073cc50a3c711edca3b6d5be055b68030c9ad 100644 (file)
@@ -40,9 +40,7 @@ void DemoComponent::Start(void)
        m_DemoEndpoint->RegisterTopicHandler("demo::HelloWorld",
            bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this()));
        m_DemoEndpoint->RegisterPublication("demo::HelloWorld");
-
-       EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
-       endpointManager->RegisterEndpoint(m_DemoEndpoint);
+       GetEndpointManager()->RegisterEndpoint(m_DemoEndpoint);
 
        m_DemoTimer = make_shared<Timer>();
        m_DemoTimer->SetInterval(5);
index 010e6a4daba52b93cc6603ff034a2071cfb06610..11ad8707f3aaf44b5306767da8a6ff489a23adc1 100644 (file)
@@ -69,6 +69,7 @@ Makefile
 compat/Makefile
 base/Makefile
 components/Makefile
+components/checker/Makefile
 components/configfile/Makefile
 components/configrpc/Makefile
 components/demo/Makefile
index d5e71f27b5403ce459e29f06f6405d50c26c53ae..c4d42e4dd6c8dc583c81f3a810d164a7062a5f18 100644 (file)
@@ -2,7 +2,7 @@ local object application "icinga" {
 
 }
 
-local object component "demo" {
+local object component "checker" {
 
 }
 
@@ -20,7 +20,8 @@ abstract object service "nagios-service" {
 
 abstract object service "ping" inherits "nagios-service" {
        check_type = "nagios",
-       check_command = "$plugindir$/check_ping -H $address$"
+       check_command = "$plugindir$/check_ping -H $address$",
+       check_interval = 30
 }
 
 object service "localhost-ping" inherits "ping" {
index 1cfbaaa5fa08858c1c6d3f3f0cfce86d74c38262..8d3824492ee9e730ca8a63d39bc6df72028fa2a9 100644 (file)
@@ -24,6 +24,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "icinga-app", "icinga-app\ic
                {2E6C1133-730F-4875-A72C-B455B1DD4C5C} = {2E6C1133-730F-4875-A72C-B455B1DD4C5C}
                {697C6D7E-3109-484C-A7AF-384D28711610} = {697C6D7E-3109-484C-A7AF-384D28711610}
                {E58F1DA7-B723-412B-B2B7-7FF58E2A944E} = {E58F1DA7-B723-412B-B2B7-7FF58E2A944E}
+               {38CE81CC-2660-4EF0-A936-4A337591DA3E} = {38CE81CC-2660-4EF0-A936-4A337591DA3E}
                {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8}
        EndProjectSection
 EndProject
@@ -62,6 +63,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "dyntest", "dyntest\dyntest.
                {B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7} = {B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7}
        EndProjectSection
 EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "checker", "components\checker\checker.vcxproj", "{38CE81CC-2660-4EF0-A936-4A337591DA3E}"
+       ProjectSection(ProjectDependencies) = postProject
+               {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8}
+       EndProjectSection
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Win32 = Debug|Win32
@@ -116,6 +122,10 @@ Global
                {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Debug|Win32.Build.0 = Debug|Win32
                {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.ActiveCfg = Release|Win32
                {E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.Build.0 = Release|Win32
+               {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.ActiveCfg = Debug|Win32
+               {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.Build.0 = Debug|Win32
+               {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.ActiveCfg = Release|Win32
+               {38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.Build.0 = Release|Win32
        EndGlobalSection
        GlobalSection(SolutionProperties) = preSolution
                HideSolutionNode = FALSE
index 622e018aaa11acbe4d3d05c068bba91f684e53a8..00fc29096d857da2000f8b20b524c563dd713c37 100644 (file)
@@ -24,7 +24,7 @@ struct CheckResult
        Dictionary::Ptr PerformanceData;
 };
 
-class CheckTask : public Object
+class I2_ICINGA_API CheckTask : public Object
 {
 public:
        typedef shared_ptr<CheckTask> Ptr;
index 728285b110ac4a4cab4b0030c75081a8824c3eae..94856bc1dd89fa816863fe89717c09a40c0fe1d4 100644 (file)
@@ -184,15 +184,14 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
 void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
     Endpoint::Ptr recipient, const MessagePart& message)
 {
-       /* don't forward messages back to the sender */
-       if (sender == recipient)
-               return;
-
        /* don't forward messages between non-local endpoints */
        if (!sender->IsLocal() && !recipient->IsLocal())
                return;
 
-       recipient->ProcessRequest(sender, message);
+       if (ResponseMessage::IsResponseMessage(message))
+               recipient->ProcessResponse(sender, message);
+       else
+               recipient->ProcessRequest(sender, message);
 }
 
 /**
@@ -205,7 +204,23 @@ void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
 void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender,
     const RequestMessage& message)
 {
-       throw NotImplementedException();
+       string method;
+       if (!message.GetMethod(&method))
+               throw invalid_argument("Message is missing the 'method' property.");
+
+       vector<Endpoint::Ptr> candidates;
+       for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
+       {
+               Endpoint::Ptr endpoint = *i;
+               if (endpoint->HasSubscription(method))
+                       candidates.push_back(endpoint);
+       }
+
+       if (candidates.size() == 0)
+               return;
+
+       Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
+       SendUnicastMessage(sender, recipient, message);
 }
 
 /**
@@ -229,6 +244,11 @@ void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender,
        for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
        {
                Endpoint::Ptr recipient = *i;
+
+               /* don't forward messages back to the sender */
+               if (sender == recipient)
+                       continue;
+
                if (recipient->HasSubscription(method))
                        SendUnicastMessage(sender, recipient, message);
        }
@@ -269,3 +289,103 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
 
        return Endpoint::Ptr();
 }
+
+void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
+    RequestMessage& message,
+    function<int(const NewResponseEventArgs&)> callback, time_t timeout)
+{
+       m_NextMessageID++;
+
+       stringstream idstream;
+       idstream << m_NextMessageID;
+
+       string id = idstream.str();
+       message.SetID(id);
+
+       PendingRequest pr;
+       pr.Request = message;
+       pr.Callback = callback;
+       pr.Timeout = time(NULL) + timeout;
+
+       m_Requests[id] = pr;
+       RescheduleRequestTimer();
+
+       SendAnycastMessage(sender, message);
+}
+
+bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,
+    const pair<string, PendingRequest>& b)
+{
+       return a.second.Timeout < b.second.Timeout;
+}
+
+void EndpointManager::RescheduleRequestTimer(void)
+{
+       map<string, PendingRequest>::iterator it;
+       it = min_element(m_Requests.begin(), m_Requests.end(),
+           &EndpointManager::RequestTimeoutLessComparer);
+
+       if (!m_RequestTimer) {
+               m_RequestTimer = make_shared<Timer>();
+               m_RequestTimer->OnTimerExpired += bind_weak(&EndpointManager::RequestTimerHandler, shared_from_this());
+       }
+
+       if (it != m_Requests.end()) {
+               time_t now;
+               time(&now);
+
+               time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
+               m_RequestTimer->SetInterval(next_timeout - now);
+               m_RequestTimer->Start();
+       } else {
+               m_RequestTimer->Stop();
+       }
+}
+
+int EndpointManager::RequestTimerHandler(const TimerEventArgs& ea)
+{
+       map<string, PendingRequest>::iterator it;
+       for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
+               if (it->second.HasTimedOut()) {
+                       NewResponseEventArgs nrea;
+                       nrea.Request = it->second.Request;
+                       nrea.Source = shared_from_this();
+                       nrea.TimedOut = true;
+
+                       it->second.Callback(nrea);
+
+                       m_Requests.erase(it);
+
+                       break;
+               }
+       }
+
+       RescheduleRequestTimer();
+
+       return 0;
+}
+
+void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
+{
+       string id;
+       if (!message.GetID(&id))
+               throw invalid_argument("Response message must have a message ID.");
+
+       map<string, PendingRequest>::iterator it;
+       it = m_Requests.find(id);
+
+       if (it == m_Requests.end())
+               return;
+
+       NewResponseEventArgs nrea;
+       nrea.Sender = sender;
+       nrea.Request = it->second.Request;
+       nrea.Response = message;
+       nrea.Source = shared_from_this();
+       nrea.TimedOut = false;
+
+       it->second.Callback(nrea);
+
+       m_Requests.erase(it);
+       RescheduleRequestTimer();
+}
index ed2413a1da8c512104f3b8b4397acccde73bcf97..699efd5905bfc79f5d8d46b9fa88a9bb31744819 100644 (file)
@@ -33,6 +33,38 @@ struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
        icinga::Endpoint::Ptr Endpoint; /**< The new endpoint. */
 };
 
+struct NewResponseEventArgs;
+
+/**
+ * Information about a pending API request.
+ *
+ * @ingroup icinga
+ */
+struct I2_ICINGA_API PendingRequest
+{
+       time_t Timeout;
+       RequestMessage Request;
+       function<int(const NewResponseEventArgs&)> Callback;
+
+       bool HasTimedOut(void) const
+       {
+               return time(NULL) > Timeout;
+       }
+};
+
+/**
+ * Event arguments for the "new response" event.
+ *
+ * @ingroup icinga
+ */
+struct I2_ICINGA_API NewResponseEventArgs : public EventArgs
+{
+       Endpoint::Ptr Sender;
+       RequestMessage Request;
+       ResponseMessage Response;
+       bool TimedOut;
+};
+
 /**
  * Forwards messages between endpoints.
  *
@@ -44,6 +76,10 @@ public:
        typedef shared_ptr<EndpointManager> Ptr;
        typedef weak_ptr<EndpointManager> WeakPtr;
 
+       EndpointManager(void)
+               : m_NextMessageID(0)
+       { }
+
        void SetIdentity(string identity);
        string GetIdentity(void) const;
 
@@ -60,6 +96,11 @@ public:
        void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message);
        void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message);
 
+       void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message,
+           function<int(const NewResponseEventArgs&)> callback, time_t timeout = 10);
+
+       void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
+
        void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
 
        Endpoint::Ptr GetEndpointByIdentity(string identity) const;
@@ -73,9 +114,17 @@ private:
        vector<JsonRpcServer::Ptr> m_Servers;
        vector<Endpoint::Ptr> m_Endpoints;
 
+       long m_NextMessageID;
+       map<string, PendingRequest> m_Requests;
+       Timer::Ptr m_RequestTimer;
+
        void RegisterServer(JsonRpcServer::Ptr server);
        void UnregisterServer(JsonRpcServer::Ptr server);
 
+       static bool RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, const pair<string, PendingRequest>& b);
+       void RescheduleRequestTimer(void);
+       int RequestTimerHandler(const TimerEventArgs& ea);
+
        int NewClientHandler(const NewClientEventArgs& ncea);
 };
 
index bb907df5e54a0b6cc9d040d27b208104d32511cd..4cdfb5f6e60408d81074ad1e7092244261bbb8ae 100644 (file)
@@ -91,16 +91,6 @@ int IcingaApplication::Main(const vector<string>& args)
                m_EndpointManager->SetSSLContext(sslContext);
        }
 
-       CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
-
-       ConfigObject::TMap::Range range = ConfigObject::GetObjects("service");
-
-       for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) {
-               Service svc(it->second);
-               CheckTask::Ptr ct = CheckTask::CreateTask(svc);
-               CheckResult cr = ct->Execute();
-       }
-
        /* create the primary RPC listener */
        string service = GetService();
        if (!service.empty())
index 6386269c0af3d5ddda9eb2ed937b83c8bf9d186e..2d89a7fb5e27fc8ff3993dae74df915687216d8e 100644 (file)
@@ -77,8 +77,7 @@ void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage&
 
 void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message)
 {
-       if (IsConnected())
-               m_Client->SendMessage(message);
+       m_Client->SendMessage(message);
 }
 
 int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
@@ -86,24 +85,27 @@ int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
        const MessagePart& message = nmea.Message;
        Endpoint::Ptr sender = static_pointer_cast<Endpoint>(shared_from_this());
 
+       if (ResponseMessage::IsResponseMessage(message)) {
+               /* rather than routing the message to the right virtual
+                * endpoint we just process it here right away. */
+               GetEndpointManager()->ProcessResponseMessage(sender, message);
+               return 0;
+       }
+
        string method;
-       if (message.GetProperty("method", &method)) {
-               if (!HasPublication(method))
-                       return 0;
+       if (!message.GetProperty("method", &method))
+               return 0;
 
-               RequestMessage request = message;
+       if (!HasPublication(method))
+               return 0;
 
-               string id;
-               if (request.GetID(&id))
-                       GetEndpointManager()->SendAnycastMessage(sender, request);
-               else
-                       GetEndpointManager()->SendMulticastMessage(sender, request);
-       } else {
-               ResponseMessage response = message;
+       RequestMessage request = message;
 
-               // TODO: deal with response messages
-               throw NotImplementedException();
-       }
+       string id;
+       if (request.GetID(&id))
+               GetEndpointManager()->SendAnycastMessage(sender, request);
+       else
+               GetEndpointManager()->SendMulticastMessage(sender, request);
 
        return 0;
 }
index 3f756c097977596b214a307c77d796df33b878e3..3af0c1555cf90975dde1af26dd39c929e46759c4 100644 (file)
@@ -4,7 +4,7 @@
 namespace icinga
 {
 
-class MacroProcessor
+class I2_ICINGA_API MacroProcessor
 {
 public:
        static string ResolveMacros(string str, Dictionary::Ptr macros);
index 60c14641435d97821127bea689d7c7d8d482a7bb..99cdd70920c6defb6c0def93bb11db0f1f1d7a50 100644 (file)
@@ -4,7 +4,7 @@
 namespace icinga
 {
 
-class NagiosCheckTask : public CheckTask
+class I2_ICINGA_API NagiosCheckTask : public CheckTask
 {
 public:
        NagiosCheckTask(const Service& service);
index 3e38f9e30c45db6dcff51bd1f73ca04437fb942d..6e8b606d7deea126a3ce941d7a4c0799bd141ffc 100644 (file)
@@ -51,14 +51,26 @@ long Service::GetMaxCheckAttempts(void) const
 
 long Service::GetCheckInterval(void) const
 {
-       long value = 1;
+       long value = 300;
        GetConfigObject()->GetProperty("check_interval", &value);
        return value;
 }
 
 long Service::GetRetryInterval(void) const
 {
-       long value = 1;
+       long value = 15;
        GetConfigObject()->GetProperty("retry_interval", &value);
        return value;
 }
+
+void Service::SetNextCheck(time_t nextCheck)
+{
+       GetConfigObject()->SetTag("next_check", static_cast<long>(nextCheck));
+}
+
+time_t Service::GetNextCheck(void) const
+{
+       long value = 0;
+       GetConfigObject()->GetTag("next_check", &value);
+       return value;
+}
index ca47ae3e1d3813f6fd1989d716332ddb8e2cfeaa..4257ec764f0d964b30b53996774cc23656bf21f2 100644 (file)
@@ -19,6 +19,9 @@ public:
        long GetMaxCheckAttempts(void) const;
        long GetCheckInterval(void) const;
        long GetRetryInterval(void) const;
+
+       void SetNextCheck(time_t nextCheck);
+       time_t GetNextCheck(void) const;
 };
 
 }
index be8b0c28bbef141871102c3da1490cfda9ca8c87..fbb8b76a629649b8d7d9907d9e1c17ee3169e049 100644 (file)
@@ -74,8 +74,7 @@ void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage&
 
 void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response)
 {
-       // TODO: figure out which request this response belongs to and notify the caller
-       throw NotImplementedException();
+       GetEndpointManager()->ProcessResponseMessage(sender, response);
 }
 
 void VirtualEndpoint::Stop(void)
index 19d1d1c888d9b354ca423c85cb570e61992c2cac..ee405c08123b923cbca5038b64bce093c3069189 100644 (file)
@@ -232,3 +232,14 @@ DictionaryIterator MessagePart::End(void)
 {
        return GetDictionary()->End();
 }
+
+/**
+ * Checks whether the message contains the specified element.
+ *
+ * @param key The name of the element.
+ * @returns true if the message contains the element, false otherwise.
+ */
+bool MessagePart::Contains(const string& key) const
+{
+       return GetDictionary()->Contains(key);
+}
index 19ae4b161326dd4b889971bad52488e4c83ee553..82d76d330a26dc4f627c7e24ced2c039f1c68190 100644 (file)
@@ -85,6 +85,8 @@ public:
 
        void AddUnnamedProperty(const MessagePart& value);
 
+       bool Contains(const string& key) const;
+
        DictionaryIterator Begin(void);
        DictionaryIterator End(void);
 
index 785c5c0bf3bcc138ff7bc2962904732d6d71a605..8a6431085be70e5631a004397604f17933e090a5 100644 (file)
@@ -128,6 +128,17 @@ public:
        {
                SetProperty("id", value);
        }
+
+       /**
+        * Checks whether a message is a response message.
+        *
+        * @param message The message.
+        * @returns true if the message is a response message, false otherwise.
+        */
+       static bool IsResponseMessage(const MessagePart& message)
+       {
+               return (message.Contains("result"));
+       }
 };
 
 }