*/
void Application::RunEventLoop(void)
{
+ double nextProfile = 0;
+
while (!m_ShuttingDown) {
Object::ClearHeldObjects();
DynamicObject::BeginTx();
#ifdef _DEBUG
- stringstream msgbuf;
- msgbuf << "Active objects: " << Object::GetAliveObjects();
- Logger::Write(LogInformation, "base", msgbuf.str());
+ if (nextProfile < Utility::GetTime()) {
+ stringstream msgbuf;
+ msgbuf << "Active objects: " << Object::GetAliveObjects();
+ Logger::Write(LogInformation, "base", msgbuf.str());
+
+ Object::PrintMemoryProfile();
- Object::PrintMemoryProfile();
+ nextProfile = Utility::GetTime() + 15.0;
+ }
#endif /* _DEBUG */
}
}
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject::Ptr> DynamicObject::m_ModifiedObjects;
-boost::signal<void (const DynamicObject::Ptr&, const String& name)> DynamicObject::OnAttributeChanged;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
boost::signal<void (const set<DynamicObject::Ptr>&)> DynamicObject::OnTransactionClosing;
if (!serializedObject->Contains("configTx"))
throw invalid_argument("Serialized object must contain a config snapshot.");
- /* apply state from the config item/remote update */
- ApplyUpdate(serializedObject, true);
-
- /* restore the object's persistent state */
- map<pair<String, String>, Dictionary::Ptr>::iterator it;
- it = m_PersistentUpdates.find(make_pair(GetType(), GetName()));
- if (it != m_PersistentUpdates.end()) {
- Logger::Write(LogDebug, "base", "Restoring persistent state "
- "for object " + GetType() + ":" + GetName());
- ApplyUpdate(it->second, true);
- m_PersistentUpdates.erase(it);
- }
+ /* apply config state from the config item/remote update;
+ * The DynamicObject::Create function takes care of restoring
+ * non-config state after the object has been fully constructed */
+ InternalApplyUpdate(serializedObject, Attribute_Config, true);
}
Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const
return update;
}
-void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents)
+void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes)
+{
+ InternalApplyUpdate(serializedUpdate, allowedTypes, false);
+}
+
+void DynamicObject::InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents)
{
double configTx = 0;
- if (serializedUpdate->Contains("configTx")) {
+ if ((allowedTypes & Attribute_Config) != 0 && serializedUpdate->Contains("configTx")) {
configTx = serializedUpdate->Get("configTx");
if (configTx > m_ConfigTx)
Dictionary::Ptr attr = it->second;
- Value data = attr->Get("data");
int type = attr->Get("type");
+
+ if ((type & ~allowedTypes) != 0)
+ continue;
+
+ Value data = attr->Get("data");
double tx = attr->Get("tx");
if (type & Attribute_Config)
}
}
-void DynamicObject::SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes)
-{
- if ((allowedTypes & Attribute_Config) == 0)
- serializedUpdate->Remove("configTx");
-
- Dictionary::Ptr attrs = serializedUpdate->Get("attrs");
-
- Dictionary::Iterator prev, it;
- for (it = attrs->Begin(); it != attrs->End(); ) {
- if (!it->second.IsObjectType<Dictionary>())
- continue;
-
- Dictionary::Ptr attr = it->second;
-
- int type = attr->Get("type");
-
- if (type == 0 || type & ~allowedTypes) {
- prev = it;
- it++;
- attrs->Remove(prev);
- continue;
- }
-
- it++;
- }
-}
-
void DynamicObject::RegisterAttribute(const String& name, DynamicAttributeType type)
{
DynamicAttribute attr;
pair<DynamicObject::AttributeIterator, bool> tt;
tt = m_Attributes.insert(make_pair(name, attr));
+ Value oldValue;
+
if (!tt.second && tx >= tt.first->second.Tx) {
+ oldValue = tt.first->second.Data;
tt.first->second.Data = data;
tt.first->second.Tx = tx;
}
if (!suppressEvent) {
m_ModifiedObjects.insert(GetSelf());
- DynamicObject::OnAttributeChanged(GetSelf(), name);
+ OnAttributeChanged(name, oldValue);
}
}
DynamicObject::Ptr DynamicObject::Create(const String& type, const Dictionary::Ptr& properties)
{
- DynamicObject::ClassMap::iterator it;
- it = GetClasses().find(type);
+ DynamicObject::ClassMap::iterator ct;
+ ct = GetClasses().find(type);
+
+ DynamicObject::Ptr obj;
+ if (ct != GetClasses().end()) {
+ obj = ct->second(properties);
+ } else {
+ obj = boost::make_shared<DynamicObject>(properties);
- if (it != GetClasses().end())
- return it->second(properties);
- else
- return boost::make_shared<DynamicObject>(properties);
+ Logger::Write(LogCritical, "base", "Creating generic DynamicObject for type '" + type + "'");
+ }
+
+ /* restore the object's persistent non-config attributes */
+ map<pair<String, String>, Dictionary::Ptr>::iterator st;
+ st = m_PersistentUpdates.find(make_pair(obj->GetType(), obj->GetName()));
+ if (st != m_PersistentUpdates.end()) {
+ Logger::Write(LogDebug, "base", "Restoring persistent state "
+ "for object " + obj->GetType() + ":" + obj->GetName());
+ obj->ApplyUpdate(st->second, Attribute_All & ~Attribute_Config);
+
+ /* we're done with this update, remove it */
+ m_PersistentUpdates.erase(st);
+ }
+
+ return obj;
}
double DynamicObject::GetCurrentTx(void)
m_CurrentTx = 0;
}
+
+void DynamicObject::OnAttributeChanged(const String& name, const Value& oldValue)
+{ }
\ No newline at end of file
/* Attributes read from the config file are implicitly marked
* as config attributes. */
Attribute_Config = 8,
+
+ /* Combination of all attribute types */
+ Attribute_All = Attribute_Transient | Attribute_Local | Attribute_Replicated | Attribute_Config
};
struct DynamicAttribute
DynamicObject(const Dictionary::Ptr& serializedObject);
Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const;
- void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents = false);
- static void SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
+ void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
void RegisterAttribute(const String& name, DynamicAttributeType type);
AttributeConstIterator AttributeBegin(void) const;
AttributeConstIterator AttributeEnd(void) const;
- static boost::signal<void (const DynamicObject::Ptr&, const String& name)> OnAttributeChanged;
static boost::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static boost::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static boost::signal<void (const set<DynamicObject::Ptr>&)> OnTransactionClosing;
static void BeginTx(void);
static void FinishTx(void);
+protected:
+ virtual void OnAttributeChanged(const String& name, const Value& oldValue);
+
private:
void InternalSetAttribute(const String& name, const Value& data, double tx, bool suppressEvent = false);
Value InternalGetAttribute(const String& name) const;
static double m_CurrentTx;
static set<DynamicObject::Ptr> m_ModifiedObjects;
+
+ void InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents);
};
class RegisterClassHelper
using std::list;
using std::set;
using std::multimap;
+using std::multiset;
using std::pair;
using std::deque;
using std::make_pair;
REGISTER_CLASS(Service);
boost::signal<void (const Service::Ptr&, const CheckResultMessage&)> Service::OnCheckResultReceived;
+boost::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
Service::Service(const Dictionary::Ptr& serializedObject)
: DynamicObject(serializedObject)
return result;
}
+
+void Service::OnAttributeChanged(const String& name, const Value& oldValue)
+{
+ if (name == "checker")
+ OnCheckerChanged(GetSelf(), oldValue);
+}
static Dictionary::Ptr ResolveDependencies(const Host::Ptr& host, const Dictionary::Ptr& dependencies);
static boost::signal<void (const Service::Ptr& service, const CheckResultMessage&)> OnCheckResultReceived;
+ static boost::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
+
+protected:
+ virtual void OnAttributeChanged(const String& name, const Value& oldValue);
};
}
void CheckerComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
- m_Endpoint->RegisterTopicHandler("checker::AssignService",
- boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3));
- m_Endpoint->RegisterTopicHandler("checker::ClearServices",
- boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3));
m_Endpoint->RegisterPublication("checker::ServiceStateChange");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
+ Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
+ DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1));
+
m_CheckTimer = boost::make_shared<Timer>();
m_CheckTimer->SetInterval(1);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
long tasks = 0;
while (!m_Services.empty()) {
- Service::Ptr service = m_Services.top();
+ CheckerComponent::ServiceMultiSet::iterator it = m_Services.begin();
+ Service::Ptr service = *it;
if (service->GetNextCheck() > now)
break;
- m_Services.pop();
+ m_Services.erase(it);
Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
/* remove the service from the list of pending services; if it's not in the
* list this was a manual (i.e. forced) check and we must not re-add the
* service to the services list because it's already there. */
- set<Service::Ptr>::iterator it;
+ CheckerComponent::ServiceMultiSet::iterator it;
it = m_PendingServices.find(service);
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
- m_Services.push(service);
+ m_Services.insert(service);
}
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
Logger::Write(LogInformation, "checker", msgbuf.str());
}
-void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
+void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
- MessagePart params;
- if (!request.GetParams(¶ms))
- return;
+ String checker = service->GetChecker();
- String service;
- if (!params.Get("service", &service))
- return;
+ if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) {
+ if (m_PendingServices.find(service) != m_PendingServices.end())
+ return;
- if (!Service::Exists(service)) {
- Logger::Write(LogWarning, "checker", "Ignoring delegation request for unknown service '" + service + "'.");
- return;
+ m_Services.insert(service);
+ } else {
+ m_Services.erase(service);
+ m_PendingServices.erase(service);
}
-
- Service::Ptr object = Service::GetByName(service);
-
- m_Services.push(object);
-
- Logger::Write(LogDebug, "checker", "Accepted delegation for service '" + service + "'");
}
-void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
+void CheckerComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object)
{
- Logger::Write(LogInformation, "checker", "Clearing service delegations.");
+ Service::Ptr service = dynamic_pointer_cast<Service>(object);
+
+ /* ignore it if the removed object is not a service */
+ if (!service)
+ return;
- /* clear the services lists */
- m_Services = ServiceQueue();
- m_PendingServices.clear();
+ m_Services.erase(service);
+ m_PendingServices.erase(service);
}
EXPORT_COMPONENT(checker, CheckerComponent);
typedef shared_ptr<CheckerComponent> Ptr;
typedef weak_ptr<CheckerComponent> WeakPtr;
- typedef priority_queue<Service::Ptr, vector<Service::Ptr>, ServiceNextCheckLessComparer> ServiceQueue;
+ typedef multiset<Service::Ptr, ServiceNextCheckLessComparer> ServiceMultiSet;
virtual void Start(void);
virtual void Stop(void);
private:
VirtualEndpoint::Ptr m_Endpoint;
- ServiceQueue m_Services;
- set<Service::Ptr> m_PendingServices;
+ ServiceMultiSet m_Services;
+ ServiceMultiSet m_PendingServices;
Timer::Ptr m_CheckTimer;
void AdjustCheckTimer(void);
- void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
- void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
+ void CheckerChangedHandler(const Service::Ptr& service);
+ void ServiceRemovedHandler(const DynamicObject::Ptr& object);
+
+ //void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
+ //void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
};
}
if (object->GetSource().IsEmpty())
object->SetSource(sender->GetIdentity());
- object->ApplyUpdate(update, true);
+ // TODO: disallow config updates depending on endpoint config
+
+ object->ApplyUpdate(update, Attribute_All);
}
}
void DelegationComponent::Start(void)
{
- DynamicObject::OnRegistered.connect(boost::bind(&DelegationComponent::ServiceCommittedHandler, this, _1));
- DynamicObject::OnUnregistered.connect(boost::bind(&DelegationComponent::ServiceRemovedHandler, this, _1));
-
m_DelegationTimer = boost::make_shared<Timer>();
m_DelegationTimer->SetInterval(30);
m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
m_DelegationTimer->Start();
m_DelegationTimer->Reschedule(0);
-
- m_Endpoint = boost::make_shared<VirtualEndpoint>();
- m_Endpoint->RegisterPublication("checker::AssignService");
- m_Endpoint->RegisterPublication("checker::ClearServices");
- m_Endpoint->RegisterPublication("delegation::ServiceStatus");
- EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
-
- EndpointManager::GetInstance()->OnNewEndpoint.connect(bind(&DelegationComponent::NewEndpointHandler, this, _2));
-}
-
-void DelegationComponent::Stop(void)
-{
- EndpointManager::Ptr mgr = EndpointManager::GetInstance();
-
- if (mgr)
- mgr->UnregisterEndpoint(m_Endpoint);
-}
-
-void DelegationComponent::ServiceCommittedHandler(const DynamicObject::Ptr& object)
-{
- Service::Ptr service = dynamic_pointer_cast<Service>(object);
-
- if (!service)
- return;
-
- String checker = service->GetChecker();
-
- if (!checker.IsEmpty()) {
- /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
- service->SetChecker("");
-
- /* TODO: figure out a better way to clear individual services */
- Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
-
- if (endpoint)
- ClearServices(endpoint);
- }
-}
-
-void DelegationComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object)
-{
- Service::Ptr service = dynamic_pointer_cast<Service>(object);
-
- if (!service)
- return;
-
- String checker = service->GetChecker();
-
- if (!checker.IsEmpty()) {
- /* TODO: figure out a better way to clear individual services */
- Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
-
- if (endpoint)
- ClearServices(endpoint);
- }
-}
-
-void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service)
-{
- RequestMessage request;
- request.SetMethod("checker::AssignService");
-
- MessagePart params;
- params.Set("service", service->GetName());
- request.SetParams(params);
-
- Logger::Write(LogDebug, "delegation", "Trying to delegate service '" + service->GetName() + "'");
-
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request);
-}
-
-void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
-{
- stringstream msgbuf;
- msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'";
- Logger::Write(LogInformation, "delegation", msgbuf.str());
-
- RequestMessage request;
- request.SetMethod("checker::ClearServices");
-
- MessagePart params;
- request.SetParams(params);
-
- EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request);
}
bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
{
- return (endpoint->HasSubscription("checker::AssignService"));
+ return (endpoint->HasPublication("checker::ServiceStateChange"));
}
vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
return candidates;
}
-void DelegationComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
-{
- endpoint->OnSessionEstablished.connect(bind(&DelegationComponent::SessionEstablishedHandler, this, _1));
-}
-void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
-{
- /* ignore this endpoint if it's not a checker */
- if (!IsEndpointChecker(endpoint))
- return;
-
- /* locally clear checker for all services that previously belonged to this endpoint */
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
- Service::Ptr service = dynamic_pointer_cast<Service>(object);
-
- if (!service)
- continue;
-
- if (service->GetChecker() == endpoint->GetIdentity())
- service->SetChecker("");
- }
-
- /* remotely clear services for this endpoint */
- ClearServices(endpoint);
-}
-
void DelegationComponent::DelegationTimerHandler(void)
{
map<Endpoint::Ptr, int> histogram;
Logger::Write(LogInformation, "delegation", msgbuf.str());
}
- if (delegated > 0) {
- if (need_clear) {
- Endpoint::Ptr endpoint;
- BOOST_FOREACH(tie(endpoint, tuples::ignore), histogram) {
- ClearServices(endpoint);
- }
- }
-
- BOOST_FOREACH(const Service::Ptr& service, services) {
- String checker = service->GetChecker();
- Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
-
- if (!endpoint)
- continue;
-
- AssignService(endpoint, service);
- }
- }
-
stringstream msgbuf;
msgbuf << "Updated delegations for " << delegated << " services";
Logger::Write(LogInformation, "delegation", msgbuf.str());
{
public:
virtual void Start(void);
- virtual void Stop(void);
private:
- VirtualEndpoint::Ptr m_Endpoint;
Timer::Ptr m_DelegationTimer;
- void NewEndpointHandler(const Endpoint::Ptr& endpoint);
- void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
-
- void ServiceCommittedHandler(const DynamicObject::Ptr& object);
- void ServiceRemovedHandler(const DynamicObject::Ptr& object);
void DelegationTimerHandler(void);
vector<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
- void AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service);
- void ClearServices(const Endpoint::Ptr& checker);
-
static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);
-
- void CheckResultRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
};
}
if (!dobj)
dobj = DynamicObject::Create(GetType(), update);
else
- dobj->ApplyUpdate(update);
+ dobj->ApplyUpdate(update, Attribute_Config);
m_DynamicObject = dobj;