String id = vid;
- auto it = m_ApiCallbacks.find(id);
+ ApiCallbackInfo aci;
- if (it == m_ApiCallbacks.end())
- return;
+ {
+ boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+ auto it = m_ApiCallbacks.find(id);
+
+ if (it == m_ApiCallbacks.end())
+ return;
- ApiCallbackInfo aci = it->second;
- m_ApiCallbacks.erase(it);
+ aci = it->second;
+ m_ApiCallbacks.erase(it);
+ }
try {
aci.Callback(message);
return Empty;
}
+bool ApiCallbackInfo::IsExpired(void) const
+{
+ return Timestamp < Utility::GetTime() - 300;
+}
+
void JsonRpcConnection::CheckLiveness(void)
{
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
}
+
+ {
+ boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+
+ for (auto it = m_ApiCallbacks.begin(), last = m_ApiCallbacks.end(); it != last; ) {
+ if (it->second.IsExpired()) {
+ it = m_ApiCallbacks.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
}
void JsonRpcConnection::TimeoutTimerHandler(void)
aci.Timestamp = Utility::GetTime();
aci.Callback = callback;
- m_ApiCallbacks[id] = aci;
+ {
+ boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
+ m_ApiCallbacks[id] = aci;
+ }
}
{
double Timestamp;
boost::function<void (const Dictionary::Ptr&)> Callback;
+
+ bool IsExpired(void) const;
};
/**
double m_HeartbeatTimeout;
boost::mutex m_DataHandlerMutex;
std::map<String, ApiCallbackInfo> m_ApiCallbacks;
+ boost::mutex m_ApiCallbacksMutex;
StreamReadContext m_Context;