{
DynamicObject::Start();
- OpenLogFile();
+ {
+ ObjectLock olock(this);
+ OpenLogFile();
+ }
/* set up SSL context */
shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
*/
void ClusterComponent::Stop(void)
{
+ ObjectLock olock(this);
CloseLogFile();
}
if (persistent) {
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
- pmessage->Set("timestamp", Utility::GetTime());
+ double ts = Utility::GetTime();
+ pmessage->Set("timestamp", ts);
pmessage->Set("message", message);
ObjectLock olock(this);
String json = Value(pmessage).Serialize();
NetString::WriteStringToStream(m_LogFile, json);
m_LogMessageCount++;
+ m_LogMessageTimestamp = ts;
if (m_LogMessageCount > 250000) {
CloseLogFile();
void ClusterComponent::OpenLogFile(void)
{
- std::ostringstream msgbuf;
- msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
- String path = msgbuf.str();
+ ASSERT(OwnsLock());
+
+ String path = GetClusterDir() + "current";
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
m_LogFile = boost::make_shared<StdioStream>(fp, true);
m_LogMessageCount = 0;
+ m_LogMessageTimestamp = 0;
}
void ClusterComponent::CloseLogFile(void)
{
+ ASSERT(OwnsLock());
+
m_LogFile->Close();
m_LogFile.reset();
+
+ if (m_LogMessageTimestamp != 0) {
+ String oldpath = GetClusterDir() + "current";
+ String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(m_LogMessageTimestamp) + 1);
+ (void) rename(oldpath.CStr(), newpath.CStr());
+ }
}
void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
msgbuf << GetClusterDir() << ts;
String path = msgbuf.str();
+ if (ts < endpoint->GetLocalLogPosition())
+ continue;
+
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);