void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
- if (message->Contains("ts") && sender->GetRemoteLogPosition() + 10 < message->Get("ts")) {
- Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
- lparams->Set("log_position", message->Get("ts"));
+ if (message->Contains("ts")) {
+ double ts = message->Get("ts");
- Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
- lmessage->Set("jsonrpc", "2.0");
- lmessage->Set("method", "cluster::SetLogPosition");
- lmessage->Set("params", lparams);
+ /* ignore old messages */
+ if (ts < sender->GetRemoteLogPosition())
+ return;
+
+ if (sender->GetRemoteLogPosition() + 10 < ts) {
+ Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
+ lparams->Set("log_position", message->Get("ts"));
- sender->SendMessage(lmessage);
+ Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
+ lmessage->Set("jsonrpc", "2.0");
+ lmessage->Set("method", "cluster::SetLogPosition");
+ lmessage->Set("params", lparams);
- sender->SetRemoteLogPosition(message->Get("ts"));
+ sender->SendMessage(lmessage);
- Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+ sender->SetRemoteLogPosition(message->Get("ts"));
+
+ Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
+ }
}
RelayMessage(sender, message, true);