]> granicus.if.org Git - icinga2/commitdiff
Fine-grained locks (WIP, Part 2).
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 18 Feb 2013 13:40:24 +0000 (14:40 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 18 Feb 2013 13:40:24 +0000 (14:40 +0100)
41 files changed:
components/checker/checkercomponent.cpp
components/checker/checkercomponent.h
components/compat/compatcomponent.cpp
components/compat/compatcomponent.h
components/delegation/delegationcomponent.cpp
components/delegation/delegationcomponent.h
components/demo/democomponent.cpp
components/notification/notificationcomponent.cpp
icinga-app/icinga.cpp
lib/base/application.cpp
lib/base/application.h
lib/base/asynctask.h
lib/base/connection.h
lib/base/dictionary.cpp
lib/base/dynamicobject.cpp
lib/base/dynamicobject.h
lib/base/dynamictype.cpp
lib/base/dynamictype.h
lib/base/eventqueue.cpp
lib/base/eventqueue.h
lib/base/logger.cpp
lib/base/object.cpp
lib/base/object.h
lib/base/stdiostream.h
lib/base/timer.cpp
lib/base/timer.h
lib/base/utility.cpp
lib/base/utility.h
lib/config/configitem.cpp
lib/config/configitembuilder.cpp
lib/icinga/externalcommandprocessor.cpp
lib/icinga/host.cpp
lib/icinga/pluginchecktask.cpp
lib/icinga/pluginnotificationtask.cpp
lib/icinga/service-check.cpp
lib/icinga/service-comment.cpp
lib/icinga/service-downtime.cpp
lib/icinga/service.cpp
lib/python/pythonlanguage.cpp
lib/remoting/endpoint.cpp
lib/remoting/endpointmanager.cpp

index f084368909bd93e9e3db70284b51c679b0bb3735..dbc8617dcc8054b9ca7a7b76bd00c7a66cbc8620 100644 (file)
@@ -35,10 +35,8 @@ void CheckerComponent::Start(void)
        Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
        DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1));
 
-       m_CheckTimer = boost::make_shared<Timer>();
-       m_CheckTimer->SetInterval(0.1);
-       m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
-       m_CheckTimer->Start();
+       boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
+       thread.detach();
 
        m_ResultTimer = boost::make_shared<Timer>();
        m_ResultTimer->SetInterval(5);
@@ -51,15 +49,8 @@ void CheckerComponent::Stop(void)
        m_Endpoint->Unregister();
 }
 
-void CheckerComponent::CheckTimerHandler(void)
+void CheckerComponent::CheckThreadProc(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
-       double now = Utility::GetTime();
-       long tasks = 0;
-
-       int missedServices = 0, missedChecks = 0;
-
        for (;;) {
                Service::Ptr service;
 
@@ -69,8 +60,8 @@ void CheckerComponent::CheckTimerHandler(void)
                        typedef nth_index<ServiceSet, 1>::type CheckTimeView;
                        CheckTimeView& idx = boost::get<1>(m_IdleServices);
 
-                       if (idx.begin() == idx.end())
-                               break;
+                       while (idx.begin() == idx.end())
+                               m_CV.wait(lock);
 
                        CheckTimeView::iterator it = idx.begin();
                        service = it->lock();
@@ -79,18 +70,32 @@ void CheckerComponent::CheckTimerHandler(void)
                                idx.erase(it);
                                continue;
                        }
+               }
 
-                       {
-                               ObjectLock olock(service);
+               double wait;
 
-                               if (service->GetNextCheck() > now)
-                                       break;
-                       }
+               {
+                       ObjectLock olock(service);
+                       wait = service->GetNextCheck() - Utility::GetTime();
+               }
 
-                       idx.erase(it);
+               if (wait > 0) {
+                       /* Make sure the service we just examined can be destroyed while we're waiting. */
+                       service.reset();
+
+                       /* Wait for the next check. */
+                       boost::mutex::scoped_lock lock(m_Mutex);
+                       m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
+
+                       continue;
+               }
+
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+                       m_IdleServices.erase(service);
                }
 
-               ObjectLock olock(service);
+               ObjectLock olock(service); /* also required for the key extractor */
 
                /* reschedule the service if checks are currently disabled
                 * for it and this is not a forced check */
@@ -115,51 +120,28 @@ void CheckerComponent::CheckTimerHandler(void)
 
                service->SetForceNextCheck(false);
 
-               Dictionary::Ptr cr = service->GetLastCheckResult();
-
-               if (cr) {
-                       double lastCheck = cr->Get("execution_end");
-                       int missed = (Utility::GetTime() - lastCheck) / service->GetCheckInterval() - 1;
-
-                       if (missed > 0 && !service->GetFirstCheck()) {
-                               missedChecks += missed;
-                               missedServices++;
-                       }
-               }
-
                service->SetFirstCheck(false);
 
                Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
 
-               m_IdleServices.erase(service);
-               m_PendingServices.insert(service);
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+                       m_IdleServices.erase(service);
+                       m_PendingServices.insert(service);
+               }
 
                try {
                        service->BeginExecuteCheck(boost::bind(&CheckerComponent::CheckCompletedHandler, this, service));
                } catch (const exception& ex) {
                        Logger::Write(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + diagnostic_information(ex));
                }
-
-               tasks++;
-       }
-
-       if (missedServices > 0) {
-               stringstream msgbuf;
-               msgbuf << "Missed " << missedChecks << " checks for " << missedServices << " services";;
-               Logger::Write(LogWarning, "checker", msgbuf.str());
-       }
-
-       if (tasks > 0) {
-               stringstream msgbuf;
-               msgbuf << "CheckTimerHandler: created " << tasks << " task(s)";
-               Logger::Write(LogDebug, "checker", msgbuf.str());
        }
-
-       RescheduleCheckTimer();
 }
 
 void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
 {
+       ObjectLock olock(service); /* required for the key extractor */
+
        {
                boost::mutex::scoped_lock lock(m_Mutex);
 
@@ -171,15 +153,11 @@ void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
                if (it != m_PendingServices.end()) {
                        m_PendingServices.erase(it);
                        m_IdleServices.insert(service);
+                       m_CV.notify_all();
                }
        }
 
-       RescheduleCheckTimer();
-
-       {
-               ObjectLock olock(service);
-               Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
-       }
+       Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
 }
 
 void CheckerComponent::ResultTimerHandler(void)
@@ -199,12 +177,8 @@ void CheckerComponent::ResultTimerHandler(void)
 
 void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
 {
-       String checker;
-
-       {
-               ObjectLock olock(service);
-               checker = service->GetChecker();
-       }
+       ObjectLock olock(service); /* also required for the key extractor */
+       String checker = service->GetChecker();
 
        if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
                boost::mutex::scoped_lock lock(m_Mutex);
@@ -213,17 +187,20 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
                        return;
 
                m_IdleServices.insert(service);
+               m_CV.notify_all();
        } else {
                boost::mutex::scoped_lock lock(m_Mutex);
 
                m_IdleServices.erase(service);
                m_PendingServices.erase(service);
+               m_CV.notify_all();
        }
 }
 
 void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
 {
        {
+               ObjectLock olock(service); /* required for the key extractor */
                boost::mutex::scoped_lock lock(m_Mutex);
 
                /* remove and re-insert the service from the set in order to force an index update */
@@ -234,11 +211,9 @@ void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
                if (it == idx.end())
                        return;
 
-               idx.erase(it);
-               idx.insert(service);
+               idx.replace(it, service);
+               m_CV.notify_all();
        }
-
-       RescheduleCheckTimer();
 }
 
 void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
@@ -254,35 +229,6 @@ void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
 
                m_IdleServices.erase(service);
                m_PendingServices.erase(service);
+               m_CV.notify_all();
        }
 }
-
-void CheckerComponent::RescheduleCheckTimer(void)
-{
-       Service::Ptr service;
-
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-
-               if (m_IdleServices.empty())
-                       return;
-
-               typedef nth_index<ServiceSet, 1>::type CheckTimeView;
-               CheckTimeView& idx = boost::get<1>(m_IdleServices);
-
-               do {
-                       CheckTimeView::iterator it = idx.begin();
-
-                       if (it == idx.end())
-                               return;
-
-                       service = it->lock();
-
-                       if (!service)
-                               idx.erase(it);
-               } while (!service);
-       }
-
-       ObjectLock olock(service);
-       m_CheckTimer->Reschedule(service->GetNextCheck());
-}
index 4e4134cded1af86ff8eef7c1baa3e36ae081787d..01c5a41918c53e24d3ea6d5b97728c6c9fec02cd 100644 (file)
@@ -30,6 +30,9 @@ struct ServiceNextCheckExtractor
 {
        typedef double result_type;
 
+       /**
+        * @threadsafety Caller must hold the mutex for the service.
+        */
        double operator()(const Service::WeakPtr& wservice)
        {
                Service::Ptr service = wservice.lock();
@@ -37,10 +40,7 @@ struct ServiceNextCheckExtractor
                if (!service)
                        return 0;
 
-               {
-                       ObjectLock olock(service);
-                       return service->GetNextCheck();
-               }
+               return service->GetNextCheck();
        }
 };
 
@@ -68,15 +68,14 @@ private:
        Endpoint::Ptr m_Endpoint;
 
        boost::mutex m_Mutex;
+       boost::condition_variable m_CV;
 
        ServiceSet m_IdleServices;
        ServiceSet m_PendingServices;
 
-       Timer::Ptr m_CheckTimer;
-
        Timer::Ptr m_ResultTimer;
 
-       void CheckTimerHandler(void);
+       void CheckThreadProc(void);
        void ResultTimerHandler(void);
 
        void CheckCompletedHandler(const Service::Ptr& service);
index 360a2af89d5125cd30020f31909c870f9596dacd..7ee2cdfd68fead4807efd29cb466689cf7353840 100644 (file)
@@ -154,11 +154,7 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
 
                        String command = line;
 
-                       {
-                               recursive_mutex::scoped_lock lock(Application::GetMutex());
-
-                               ProcessCommand(command);
-                       }
+                       ProcessCommand(command);
                }
 
                fclose(fp);
@@ -181,6 +177,8 @@ void CompatComponent::ProcessCommand(const String& command)
 
 void CompatComponent::DumpComments(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
 {
+       ObjectLock olock(owner);
+
        Service::Ptr service;
        Host::Ptr host;
        Dictionary::Ptr comments = owner->GetComments();
@@ -216,6 +214,8 @@ void CompatComponent::DumpComments(ofstream& fp, const Service::Ptr& owner, Comp
 
 void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
 {
+       ObjectLock olock(owner);
+
        Dictionary::Ptr downtimes = owner->GetDowntimes();
 
        if (!downtimes)
@@ -257,6 +257,8 @@ void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, Com
 
 void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
 {
+       ObjectLock olock(host);
+
        int state;
        if (!host->IsReachable())
                state = 2; /* unreachable */
@@ -285,6 +287,8 @@ void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
 
 void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
 {
+       ObjectLock olock(host);
+
        fp << "define host {" << "\n"
           << "\t" << "host_name" << "\t" << host->GetName() << "\n"
           << "\t" << "display_name" << "\t" << host->GetDisplayName() << "\n"
@@ -308,6 +312,8 @@ void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
 
 void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type)
 {
+       ObjectLock olock(service);
+
        String output;
        String perfdata;
        double schedule_start = -1, schedule_end = -1;
@@ -370,6 +376,8 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s
 
 void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service)
 {
+       ObjectLock olock(service);
+
        fp << "servicestatus {" << "\n"
           << "\t" << "host_name=" << service->GetHost()->GetName() << "\n"
           << "\t" << "service_description=" << service->GetShortName() << "\n";
@@ -385,6 +393,8 @@ void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& servic
 
 void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service)
 {
+       ObjectLock olock(service);
+
        fp << "define service {" << "\n"
           << "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n"
           << "\t" << "service_description" << "\t" << service->GetShortName() << "\n"
@@ -399,6 +409,8 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
           << "\n";
 
        BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) {
+               ObjectLock plock(parent);
+
                fp << "define servicedependency {" << "\n"
                   << "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n"
                   << "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n"
@@ -416,8 +428,6 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
  */
 void CompatComponent::StatusTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        Logger::Write(LogInformation, "compat", "Writing compat status information");
 
        String statuspath = GetStatusPath();
@@ -468,55 +478,85 @@ void CompatComponent::StatusTimerHandler(void)
                 << "# This file is auto-generated. Do not modify this file." << "\n"
                 << "\n";
 
-       DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Host")->GetObjects()) {
-               const Host::Ptr& host = static_pointer_cast<Host>(object);
+       {
+               DynamicType::Ptr dt = DynamicType::GetByName("Host");
+               ObjectLock dlock(dt);
+
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       Host::Ptr host = static_pointer_cast<Host>(object);
 
-               DumpHostStatus(statusfp, host);
-               DumpHostObject(objectfp, host);
+                       DumpHostStatus(statusfp, host);
+                       DumpHostObject(objectfp, host);
+               }
        }
 
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("HostGroup")->GetObjects()) {
-               const HostGroup::Ptr& hg = static_pointer_cast<HostGroup>(object);
+       {
+               DynamicType::Ptr dt = DynamicType::GetByName("Host");
+               ObjectLock dlock(dt);
+
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
+                       ObjectLock olock(hg);
 
-               objectfp << "define hostgroup {" << "\n"
-                        << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
-                        << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
-                        << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
+                       objectfp << "define hostgroup {" << "\n"
+                                << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
+                                << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
+                                << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
 
-               objectfp << "\t" << "members" << "\t";
-               DumpNameList(objectfp, hg->GetMembers());
-               objectfp << "\n"
-                        << "}" << "\n";
+                       objectfp << "\t" << "members" << "\t";
+                       DumpNameList(objectfp, hg->GetMembers());
+                       objectfp << "\n"
+                                << "}" << "\n";
+               }
        }
 
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
-               const Service::Ptr& service = static_pointer_cast<Service>(object);
+       {
+               DynamicType::Ptr dt = DynamicType::GetByName("Service");
+               ObjectLock dlock(dt);
 
-               DumpServiceStatus(statusfp, service);
-               DumpServiceObject(objectfp, service);
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       Service::Ptr service = static_pointer_cast<Service>(object);
+
+                       DumpServiceStatus(statusfp, service);
+                       DumpServiceObject(objectfp, service);
+               }
        }
 
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("ServiceGroup")->GetObjects()) {
-               const ServiceGroup::Ptr& sg = static_pointer_cast<ServiceGroup>(object);
+       {
+               DynamicType::Ptr dt = DynamicType::GetByName("ServiceGroup");
+               ObjectLock dlock(dt);
 
-               objectfp << "define servicegroup {" << "\n"
-                        << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
-                        << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
-                        << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
+                       ObjectLock olock(sg);
 
-               objectfp << "\t" << "members" << "\t";
+                       objectfp << "define servicegroup {" << "\n"
+                                << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
+                                << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
+                                << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
 
-               vector<String> sglist;
-               BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
-                       sglist.push_back(service->GetHost()->GetName());
-                       sglist.push_back(service->GetShortName());
-               }
+                       objectfp << "\t" << "members" << "\t";
 
-               DumpStringList(objectfp, sglist);
+                       vector<String> sglist;
+                       BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
+                               ObjectLock slock(service);
+                               Host::Ptr host = service->GetHost();
 
-               objectfp << "\n"
-                        << "}" << "\n";
+                               ObjectLock hlock(host);
+                               sglist.push_back(host->GetName());
+
+                               sglist.push_back(service->GetShortName());
+                       }
+
+                       DumpStringList(objectfp, sglist);
+
+                       objectfp << "\n"
+                                << "}" << "\n";
+               }
        }
 
        statusfp.close();
index 27613cc0784ef817d271072eab439bc89411ff8e..749801aea27c792d5fbde41f9e2a471bbc320ace 100644 (file)
@@ -71,6 +71,7 @@ private:
                        else
                                first = false;
 
+                       ObjectLock olock(*it);
                        fp << (*it)->GetName();
                }
        }
index 4e32271a64ca1beb682f594af0fd015649d0b7bb..5485291baad74c970d773d421a402f61b75926be 100644 (file)
@@ -48,13 +48,17 @@ bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
        return (endpoint->HasSubscription("checker"));
 }
 
-vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
+set<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
 {
-       vector<Endpoint::Ptr> candidates;
+       set<Endpoint::Ptr> candidates;
+
+       DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
+       ObjectLock dlock(dt);
 
        DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
+       BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
                Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+               ObjectLock olock(endpoint);
 
                String myIdentity = EndpointManager::GetInstance()->GetIdentity();
 
@@ -74,7 +78,7 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
                if (!service->IsAllowedChecker(endpoint->GetName()))
                        continue;
 
-               candidates.push_back(endpoint);
+               candidates.insert(endpoint);
        }
 
        return candidates;
@@ -82,59 +86,71 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
 
 void DelegationComponent::DelegationTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        map<Endpoint::Ptr, int> histogram;
 
-       DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
-               Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+       {
+               DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
+               ObjectLock dlock(dt);
+
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
 
-               histogram[endpoint] = 0;
+                       histogram[endpoint] = 0;
+               }
        }
 
        vector<Service::Ptr> services;
 
-       /* build "checker -> service count" histogram */
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
-               Service::Ptr service = dynamic_pointer_cast<Service>(object);
+       {
+               /* build "checker -> service count" histogram */
+               DynamicType::Ptr dt = DynamicType::GetByName("Service");
+               ObjectLock dlock(dt);
 
-               if (!service)
-                       continue;
+               DynamicObject::Ptr object;
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       Service::Ptr service = dynamic_pointer_cast<Service>(object);
 
-               services.push_back(service);
+                       if (!service)
+                               continue;
 
-               String checker = service->GetChecker();
-               if (checker.IsEmpty())
-                       continue;
+                       services.push_back(service);
 
-               if (!Endpoint::Exists(checker))
-                       continue;
+                       ObjectLock olock(service);
+                       String checker = service->GetChecker();
+                       if (checker.IsEmpty())
+                               continue;
+
+                       if (!Endpoint::Exists(checker))
+                               continue;
 
-               Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
+                       Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
 
-               histogram[endpoint]++;
+                       histogram[endpoint]++;
+               }
        }
 
-       std::random_shuffle(services.begin(), services.end());
+       //std::random_shuffle(services.begin(), services.end());
 
        int delegated = 0;
 
        /* re-assign services */
        BOOST_FOREACH(const Service::Ptr& service, services) {
+               ObjectLock olock(service);
+
                String checker = service->GetChecker();
 
                Endpoint::Ptr oldEndpoint;
                if (Endpoint::Exists(checker))
                        oldEndpoint = Endpoint::GetByName(checker);
 
-               vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
+               set<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
 
                int avg_services = 0, overflow_tolerance = 0;
                vector<Endpoint::Ptr>::iterator cit;
 
                if (candidates.size() > 0) {
-                       std::random_shuffle(candidates.begin(), candidates.end());
+                       //std::random_shuffle(candidates.begin(), candidates.end());
 
                        stringstream msgbuf;
                        msgbuf << "Service: " << service->GetName() << ", candidates: " << candidates.size();
@@ -150,8 +166,11 @@ void DelegationComponent::DelegationTimerHandler(void)
 
                /* don't re-assign service if the checker is still valid
                 * and doesn't have too many services */
+
+               ObjectLock elock(oldEndpoint);
+
                if (oldEndpoint && oldEndpoint->IsConnected() &&
-                   find(candidates.begin(), candidates.end(), oldEndpoint) != candidates.end() &&
+                   candidates.find(oldEndpoint) != candidates.end() &&
                    histogram[oldEndpoint] <= avg_services + overflow_tolerance)
                        continue;
 
@@ -169,6 +188,7 @@ void DelegationComponent::DelegationTimerHandler(void)
                        if (histogram[candidate] > avg_services)
                                continue;
 
+                       ObjectLock clock(candidate);
                        service->SetChecker(candidate->GetName());
                        histogram[candidate]++;
 
index b47751b2f61e7f511e35e851dbaadf8805e46f48..655feb1924de732d612d5ae073b39c8b74153827 100644 (file)
@@ -36,7 +36,7 @@ private:
 
        void DelegationTimerHandler(void);
 
-       vector<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
+       set<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
 
        static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);
 
index 07272ad970b9bf645dd47abb64d685f89540fe09..31b931c19a1d7ec908f6074c78ee816f169b2f9f 100644 (file)
@@ -54,16 +54,16 @@ void DemoComponent::Stop(void)
  */
 void DemoComponent::DemoTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
            " world' message.");
 
        RequestMessage request;
        request.SetMethod("demo::HelloWorld");
 
-       EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
-           request);
+       EndpointManager::Ptr em = EndpointManager::GetInstance();
+
+       ObjectLock olock(em);
+       em->SendMulticastMessage(m_Endpoint, request);
 }
 
 /**
index a529ce5364cfa18ef2dd6f180108fc33d56764b2..d6e5a9151ea1d24ba7daa4dc97beceedb34eb424 100644 (file)
@@ -29,6 +29,8 @@ EXPORT_COMPONENT(notification, NotificationComponent);
 void NotificationComponent::Start(void)
 {
        m_Endpoint = Endpoint::MakeEndpoint("notification", false);
+
+       ObjectLock olock(m_Endpoint);
        m_Endpoint->RegisterTopicHandler("icinga::SendNotifications",
            boost::bind(&NotificationComponent::SendNotificationsRequestHandler, this, _2,
            _3));
@@ -53,8 +55,6 @@ void NotificationComponent::Stop(void)
  */
 void NotificationComponent::NotificationTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        // TODO: implement
 }
 
@@ -78,5 +78,7 @@ void NotificationComponent::SendNotificationsRequestHandler(const Endpoint::Ptr&
                return;
 
        Service::Ptr service = Service::GetByName(svc);
+
+       ObjectLock olock(service);
        service->SendNotifications(static_cast<NotificationType>(type));
 }
index 1dafdfe9517a8274c15681e65dbbf34ae3e3fd60..01f21a5f9a087a3c830aad720d1bf7c073b5ae9e 100644 (file)
@@ -88,12 +88,8 @@ static bool LoadConfigFiles(bool validateOnly)
 static void ReloadConfigTimerHandler(void)
 {
        if (g_ReloadConfig) {
-               {
-                       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
-                       Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
-                       LoadConfigFiles(false);
-               }
+               Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
+               LoadConfigFiles(false);
 
                g_ReloadConfig = false;
        }
index ab91ce35b7b0da07f732a441d892ed6fa9a79f2b..b0e139e5a7cd1b91e18cea2a8a9ad4a1d57099c6 100644 (file)
@@ -21,7 +21,6 @@
 
 using namespace icinga;
 
-recursive_mutex Application::m_Mutex;
 Application *Application::m_Instance = NULL;
 bool Application::m_ShuttingDown = false;
 bool Application::m_Debugging = false;
@@ -110,11 +109,6 @@ void Application::SetArgV(char **argv)
        m_ArgV = argv;
 }
 
-void Application::NewTxTimerHandler(void)
-{
-       DynamicObject::NewTx();
-}
-
 #ifdef _DEBUG
 void Application::ProfileTimerHandler(void)
 {
@@ -142,12 +136,6 @@ void Application::RunEventLoop(void) const
        thread t(&Application::TimeWatchThreadProc);
        t.detach();
 
-       /* Set up a timer to periodically flush the tx. */
-       Timer::Ptr newTxTimer = boost::make_shared<Timer>();
-       newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
-       newTxTimer->SetInterval(0.5);
-       newTxTimer->Start();
-
        /* Set up a timer that watches the m_Shutdown flag. */
        Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
        shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
@@ -162,7 +150,7 @@ void Application::RunEventLoop(void) const
        flushTxTimer->Start();
 #endif /* _DEBUG */
 
-       GetEQ().Run();
+       GetEQ().Join();
 }
 
 /**
@@ -569,16 +557,6 @@ void Application::SetPkgDataDir(const String& path)
         m_PkgDataDir = path;
 }
 
-/**
- * Returns the global mutex.
- *
- * @returns The mutex.
- */
-recursive_mutex& Application::GetMutex(void)
-{
-       return m_Mutex;
-}
-
 /**
  * Returns the main thread's event queue.
  *
index f816f7b0e2995b987e8cbcfa9e53325e32b4fbfc..e4213882ad9620f84a76462789b6ac2f533a9e67 100644 (file)
@@ -79,15 +79,12 @@ public:
        static String GetPkgDataDir(void);
        static void SetPkgDataDir(const String& path);
 
-       static recursive_mutex& GetMutex(void);
-
        static EventQueue& GetEQ(void);
 
 protected:
        void RunEventLoop(void) const;
 
 private:
-       static recursive_mutex m_Mutex; /**< The global mutex. */
        static Application *m_Instance; /**< The application instance. */
 
        static bool m_ShuttingDown; /**< Whether the application is in the process of
index a1941866b4fc05723b1e50bb597d6cb650316721..6fb9429733cc5d27b5a000844f8883e21d8c6d67 100644 (file)
@@ -66,12 +66,7 @@ public:
        void Start(const CompletionCallback& completionCallback = CompletionCallback())
        {
                m_CompletionCallback = completionCallback;
-
-               try {
-                       Run();
-               } catch (...) {
-                       FinishException(boost::current_exception());
-               }
+               Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::Run, this));
        }
 
        /**
index 8167e9a73227e398e697f8a2626cd4c06e21fe5e..096da76b5b695856634894fef4a27278957708ae 100644 (file)
@@ -48,4 +48,4 @@ private:
 
 }
 
-#endif /* CONNECTION_H */
\ No newline at end of file
+#endif /* CONNECTION_H */
index cb1082e9b5a5b9f5ba02f3cdf2caaa7313e3cffc..cd41e5ed3a65b91862044e3db3a320420821c393 100644 (file)
@@ -59,9 +59,12 @@ struct DictionaryKeyLessComparer
  *
  * @param key The key whose value should be retrieved.
  * @returns The value of an empty value if the key was not found.
+ * @threadsafety Always.
  */
 Value Dictionary::Get(const char *key) const
 {
+       ObjectLock olock(this);
+
        map<String, Value>::const_iterator it;
 
        it = std::lower_bound(m_Data.begin(), m_Data.end(), key, DictionaryKeyLessComparer());
@@ -77,6 +80,7 @@ Value Dictionary::Get(const char *key) const
  *
  * @param key The key whose value should be retrieved.
  * @returns The value or an empty value if the key was not found.
+ * @threadsafety Always.
  */
 Value Dictionary::Get(const String& key) const
 {
@@ -88,9 +92,12 @@ Value Dictionary::Get(const String& key) const
  *
  * @param key The key.
  * @param value The value.
+ * @threadsafety Always.
  */
 void Dictionary::Set(const String& key, const Value& value)
 {
+       ObjectLock olock(this);
+
        if (value.IsEmpty()) {
                Remove(key);
                return;
@@ -107,9 +114,12 @@ void Dictionary::Set(const String& key, const Value& value)
  *
  * @param value The value.
  * @returns The key that was used to add the new item.
+ * @threadsafety Always.
  */
 String Dictionary::Add(const Value& value)
 {
+       ObjectLock olock(this);
+
        Dictionary::Iterator it;
        String key;
        long index = GetLength();
@@ -150,9 +160,12 @@ Dictionary::Iterator Dictionary::End(void)
  * Returns the number of elements in the dictionary.
  *
  * @returns Number of elements.
+ * @threadsafety Always.
  */
 size_t Dictionary::GetLength(void) const
 {
+       ObjectLock olock(this);
+
        return m_Data.size();
 }
 
@@ -161,9 +174,12 @@ size_t Dictionary::GetLength(void) const
  *
  * @param key The key.
  * @returns true if the dictionary contains the key, false otherwise.
+ * @threadsafety Always.
  */
 bool Dictionary::Contains(const String& key) const
 {
+       ObjectLock olock(this);
+
        return (m_Data.find(key) != m_Data.end());
 }
 
@@ -171,9 +187,12 @@ bool Dictionary::Contains(const String& key) const
  * Removes the specified key from the dictionary.
  *
  * @param key The key.
+ * @threadsafety Always.
  */
 void Dictionary::Remove(const String& key)
 {
+       ObjectLock olock(this);
+
        Dictionary::Iterator it;
        it = m_Data.find(key);
 
@@ -198,9 +217,12 @@ void Dictionary::Remove(Dictionary::Iterator it)
  * Makes a shallow copy of a dictionary.
  *
  * @returns a copy of the dictionary.
+ * @threadsafety Always.
  */
 Dictionary::Ptr Dictionary::ShallowClone(void) const
 {
+       ObjectLock olock(this);
+
        Dictionary::Ptr clone = boost::make_shared<Dictionary>();
 
        String key;
@@ -217,6 +239,7 @@ Dictionary::Ptr Dictionary::ShallowClone(void) const
  *
  * @param json The JSON object.
  * @returns A dictionary that is equivalent to the JSON object.
+ * @threadsafety Always.
  */
 Dictionary::Ptr Dictionary::FromJson(cJSON *json)
 {
@@ -237,12 +260,15 @@ Dictionary::Ptr Dictionary::FromJson(cJSON *json)
  *
  * @returns A JSON object that is equivalent to the dictionary. Values that
  *         cannot be represented in JSON are omitted.
+ * @threadsafety Always.
  */
 cJSON *Dictionary::ToJson(void) const
 {
        cJSON *json = cJSON_CreateObject();
 
        try {
+               ObjectLock olock(this);
+
                String key;
                Value value;
                BOOST_FOREACH(tie(key, value), m_Data) {
index 4583805c74b78eb5ec056dab321d6a3b48bd65a7..359830ac5d31432309a65249a071c76d33575992 100644 (file)
@@ -23,7 +23,9 @@ using namespace icinga;
 
 double DynamicObject::m_CurrentTx = 0;
 set<DynamicObject *> DynamicObject::m_ModifiedObjects;
-boost::mutex DynamicObject::m_ModifiedObjectsMutex;
+boost::mutex DynamicObject::m_TransactionMutex;
+boost::once_flag DynamicObject::m_TransactionOnce;
+Timer::Ptr DynamicObject::m_TransactionTimer;
 
 signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
 signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
@@ -46,6 +48,8 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
         * The DynamicObject::Create function takes care of restoring
         * non-config state after the object has been fully constructed */
        ApplyUpdate(serializedObject, Attribute_Config);
+
+       boost::call_once(m_TransactionOnce, &DynamicObject::Initialize);
 }
 
 /*
@@ -53,10 +57,22 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
  */
 DynamicObject::~DynamicObject(void)
 {
-       boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+       boost::mutex::scoped_lock lock(m_TransactionMutex);
        m_ModifiedObjects.erase(this);
 }
 
+void DynamicObject::Initialize(void)
+{
+       /* Set up a timer to periodically create a new transaction. */
+       m_TransactionTimer = boost::make_shared<Timer>();
+       m_TransactionTimer->SetInterval(0.5);
+       m_TransactionTimer->OnTimerExpired.connect(boost::bind(&DynamicObject::NewTx));
+       m_TransactionTimer->Start();
+}
+
+/**
+ * @threadsafety Always.
+ */
 void DynamicObject::SendLocalUpdateEvents(void)
 {
        map<String, Value, string_iless>::iterator it;
@@ -199,7 +215,7 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
                m_ConfigTx = tx;
 
        {
-               boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+               boost::mutex::scoped_lock lock(m_TransactionMutex);
                m_ModifiedObjects.insert(this);
        }
 
@@ -280,8 +296,6 @@ String DynamicObject::GetSource(void) const
 
 void DynamicObject::Register(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        DynamicType::Ptr dtype = GetType();
 
        DynamicObject::Ptr dobj = dtype->GetObject(GetName());
@@ -302,9 +316,8 @@ void DynamicObject::Start(void)
 
 void DynamicObject::Unregister(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        DynamicType::Ptr dtype = GetType();
+       ObjectLock olock(dtype);
 
        if (!dtype || !dtype->GetObject(GetName()))
                return;
@@ -322,11 +335,16 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
        if (!value.IsObjectType<Dictionary>())
                return ScriptTask::Ptr();
 
+       String funcName;
        Dictionary::Ptr methods = value;
-       if (!methods->Contains(method))
-               return ScriptTask::Ptr();
 
-       String funcName = methods->Get(method);
+       {
+               ObjectLock olock(methods);
+               if (!methods->Contains(method))
+                       return ScriptTask::Ptr();
+
+               funcName = methods->Get(method);
+       }
 
        ScriptFunction::Ptr func = ScriptFunction::GetByName(funcName);
 
@@ -344,8 +362,6 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
  */
 void DynamicObject::DumpObjects(const String& filename)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
 
        String tempFilename = filename + ".tmp";
@@ -409,8 +425,6 @@ void DynamicObject::DumpObjects(const String& filename)
  */
 void DynamicObject::RestoreObjects(const String& filename)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
 
        std::fstream fp;
@@ -432,6 +446,7 @@ void DynamicObject::RestoreObjects(const String& filename)
                bool hasConfig = update->Contains("configTx");
 
                DynamicType::Ptr dt = DynamicType::GetByName(type);
+               ObjectLock dlock(dt);
 
                if (!dt)
                        BOOST_THROW_EXCEPTION(invalid_argument("Invalid type: " + type));
@@ -455,13 +470,8 @@ void DynamicObject::RestoreObjects(const String& filename)
        Logger::Write(LogDebug, "base", msgbuf.str());
 }
 
-/*
- * @threadsafety Always.
- */
 void DynamicObject::DeactivateObjects(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        DynamicType::TypeMap::iterator tt;
        for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
                DynamicType::NameMap::iterator nt;
@@ -479,7 +489,7 @@ void DynamicObject::DeactivateObjects(void)
  */
 double DynamicObject::GetCurrentTx(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
+       boost::mutex::scoped_lock lock(m_TransactionMutex);
 
        assert(m_CurrentTx != 0);
 
@@ -487,29 +497,27 @@ double DynamicObject::GetCurrentTx(void)
 }
 
 /*
- * @threadsafety Always.
+ * @threadsafety Always. Caller must not hold any Object locks.
  */
 void DynamicObject::NewTx(void)
 {
+       double tx;
        set<DynamicObject *> objects;
 
        {
-               boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+               boost::mutex::scoped_lock lock(m_TransactionMutex);
 
-               /* Some objects may accidentally bleed into the next transaction because
-                * we're not holding the global mutex while "stealing" the modified objects,
-                * but that's entirely ok. */
+               tx = m_CurrentTx;
                m_ModifiedObjects.swap(objects);
+               m_CurrentTx = Utility::GetTime();
        }
 
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        BOOST_FOREACH(DynamicObject *object, objects) {
+               ObjectLock olock(object);
                object->SendLocalUpdateEvents();
        }
 
-       OnTransactionClosing(m_CurrentTx, objects);
-       m_CurrentTx = Utility::GetTime();
+       OnTransactionClosing(tx, objects);
 }
 
 void DynamicObject::OnInitCompleted(void)
@@ -523,10 +531,12 @@ void DynamicObject::OnAttributeChanged(const String&, const Value&)
  */
 DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        DynamicType::Ptr dtype = DynamicType::GetByName(type);
-       return dtype->GetObject(name);
+
+       {
+               ObjectLock olock(dtype);
+               return dtype->GetObject(name);
+       }
 }
 
 const DynamicObject::AttributeMap& DynamicObject::GetAttributes(void) const
index 24377f3bd2eb017c6551d64ffa5a4520ae241bf4..dc8ddeea6a3637ee62d97607d471364099d3f7ec 100644 (file)
@@ -81,6 +81,8 @@ public:
        DynamicObject(const Dictionary::Ptr& serializedObject);
        ~DynamicObject(void);
 
+       static void Initialize(void);
+
        Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const;
        void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
 
@@ -147,7 +149,9 @@ private:
        /* This has to be a set of raw pointers because the DynamicObject
         * constructor has to be able to insert objects into this list. */
        static set<DynamicObject *> m_ModifiedObjects;
-       static boost::mutex m_ModifiedObjectsMutex;
+       static boost::mutex m_TransactionMutex;
+       static boost::once_flag m_TransactionOnce;
+       static Timer::Ptr m_TransactionTimer;
 
        friend class DynamicType; /* for OnInitCompleted. */
 };
index 1e40c0681aa4f5ddb5896d93bb45b7358c0cb0f6..dc26d1be3e412eb6c5f599c44dec29e62b58b0f3 100644 (file)
@@ -21,8 +21,6 @@
 
 using namespace icinga;
 
-boost::mutex DynamicType::m_Mutex;
-
 DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
        : m_Name(name), m_ObjectFactory(factory)
 { }
@@ -32,7 +30,7 @@ DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& f
  */
 DynamicType::Ptr DynamicType::GetByName(const String& name)
 {
-       boost::mutex::scoped_lock lock(m_Mutex);
+       boost::mutex::scoped_lock lock(GetStaticMutex());
 
        DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
 
@@ -43,7 +41,7 @@ DynamicType::Ptr DynamicType::GetByName(const String& name)
 }
 
 /**
- * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ * @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
  */
 DynamicType::TypeMap& DynamicType::GetTypes(void)
 {
@@ -52,7 +50,7 @@ DynamicType::TypeMap& DynamicType::GetTypes(void)
 }
 
 /**
- * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ * @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
  */
 DynamicType::NameMap& DynamicType::GetObjects(void)
 {
@@ -89,7 +87,7 @@ DynamicObject::Ptr DynamicType::GetObject(const String& name) const
  */
 void DynamicType::RegisterType(const DynamicType::Ptr& type)
 {
-       boost::mutex::scoped_lock lock(m_Mutex);
+       boost::mutex::scoped_lock lock(GetStaticMutex());
 
        DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
 
@@ -147,3 +145,9 @@ void DynamicType::AddAttributes(const AttributeDescription *attributes, int attr
        for (int i = 0; i < attributeCount; i++)
                AddAttribute(attributes[i].Name, attributes[i].Type);
 }
+
+boost::mutex& DynamicType::GetStaticMutex(void)
+{
+       static boost::mutex mutex;
+       return mutex;
+}
index d97a6c6602e6b985844c46a0a2de79a8a5bdb436..5a46b455259f6abfa07c37fc37d5b2f97a662479 100644 (file)
@@ -64,12 +64,13 @@ public:
        void AddAttributes(const AttributeDescription *attributes, int attributeCount);
 
 private:
-       static boost::mutex m_Mutex;
        String m_Name;
        ObjectFactory m_ObjectFactory;
        map<String, DynamicAttributeType> m_Attributes;
 
        NameMap m_Objects;
+
+       static boost::mutex& GetStaticMutex(void);
 };
 
 /**
index 0dc4f40b82c5832c049832f3559f4d7a61b0598d..30e75791d887ee61e7e766c779a3cd0f37a6bc59 100644 (file)
@@ -26,7 +26,15 @@ using namespace icinga;
  */
 EventQueue::EventQueue(void)
        : m_Stopped(false)
-{ }
+{
+       int cpus = thread::hardware_concurrency();
+
+       if (cpus < 4)
+               cpus = 4;
+
+       for (int i = 0; i < cpus; i++)
+               m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
+}
 
 /**
  * @threadsafety Always.
@@ -34,6 +42,7 @@ EventQueue::EventQueue(void)
 EventQueue::~EventQueue(void)
 {
        Stop();
+       Join();
 }
 
 /**
@@ -47,23 +56,13 @@ void EventQueue::Stop(void)
 }
 
 /**
- * Spawns worker threads and waits for them to complete.
+ * Waits for all worker threads to finish.
  *
  * @threadsafety Always.
  */
-void EventQueue::Run(void)
+void EventQueue::Join(void)
 {
-       thread_group threads;
-
-       int cpus = thread::hardware_concurrency();
-
-       if (cpus == 0)
-               cpus = 4;
-
-       for (int i = 0; i < cpus * 4; i++)
-               threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
-
-       threads.join_all();
+       m_Threads.join_all();
 }
 
 /**
index dec61d21c7133e5fed9948ffa79d9f5512aa0216..36b0ff11aa49c40504d81042f5936c169e3d2996 100644 (file)
@@ -36,13 +36,13 @@ public:
        EventQueue(void);
        ~EventQueue(void);
 
-       void Run(void);
-       void Post(const Callback& callback);
-
        void Stop(void);
+       void Join(void);
+
+       void Post(const Callback& callback);
 
 private:
-       boost::thread::id m_Owner;
+       thread_group m_Threads;
 
        boost::mutex m_Mutex;
        condition_variable m_CV;
index b59bbf79ef2d2892163eead5fd81356bc938575a..56d245bdb7e566eebaa9200d175a339a8256ed53 100644 (file)
@@ -81,10 +81,7 @@ void Logger::Write(LogSeverity severity, const String& facility,
        entry.Facility = facility;
        entry.Message = message;
 
-       {
-               recursive_mutex::scoped_lock lock(Application::GetMutex());
-               ForwardLogEntry(entry);
-       }
+       ForwardLogEntry(entry);
 }
 
 /**
@@ -113,13 +110,21 @@ void Logger::ForwardLogEntry(const LogEntry& entry)
        DynamicType::Ptr dt = DynamicType::GetByName("Logger");
 
        DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
-               Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
 
-               if (entry.Severity >= logger->GetMinSeverity())
-                       logger->m_Impl->ProcessLogEntry(entry);
+       {
+               ObjectLock olock(dt);
+               BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+                       Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
+
+                       {
+                               ObjectLock llock(logger);
+
+                               if (entry.Severity >= logger->GetMinSeverity())
+                                       logger->m_Impl->ProcessLogEntry(entry);
+                       }
 
-               processed = true;
+                       processed = true;
+               }
        }
 
        LogSeverity defaultLogLevel;
index aca652d884d56dca36eb0e7b8024e1268f5115fd..51e055609f8d1b247d218878028295823ff4626f 100644 (file)
@@ -37,9 +37,11 @@ Object::~Object(void)
  * Returns a reference-counted pointer to this object.
  *
  * @returns A shared_ptr object that points to this object
+ * @threadsafety Always.
  */
 Object::SharedPtrHolder Object::GetSelf(void)
 {
+       ObjectLock olock(this);
        return Object::SharedPtrHolder(shared_from_this());
 }
 
@@ -50,7 +52,7 @@ Object::SharedPtrHolder Object::GetSelf(void)
  * @returns The object's mutex.
  * @threadsafety Always.
  */
-recursive_mutex& Object::GetMutex(void)
+recursive_mutex& Object::GetMutex(void) const
 {
        return m_Mutex;
 }
index c51c05fb622dcca4a1d9137340e81337ec8dc02a..1d6e639042da45e6deade83652019d6feb142a1b 100644 (file)
@@ -93,7 +93,7 @@ public:
 
        SharedPtrHolder GetSelf(void);
 
-       recursive_mutex& GetMutex(void);
+       recursive_mutex& GetMutex(void) const;
 
 protected:
        Object(void);
@@ -103,7 +103,7 @@ private:
        Object(const Object& other);
        Object& operator=(const Object& rhs);
 
-       recursive_mutex m_Mutex;
+       mutable recursive_mutex m_Mutex;
 };
 
 /**
@@ -112,15 +112,35 @@ private:
 struct ObjectLock {
 public:
        ObjectLock(const Object::Ptr& object)
-               : m_Lock(object->GetMutex())
-       { }
+#ifdef _DEBUG
+               : m_Lock(), m_Object(object)
+#endif /* _DEBUG */
+       {
+               if (object)
+                       m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
+       }
 
-       ObjectLock(Object *object)
-               : m_Lock(object->GetMutex())
-       { }
+       ObjectLock(const Object *object)
+#ifdef _DEBUG
+               : m_Lock(), m_Object(object->GetSelf())
+#endif /* _DEBUG */
+       {
+               if (object)
+                       m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
+       }
+
+#ifdef _DEBUG
+       ~ObjectLock(void)
+       {
+               assert(m_Object.lock());
+       }
+#endif /* _DEBUG */
 
 private:
        recursive_mutex::scoped_lock m_Lock;
+#ifdef _DEBUG
+       Object::WeakPtr m_Object;
+#endif /* _DEBUG */
 };
 
 /**
index 907073d1e21963961ee2e2eb4647e299fd52594b..a7b002116faecf7102275663bbe2ecf036de558d 100644 (file)
@@ -48,4 +48,4 @@ private:
 
 }
 
-#endif /* STDIOSTREAM_H */
\ No newline at end of file
+#endif /* STDIOSTREAM_H */
index 963e25209eab38765e02da8424a1b64882e7b045..b3cd64384e25aabfa99c76c31afdda47913089b6 100644 (file)
@@ -82,6 +82,8 @@ void Timer::Call(void)
                Logger::Write(LogWarning, "base", msgbuf.str());
        }
 
+       /* Re-enable the timer so it can be called again. */
+       m_Started = true;
        Reschedule();
 }
 
@@ -118,6 +120,8 @@ void Timer::Start(void)
 {
        boost::call_once(&Timer::Initialize, m_ThreadOnce);
 
+       m_Started = true;
+
        Reschedule();
 }
 
@@ -129,6 +133,8 @@ void Timer::Start(void)
 void Timer::Stop(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
+
+       m_Started = false;
        m_Timers.erase(GetSelf());
 
        /* Notify the worker thread that we've disabled a timer. */
@@ -158,12 +164,14 @@ void Timer::Reschedule(double next)
 
        m_Next = next;
 
-       /* Remove and re-add the timer to update the index. */
-       m_Timers.erase(GetSelf());
-       m_Timers.insert(GetSelf());
+       if (m_Started) {
+               /* Remove and re-add the timer to update the index. */
+               m_Timers.erase(GetSelf());
+               m_Timers.insert(GetSelf());
 
-       /* Notify the worker that we've rescheduled a timer. */
-       m_CV.notify_all();
+               /* Notify the worker that we've rescheduled a timer. */
+               m_CV.notify_all();
+       }
 }
 
 /**
@@ -250,6 +258,7 @@ void Timer::TimerThreadProc(void)
 
                /* Remove the timer from the list so it doesn't get called again
                 * until the current call is completed. */
+               timer->m_Started = false;
                m_Timers.erase(timer);
 
                /* Asynchronously call the timer. */
index 87f55455270e5d21add9c4e9e4e43a38fc2e0333..a2126b9fa559af4bd59c8e913661066e6f6cd9db 100644 (file)
@@ -67,6 +67,7 @@ public:
 private:
        double m_Interval; /**< The interval of the timer. */
        double m_Next; /**< When the next event should happen. */
+       bool m_Started; /**< Whether the timer is enabled. */
 
        typedef multi_index_container<
                Timer::WeakPtr,
index 2434392c42449e794712b29711f2e9ce47de1669..f98ce187cd20e9c376820c91a727a08f71440cf6 100644 (file)
@@ -548,3 +548,8 @@ void Utility::SetNonBlockingSocket(SOCKET s)
        ioctlsocket(s, FIONBIO, &lTrue);
 #endif /* _WIN32 */
 }
+
+void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
+{
+       Application::GetEQ().Post(callback);
+}
index 9d0a1f198c7fcdfbf6ced669704688845847ede1..3b99f5446b6af3e4b1f2906a36a273574aa45757 100644 (file)
@@ -58,6 +58,8 @@ public:
 
        static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
 
+       static void QueueAsyncCallback(const boost::function<void (void)>& callback);
+
        static
 #ifdef _WIN32
        HMODULE
index f5174507b2f80d13c7bdaed5b2ef17a213f95172..784f63ec76e11df230b8c18cf06fcb6f51d9de05 100644 (file)
@@ -120,7 +120,17 @@ Dictionary::Ptr ConfigItem::Link(void) const
 void ConfigItem::InternalLink(const Dictionary::Ptr& dictionary) const
 {
        BOOST_FOREACH(const String& name, m_Parents) {
-               ConfigItem::Ptr parent = ConfigItem::GetObject(GetType(), name);
+               ConfigItem::Ptr parent;
+
+               ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+               if (context)
+                       parent = context->GetItem(GetType(), name);
+
+               /* ignore already active objects while we're in the compiler
+                * context and linking to existing items is disabled. */
+               if (!parent && (!context || (context->GetFlags() & CompilerLinkExisting)))
+                       parent = ConfigItem::GetObject(GetType(), name);
 
                if (!parent) {
                        stringstream message;
@@ -158,6 +168,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
        if (it != m_Items.end()) {
                /* Unregister the old item from its parents. */
                ConfigItem::Ptr oldItem = it->second;
+               ObjectLock olock(oldItem);
                oldItem->UnregisterFromParents();
 
                /* Steal the old item's children. */
@@ -167,6 +178,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
        /* Register this item with its parents. */
        BOOST_FOREACH(const String& parentName, m_Parents) {
                ConfigItem::Ptr parent = GetObject(GetType(), parentName);
+               ObjectLock olock(parent);
                parent->RegisterChild(GetSelf());
        }
 
@@ -196,20 +208,32 @@ DynamicObject::Ptr ConfigItem::Commit(void)
        /* Update or create the object and apply the configuration settings. */
        DynamicObject::Ptr dobj = m_DynamicObject.lock();
 
-       if (!dobj)
+       if (!dobj) {
+               ObjectLock dlock(dtype);
                dobj = dtype->GetObject(GetName());
+       }
+
+       bool was_null = false;
 
-       if (!dobj)
+       if (!dobj) {
+               ObjectLock dlock(dtype);
                dobj = dtype->CreateObject(update);
-       else
-               dobj->ApplyUpdate(update, Attribute_Config);
+               was_null = true;
+       }
 
-       m_DynamicObject = dobj;
+       {
+               ObjectLock olock(dobj);
 
-       if (dobj->IsAbstract())
-               dobj->Unregister();
-       else
-               dobj->Register();
+               if (!was_null)
+                       dobj->ApplyUpdate(update, Attribute_Config);
+
+               m_DynamicObject = dobj;
+
+               if (dobj->IsAbstract())
+                       dobj->Unregister();
+               else
+                       dobj->Register();
+       }
 
        /* We need to make a copy of the child objects because the
         * OnParentCommitted() handler is going to update the list. */
@@ -222,6 +246,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
                if (!child)
                        continue;
 
+               ObjectLock olock(child);
                child->OnParentCommitted();
        }
 
@@ -237,8 +262,10 @@ void ConfigItem::Unregister(void)
 {
        DynamicObject::Ptr dobj = m_DynamicObject.lock();
 
-       if (dobj)
+       if (dobj) {
+               ObjectLock olock(dobj);
                dobj->Unregister();
+       }
 
        ConfigItem::ItemMap::iterator it;
        it = m_Items.find(make_pair(GetType(), GetName()));
@@ -266,8 +293,10 @@ void ConfigItem::UnregisterFromParents(void)
        BOOST_FOREACH(const String& parentName, m_Parents) {
                ConfigItem::Ptr parent = GetObject(GetType(), parentName);
 
-               if (parent)
+               if (parent) {
+                       ObjectLock olock(parent);
                        parent->UnregisterChild(GetSelf());
+               }
        }
 }
 
@@ -300,24 +329,6 @@ DynamicObject::Ptr ConfigItem::GetDynamicObject(void) const
  */
 ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
 {
-       {
-               recursive_mutex::scoped_lock lockg(Application::GetMutex());
-
-               ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
-
-               if (context) {
-                       ConfigItem::Ptr item = context->GetItem(type, name);
-
-                       if (item)
-                               return item;
-
-                       /* ignore already active objects while we're in the compiler
-                        * context and linking to existing items is disabled. */
-                       if ((context->GetFlags() & CompilerLinkExisting) == 0)
-                               return ConfigItem::Ptr();
-               }
-       }
-
        {
                boost::mutex::scoped_lock lock(m_Mutex);
 
@@ -362,7 +373,7 @@ void ConfigItem::Dump(ostream& fp) const
 }
 
 /**
- * @threadsafety Caller must hold the global mutex.
+ * @threadsafety Always.
  */
 void ConfigItem::UnloadUnit(const String& unit)
 {
@@ -372,15 +383,22 @@ void ConfigItem::UnloadUnit(const String& unit)
 
        vector<ConfigItem::Ptr> obsoleteItems;
 
-       ConfigItem::Ptr item;
-       BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
-               if (item->GetUnit() != unit)
-                       continue;
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
 
-               obsoleteItems.push_back(item);
+               ConfigItem::Ptr item;
+               BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
+                       ObjectLock olock(item);
+
+                       if (item->GetUnit() != unit)
+                               continue;
+
+                       obsoleteItems.push_back(item);
+               }
        }
 
-       BOOST_FOREACH(item, obsoleteItems) {
+       BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) {
+               ObjectLock olock(item);
                item->Unregister();
        }
 }
index c680a611630e69a2714dfe867edea5810838cbef..81c6789f5e2cbcf8d31d57e66c30b5c0651931aa 100644 (file)
@@ -106,7 +106,17 @@ ConfigItem::Ptr ConfigItemBuilder::Compile(void)
        }
 
        BOOST_FOREACH(const String& parent, m_Parents) {
-               ConfigItem::Ptr item = ConfigItem::GetObject(m_Type, parent);
+               ConfigItem::Ptr item;
+
+               ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+               if (context)
+                       item = context->GetItem(m_Type, parent);
+
+               /* ignore already active objects while we're in the compiler
+                * context and linking to existing items is disabled. */
+               if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
+                       item = ConfigItem::GetObject(m_Type, parent);
 
                if (!item) {
                        stringstream msgbuf;
index e8cc80a0b3f1716fe145e42b8c31d0222f15bb41..337d36ea6d09143a407f6e02b6a38ecd517ff3f0 100644 (file)
@@ -79,11 +79,7 @@ void ExternalCommandProcessor::Execute(double time, const String& command, const
                callback = it->second;
        }
 
-       {
-               recursive_mutex::scoped_lock lock(Application::GetMutex());
-               callback(time, arguments);
-       }
-
+       callback(time, arguments);
 }
 
 /**
@@ -723,7 +719,8 @@ void ExternalCommandProcessor::ScheduleServicegroupHostDowntime(double, const ve
        set<Service::Ptr> services;
 
        BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
-               Service::Ptr hcService = service->GetHost()->GetHostCheckService();
+               Host::Ptr host = service->GetHost();
+               Service::Ptr hcService = host->GetHostCheckService();
                if (hcService)
                        services.insert(hcService);
        }
@@ -765,9 +762,8 @@ void ExternalCommandProcessor::AddHostComment(double, const vector<String>& argu
 
        Logger::Write(LogInformation, "icinga", "Creating comment for host " + host->GetName());
        Service::Ptr service = host->GetHostCheckService();
-       if (service) {
+       if (service)
                (void) service->AddComment(CommentUser, arguments[2], arguments[3], 0);
-       }
 }
 
 void ExternalCommandProcessor::DelHostComment(double, const vector<String>& arguments)
@@ -813,9 +809,8 @@ void ExternalCommandProcessor::DelAllHostComments(double, const vector<String>&
 
        Logger::Write(LogInformation, "icinga", "Removing all comments for host " + host->GetName());
        Service::Ptr service = host->GetHostCheckService();
-       if (service) {
+       if (service)
                service->RemoveAllComments();
-       }
 }
 
 void ExternalCommandProcessor::DelAllSvcComments(double, const vector<String>& arguments)
index b01e4b8a56e92019c6959de717b0f049262c8a46..09a93ccde4ad38054ac54ed2f6d22df3a99ff40b 100644 (file)
@@ -352,7 +352,19 @@ void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const vector<V
                        continue;
                }
 
-               if (!ConfigItem::GetObject("Service", name)) {
+               ConfigItem::Ptr item;
+
+               ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+               if (context)
+                       item = context->GetItem("Service", name);
+
+               /* ignore already active objects while we're in the compiler
+                * context and linking to existing items is disabled. */
+               if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
+                       item = ConfigItem::GetObject("Service", name);
+
+               if (!item) {
                        ConfigCompilerContext::GetContext()->AddError(false, "Validation failed for " +
                            location + ": Service '" + name + "' not found.");
                }
index 9e6570ae6d50339709a37bb413c273b381780f4a..d69e6228f60d69d78ff35ea43e92e4cb8e350668 100644 (file)
@@ -27,13 +27,8 @@ PluginCheckTask::PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr
        : m_Task(task), m_Process(process)
 { }
 
-/**
- * @threadsafety Always.
- */
 void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        if (arguments.size() < 1)
                BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
 
@@ -41,17 +36,34 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
        if (!vservice.IsObjectType<Service>())
                BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
 
-       Service::Ptr service = vservice;
-
        vector<Dictionary::Ptr> macroDicts;
-       macroDicts.push_back(service->GetMacros());
-       macroDicts.push_back(service->CalculateDynamicMacros());
-       macroDicts.push_back(service->GetHost()->GetMacros());
-       macroDicts.push_back(service->GetHost()->CalculateDynamicMacros());
-       macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
+       Value raw_command;
+       Host::Ptr host;
+
+       {
+               Service::Ptr service = vservice;
+               ObjectLock olock(service);
+               macroDicts.push_back(service->GetMacros());
+               macroDicts.push_back(service->CalculateDynamicMacros());
+               raw_command = service->GetCheckCommand();
+               host = service->GetHost();
+       }
+
+       {
+               ObjectLock olock(host);
+               macroDicts.push_back(host->GetMacros());
+               macroDicts.push_back(host->CalculateDynamicMacros());
+       }
+
+       {
+               IcingaApplication::Ptr app = IcingaApplication::GetInstance();
+               ObjectLock olock(app);
+               macroDicts.push_back(app->GetMacros());
+       }
+
        Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
 
-       Value command = MacroProcessor::ResolveMacros(service->GetCheckCommand(), macros);
+       Value command = MacroProcessor::ResolveMacros(raw_command, macros);
 
        Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
 
@@ -60,13 +72,8 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
        process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
 }
 
-/**
- * @threadsafety Always.
- */
 void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        ProcessResult pr;
 
        try {
index 0168de29f4994ecfbc6a3ad4ebb711c8488e5327..96b84d76940d7ea5eafe0e3ecb433e358b32f31c 100644 (file)
@@ -33,8 +33,6 @@ PluginNotificationTask::PluginNotificationTask(const ScriptTask::Ptr& task, cons
  */
 void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        if (arguments.size() < 1)
                BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
 
@@ -44,23 +42,49 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
        if (!arguments[0].IsObjectType<Notification>())
                BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
 
-       Notification::Ptr notification = arguments[0];
        NotificationType type = static_cast<NotificationType>(static_cast<int>(arguments[1]));
 
        vector<Dictionary::Ptr> macroDicts;
-       macroDicts.push_back(notification->GetMacros());
-       macroDicts.push_back(notification->GetService()->GetMacros());
-       macroDicts.push_back(notification->GetService()->CalculateDynamicMacros());
-       macroDicts.push_back(notification->GetService()->GetHost()->GetMacros());
-       macroDicts.push_back(notification->GetService()->GetHost()->CalculateDynamicMacros());
-       macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
+       Value raw_command;
+       Service::Ptr service;
+       Host::Ptr host;
+       String service_name;
+
+       {
+               Notification::Ptr notification = arguments[0];
+               ObjectLock olock(notification);
+               macroDicts.push_back(notification->GetMacros());
+               raw_command = notification->GetNotificationCommand();
+               service = notification->GetService();
+       }
+
+       {
+               ObjectLock olock(service);
+               macroDicts.push_back(service->GetMacros());
+               macroDicts.push_back(service->CalculateDynamicMacros());
+               service_name = service->GetName();
+               host = service->GetHost();
+       }
+
+       {
+               ObjectLock olock(host);
+               macroDicts.push_back(host->GetMacros());
+               macroDicts.push_back(host->CalculateDynamicMacros());
+       }
+
+       {
+               IcingaApplication::Ptr app = IcingaApplication::GetInstance();
+               ObjectLock olock(app);
+               macroDicts.push_back(app->GetMacros());
+       }
+
        Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
 
-       Value command = MacroProcessor::ResolveMacros(notification->GetNotificationCommand(), macros);
+       Value command = MacroProcessor::ResolveMacros(raw_command, macros);
 
        Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
 
-       PluginNotificationTask ct(task, process, notification->GetService()->GetName(), command);
+       PluginNotificationTask ct(task, process, service_name, command);
 
        process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
 }
@@ -70,8 +94,6 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
  */
 void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        ProcessResult pr;
 
        try {
index 6038a4a088307156ae3f0a12d044e139cddd8b86..aab1d9f72717c8272ec4a11c2ba9636351f5365f 100644 (file)
@@ -535,7 +535,9 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
 
        rm.SetParams(params);
 
-       EndpointManager::GetInstance()->SendMulticastMessage(rm);
+       EndpointManager::Ptr em = EndpointManager::GetInstance();
+       ObjectLock olock(em);
+       em->SendMulticastMessage(rm);
 }
 
 void Service::UpdateStatistics(const Dictionary::Ptr& cr)
index e3ff4fd6a9b2f262415e03c4965e0e5f1aa2f43b..d75fde7f074ce0ee88a2b52c72a80921c3352368 100644 (file)
@@ -210,11 +210,13 @@ void Service::RemoveExpiredComments(void)
 
 void Service::CommentsExpireTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
+       DynamicType::Ptr dt = DynamicType::GetByName("Service");
+       ObjectLock dlock(dt);
 
        DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
+       BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
                Service::Ptr service = dynamic_pointer_cast<Service>(object);
+               ObjectLock olock(service);
                service->RemoveExpiredComments();
        }
 }
index fa12a93fc101437b06346e67d5013f9c3f2122cf..0d98fac6a913501451115437164dd59ed1a0e373 100644 (file)
@@ -275,11 +275,13 @@ void Service::RemoveExpiredDowntimes(void)
 
 void Service::DowntimesExpireTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
+       DynamicType::Ptr dt = DynamicType::GetByName("Service");
+       ObjectLock dlock(dt);
 
        DynamicObject::Ptr object;
-       BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
+       BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
                Service::Ptr service = dynamic_pointer_cast<Service>(object);
+               ObjectLock slock(service);
                service->RemoveExpiredDowntimes();
        }
 }
index bf4e72bf12cb606fa3b1a6760e34fe19193dd703..6bf21848e769b3c261db1e6da9cf02dff99fc0f6 100644 (file)
@@ -94,9 +94,8 @@ Service::Ptr Service::GetByName(const String& name)
 Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
 {
        if (!hostName.IsEmpty()) {
-               recursive_mutex::scoped_lock lock(Application::GetMutex());
-
                Host::Ptr host = Host::GetByName(hostName);
+               ObjectLock olock(host);
                return host->GetServiceByShortName(serviceName);
        } else {
                return Service::GetByName(serviceName);
index 5c0f0de6a0beb2eb1baa34813aa04bedfc2f0d26..5fb894c76b4d35ac0eb4a5676bf729b28f8a4891 100644 (file)
@@ -365,11 +365,7 @@ PyObject *PythonLanguage::PyRegisterFunction(PyObject *self, PyObject *args)
                return NULL;
        }
 
-       {
-               recursive_mutex::scoped_lock lock(Application::GetMutex());
-               interp->RegisterPythonFunction(name, object);
-       }
-
+       interp->RegisterPythonFunction(name, object);
 
        Py_INCREF(Py_None);
        return Py_None;
index d5631cc4e380ad720cbd1bab438c775a6ea0227a..a323c250e7e91d57ea92d86a70e5531ebaac98c1 100644 (file)
@@ -277,7 +277,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
                if (it == m_TopicHandlers.end())
                        return;
 
-               (*it->second)(GetSelf(), sender, request);
+               Application::GetEQ().Post(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
        } else {
                GetClient()->SendMessage(request);
        }
@@ -360,4 +360,3 @@ String Endpoint::GetService(void) const
 {
        return Get("service");
 }
-
index e7d004bbaaa8c1c5075443f748545c909d1f177c..3d122a7cf6aec72db0b434e5a6894f854541d202 100644 (file)
@@ -325,8 +325,6 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
 
 void EndpointManager::SubscriptionTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
 
        DynamicObject::Ptr object;
@@ -351,8 +349,6 @@ void EndpointManager::SubscriptionTimerHandler(void)
 
 void EndpointManager::ReconnectTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        DynamicObject::Ptr object;
        BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
                Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
@@ -377,8 +373,6 @@ void EndpointManager::ReconnectTimerHandler(void)
 
 void EndpointManager::RequestTimerHandler(void)
 {
-       recursive_mutex::scoped_lock lock(Application::GetMutex());
-
        map<String, PendingRequest>::iterator it;
        for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
                if (it->second.HasTimedOut()) {