void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
{
int count = 0;
+ double peer_ts = endpoint->GetLocalLogPosition();
+ bool last_sync = false;
ASSERT(!OwnsLock());
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
- if (files.size() > 1) {
+ if (count == 0 || count > 50000) {
OpenLogFile();
olock.Unlock();
+ } else {
+ last_sync = true;
}
BOOST_FOREACH(int ts, files) {
String path = GetClusterDir() + "log/" + Convert::ToString(ts);
- if (ts < endpoint->GetLocalLogPosition())
+ if (ts < peer_ts)
continue;
Log(LogInformation, "cluster", "Replaying log: " + path);
Dictionary::Ptr pmessage = Value::Deserialize(message);
- if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+ if (pmessage->Get("timestamp") < peer_ts)
continue;
if (pmessage->Get("except") == endpoint->GetName())
NetString::WriteStringToStream(stream, pmessage->Get("message"));
count++;
+
+ peer_ts = pmessage->Get("timestamp");
}
lstream->Close();
Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
- if (files.size() == 1) {
+ if (last_sync) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);