void ConfigRpcComponent::Start(void)
{
+ m_Syncing = false;
+
EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
m_Endpoint = boost::make_shared<VirtualEndpoint>();
- long configSource;
- if (GetConfig()->GetProperty("configSource", &configSource) && configSource != 0) {
- m_Endpoint->RegisterTopicHandler("config::FetchObjects",
- boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2));
+ m_Endpoint->RegisterTopicHandler("config::FetchObjects",
+ boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2));
- ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
- ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
- ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2));
+ ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
+ ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2));
+ ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2));
- m_Endpoint->RegisterPublication("config::ObjectCommitted");
- m_Endpoint->RegisterPublication("config::ObjectRemoved");
- }
+ m_Endpoint->RegisterPublication("config::ObjectCommitted");
+ m_Endpoint->RegisterPublication("config::ObjectRemoved");
endpointManager->OnNewEndpoint.connect(boost::bind(&ConfigRpcComponent::NewEndpointHandler, this, _2));
m_Endpoint->RegisterPublication("config::FetchObjects");
m_Endpoint->RegisterTopicHandler("config::ObjectCommitted",
- boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _3));
+ boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
boost::bind(&ConfigRpcComponent::RemoteObjectRemovedHandler, this, _3));
void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& object)
{
+ /* don't send messages when we're currently processing a remote update */
+ if (m_Syncing)
+ return;
+
if (!ShouldReplicateObject(object))
return;
void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& object)
{
+ /* don't send messages when we're currently processing a remote update */
+ if (m_Syncing)
+ return;
+
if (!ShouldReplicateObject(object))
return;
MakeObjectMessage(object, "config::ObjectRemoved", false));
}
-void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& request)
+void ConfigRpcComponent::RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
MessagePart params;
if (!request.GetParams(¶ms))
ConfigObject::Ptr object = ConfigObject::GetObject(type, name);
- if (!object)
+ if (!object) {
object = boost::make_shared<ConfigObject>(properties.GetDictionary());
- else
+
+ if (object->GetSource() == EndpointManager::GetInstance()->GetIdentity()) {
+ /* the peer sent us an object that was originally created by us -
+ * however if was deleted locally so we have to tell the peer to destroy
+ * its copy of the object. */
+ EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
+ MakeObjectMessage(object, "config::ObjectRemoved", false));
+
+ return;
+ }
+ } else {
+ /* TODO: compare transaction timestamps and reject the update if our local object is newer */
+
object->SetProperties(properties.GetDictionary());
+ }
if (object->IsLocal())
throw invalid_argument("Replicated remote object is marked as local.");
- object->Commit();
+ if (object->GetSource().empty())
+ object->SetSource(sender->GetIdentity());
+
+ try {
+ /* TODO: only ignore updates for _this_ object rather than all objects
+ * this might be relevant if the commit handler for this object
+ * creates other objects. */
+ m_Syncing = true;
+ object->Commit();
+ m_Syncing = false;
+ } catch (const std::exception& ex) {
+ m_Syncing = false;
+ throw;
+ }
}
void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& request)
if (!object)
return;
- if (!object->IsLocal())
- object->Unregister();
+ if (!object->IsLocal()) {
+ try {
+ m_Syncing = true;
+ object->Unregister();
+ m_Syncing = false;
+ } catch (const std::exception& ex) {
+ m_Syncing = false;
+ throw;
+ }
+ }
}
EXPORT_COMPONENT(configrpc, ConfigRpcComponent);
{
Service service(object);
- /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
- service.SetChecker("");
+ string checker = service.GetChecker();
+
+ if (!checker.empty()) {
+ /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
+ service.SetChecker("");
+
+ /* TODO: figure out a better way to clear individual services */
+ Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
+
+ if (endpoint)
+ ClearServices(endpoint);
+ }
}
void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
{
+ stringstream msgbuf;
+ msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'";
+ Application::Log(LogInformation, "delegation", msgbuf.str());
+
RequestMessage request;
request.SetMethod("checker::ClearServices");
if (!IsEndpointChecker(endpoint))
return;
- stringstream msgbuf;
- msgbuf << "Clearing assigned services for endpoint '" << endpoint->GetIdentity() << "'";
- Application::Log(LogInformation, "delegation", msgbuf.str());
-
/* locally clear checker for all services that previously belonged to this endpoint */
ConfigObject::Set::Iterator it;
for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) {