]> granicus.if.org Git - icinga2/commitdiff
Implement support for cleaning up expired API callbacks
authorGunnar Beutner <gunnar.beutner@icinga.com>
Thu, 31 Aug 2017 09:10:14 +0000 (11:10 +0200)
committerGunnar Beutner <gunnar.beutner@icinga.com>
Tue, 12 Sep 2017 10:52:49 +0000 (12:52 +0200)
refs #5450

lib/remote/jsonrpcconnection.cpp
lib/remote/jsonrpcconnection.hpp

index ca43a7c91be20aba22bc24c4e9e936b229e8074a..62466160fad0e54bb98625f329a0deef5cb649da 100644 (file)
@@ -194,13 +194,18 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
 
                String id = vid;
 
-               auto it = m_ApiCallbacks.find(id);
+               ApiCallbackInfo aci;
 
-               if (it == m_ApiCallbacks.end())
-                       return;
+               {
+                       boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+                       auto it = m_ApiCallbacks.find(id);
+
+                       if (it == m_ApiCallbacks.end())
+                               return;
 
-               ApiCallbackInfo aci = it->second;
-               m_ApiCallbacks.erase(it);
+                       aci = it->second;
+                       m_ApiCallbacks.erase(it);
+               }
 
                try {
                        aci.Callback(message);
@@ -301,6 +306,11 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
        return Empty;
 }
 
+bool ApiCallbackInfo::IsExpired(void) const
+{
+       return Timestamp < Utility::GetTime() - 300;
+}
+
 void JsonRpcConnection::CheckLiveness(void)
 {
        if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
@@ -308,6 +318,18 @@ void JsonRpcConnection::CheckLiveness(void)
                    <<  "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
                Disconnect();
        }
+
+       {
+               boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+
+               for (auto it = m_ApiCallbacks.begin(), last = m_ApiCallbacks.end(); it != last; ) {
+                       if (it->second.IsExpired()) {
+                               it = m_ApiCallbacks.erase(it);
+                       } else {
+                               ++it;
+                       }
+               }
+       }
 }
 
 void JsonRpcConnection::TimeoutTimerHandler(void)
@@ -363,5 +385,8 @@ void JsonRpcConnection::RegisterCallback(const String& id, const boost::function
        aci.Timestamp = Utility::GetTime();
        aci.Callback = callback;
 
-       m_ApiCallbacks[id] = aci;
+       {
+               boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+               m_ApiCallbacks[id] = aci;
+       }
 }
index d91125485f0eef87ede753e4e76be5ad17db0bb8..d8d2a863a400f1345de79c914a3f4b053ea92cdd 100644 (file)
@@ -47,6 +47,8 @@ struct ApiCallbackInfo
 {
        double Timestamp;
        boost::function<void (const Dictionary::Ptr&)> Callback;
+
+       bool IsExpired(void) const;
 };
 
 /**
@@ -96,6 +98,7 @@ private:
        double m_HeartbeatTimeout;
        boost::mutex m_DataHandlerMutex;
        std::map<String, ApiCallbackInfo> m_ApiCallbacks;
+       boost::mutex m_ApiCallbacksMutex;
 
        StreamReadContext m_Context;