if (endpoint->GetName() == GetIdentity())
continue;
- endpoint->SendMessage(message);
+ {
+ ObjectLock olock(endpoint);
+
+ if (!endpoint->IsSyncing())
+ endpoint->SendMessage(message);
+ }
}
}
{
int count = 0;
- ASSERT(OwnsLock());
+ ASSERT(!OwnsLock());
- CloseLogFile();
- RotateLogFile();
+ for (;;) {
+ ObjectLock olock(this);
- std::vector<int> files;
- Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
- std::sort(files.begin(), files.end());
+ CloseLogFile();
+ RotateLogFile();
- BOOST_FOREACH(int ts, files) {
- String path = GetClusterDir() + "log/" + Convert::ToString(ts);
+ std::vector<int> files;
+ Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ std::sort(files.begin(), files.end());
- if (ts < endpoint->GetLocalLogPosition())
- continue;
+ if (files.size() > 1) {
+ OpenLogFile();
+ olock.Unlock();
+ }
+
+ BOOST_FOREACH(int ts, files) {
+ String path = GetClusterDir() + "log/" + Convert::ToString(ts);
+
+ if (ts < endpoint->GetLocalLogPosition())
+ continue;
- Log(LogInformation, "cluster", "Replaying log: " + path);
+ Log(LogInformation, "cluster", "Replaying log: " + path);
- std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
- StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
- ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
+ std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
+ StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
+ ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
- String message;
- while (true) {
- try {
- if (!NetString::ReadStringFromStream(lstream, &message))
+ String message;
+ while (true) {
+ try {
+ if (!NetString::ReadStringFromStream(lstream, &message))
+ break;
+ } catch (std::exception&) {
+ /* Log files may be incomplete or corrupted. This is perfectly OK. */
break;
- } catch (std::exception&) {
- /* Log files may be incomplete or corrupted. This is perfectly OK. */
- break;
- }
+ }
- Dictionary::Ptr pmessage = Value::Deserialize(message);
+ Dictionary::Ptr pmessage = Value::Deserialize(message);
- if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
- continue;
+ if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
+ continue;
- if (pmessage->Get("except") == endpoint->GetName())
- continue;
+ if (pmessage->Get("except") == endpoint->GetName())
+ continue;
- NetString::WriteStringToStream(stream, pmessage->Get("message"));
- count++;
+ NetString::WriteStringToStream(stream, pmessage->Get("message"));
+ count++;
+ }
+
+ lstream->Close();
}
- lstream->Close();
- }
+ Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
- Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+ if (files.size() == 1) {
+ ObjectLock olock2(endpoint);
- OpenLogFile();
+ endpoint->SetSyncing(false);
+ break;
+ }
+ }
}
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
return;
}
- endpoint->SetSeen(Utility::GetTime());
+ {
+ ObjectLock olock(endpoint);
+
+ Stream::Ptr oldClient = endpoint->GetClient();
+ if (oldClient)
+ oldClient->Close();
+
+ endpoint->SetSyncing(true);
+ endpoint->SetSeen(Utility::GetTime());
+ endpoint->SetClient(tlsStream);
+ }
Dictionary::Ptr config = boost::make_shared<Dictionary>();
Array::Ptr configFiles = endpoint->GetConfigFiles();
String json = Value(message).Serialize();
NetString::WriteStringToStream(tlsStream, json);
- {
- ObjectLock olock(this);
-
- Stream::Ptr oldClient = endpoint->GetClient();
- if (oldClient)
- oldClient->Close();
-
- endpoint->SetClient(tlsStream);
-
- ReplayLog(endpoint, tlsStream);
- }
+ ReplayLog(endpoint, tlsStream);
}
void ClusterComponent::ClusterTimerHandler(void)
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
+Endpoint::Endpoint(void)
+ : m_Syncing(false)
+{ }
+
/**
* Checks whether this endpoint is connected.
*
Stream::Ptr Endpoint::GetClient(void) const
{
- ObjectLock olock(this);
-
return m_Client;
}
void Endpoint::SetClient(const Stream::Ptr& client)
{
- {
- ObjectLock olock(this);
-
- m_Client = client;
- }
+ m_Client = client;
if (client) {
boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
*/
String Endpoint::GetHost(void) const
{
- ObjectLock olock(this);
-
return m_Host;
}
*/
String Endpoint::GetPort(void) const
{
- ObjectLock olock(this);
-
return m_Port;
}
m_RemoteLogPosition = ts;
}
+bool Endpoint::IsSyncing(void) const
+{
+ return m_Syncing;
+}
+
+void Endpoint::SetSyncing(bool syncing)
+{
+ m_Syncing = syncing;
+}
+
Dictionary::Ptr Endpoint::GetFeatures(void) const
{
return m_Features;