m_Identity = GetCertificateCN(cert);
Log(LogInformation, "cluster", "My identity: " + m_Identity);
+ Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
+
+ if (!self)
+ BOOST_THROW_EXCEPTION(std::invalid_argument("No configuration available for the local endpoint."));
+
m_SSLContext = MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
/* create the primary JSON-RPC listener */
{
ASSERT(OwnsLock());
- String path = GetClusterDir() + "current";
+ String path = GetClusterDir() + "log/current";
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
if (ts == 0)
ts = Utility::GetTime();
- String oldpath = GetClusterDir() + "current";
- String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(ts) + 1);
+ String oldpath = GetClusterDir() + "log/current";
+ String newpath = GetClusterDir() + "log/" + Convert::ToString(static_cast<int>(ts) + 1);
(void) rename(oldpath.CStr(), newpath.CStr());
}
RotateLogFile();
std::vector<int> files;
- Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
- String path = GetClusterDir() + Convert::ToString(ts);
+ String path = GetClusterDir() + "log/" + Convert::ToString(ts);
if (ts < endpoint->GetLocalLogPosition())
continue;
OpenLogFile();
}
+void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file)
+{
+ Dictionary::Ptr elem = boost::make_shared<Dictionary>();
+
+ std::ifstream fp(file.CStr());
+ if (!fp)
+ return;
+
+ String content((std::istreambuf_iterator<char>(fp)), std::istreambuf_iterator<char>());
+ elem->Set("content", content);
+
+ config->Set(file, elem);
+}
+
/**
* Processes a new client connection.
*
return;
}
+ Dictionary::Ptr config = boost::make_shared<Dictionary>();
+ Array::Ptr configFiles = endpoint->GetConfigFiles();
+
+ if (configFiles) {
+ BOOST_FOREACH(const String& pattern, configFiles) {
+ Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1));
+ }
+ }
+
+ Dictionary::Ptr params = boost::make_shared<Dictionary>();
+ params->Set("config_files", config);
+
+ Dictionary::Ptr message = boost::make_shared<Dictionary>();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "cluster::Config");
+ message->Set("params", params);
+
+ String json = Value(message).Serialize();
+ NetString::WriteStringToStream(tlsStream, json);
+
{
ObjectLock olock(this);
ReplayLog(endpoint, tlsStream);
}
std::vector<int> files;
- Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
+ Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
}
if (!need) {
- String path = GetClusterDir() + Convert::ToString(ts);
+ String path = GetClusterDir() + "log/" + Convert::ToString(ts);
Log(LogInformation, "cluster", "Removing old log file: " + path);
(void) unlink(path.CStr());
}
service->ClearAcknowledgement(sender->GetName());
} else if (message->Get("method") == "cluster::SetLogPosition") {
sender->SetLocalLogPosition(params->Get("log_position"));
+ } else if (message->Get("method") == "cluster::Config") {
+ Dictionary::Ptr files = params->Get("config_files");
+
+ if (!files)
+ return;
+
+ Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
+
+ Array::Ptr acceptConfig = self->GetAcceptConfig();
+
+ bool accept = false;
+
+ if (acceptConfig) {
+ BOOST_FOREACH(const String& pattern, acceptConfig) {
+ if (Utility::Match(pattern, sender->GetName())) {
+ accept = true;
+ break;
+ }
+ }
+ }
+
+ if (!accept) {
+ Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "'.");
+ return;
+ }
+
+ String dir = GetClusterDir() + "config/" + SHA256(sender->GetName());
+ Log(LogInformation, "cluster", "Creating cluster config directory: " + dir);
+ if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) {
+ BOOST_THROW_EXCEPTION(posix_error()
+ << boost::errinfo_api_function("localtime")
+ << boost::errinfo_errno(errno));
+ }
+
+ /* TODO: update files, remove old files, figure out whether we need to restart */
}
}
return shared_ptr<X509>(cert, X509_free);
}
+String SHA256(const String& s)
+{
+ SHA256_CTX context;
+ unsigned char digest[SHA256_DIGEST_LENGTH];
+
+ if (!SHA256_Init(&context)) {
+ BOOST_THROW_EXCEPTION(openssl_error()
+ << boost::errinfo_api_function("SHA256_Init")
+ << errinfo_openssl_error(ERR_get_error()));
+ }
+
+ if (!SHA256_Update(&context, (unsigned char*)s.CStr(), s.GetLength())) {
+ BOOST_THROW_EXCEPTION(openssl_error()
+ << boost::errinfo_api_function("SHA256_Update")
+ << errinfo_openssl_error(ERR_get_error()));
+ }
+
+ if (!SHA256_Final(digest, &context)) {
+ BOOST_THROW_EXCEPTION(openssl_error()
+ << boost::errinfo_api_function("SHA256_Final")
+ << errinfo_openssl_error(ERR_get_error()));
+ }
+
+ int i;
+ char output[SHA256_DIGEST_LENGTH*2+1];
+ for (i = 0; i < 32; i++)
+ sprintf(output + 2 * i, "%02x", digest[i]);
+
+ return output;
+}
+
}