]> granicus.if.org Git - icinga2/commitdiff
Fine-grained locks (WIP, Part 1).
authorGunnar Beutner <gunnar.beutner@netways.de>
Sun, 17 Feb 2013 18:14:34 +0000 (19:14 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Sun, 17 Feb 2013 18:14:34 +0000 (19:14 +0100)
71 files changed:
components/checker/checkercomponent.cpp
components/checker/checkercomponent.h
components/checker/i2-checker.h
components/compat/compatcomponent.cpp
components/delegation/delegationcomponent.cpp
components/demo/democomponent.cpp
components/notification/notificationcomponent.cpp
components/replication/replicationcomponent.cpp
icinga-app/icinga.cpp
lib/base/application.cpp
lib/base/application.h
lib/base/asynctask.h
lib/base/component.cpp
lib/base/connection.h
lib/base/dynamicobject.cpp
lib/base/dynamicobject.h
lib/base/dynamictype.cpp
lib/base/dynamictype.h
lib/base/eventqueue.cpp
lib/base/eventqueue.h
lib/base/i2-base.h
lib/base/logger.cpp
lib/base/netstring.cpp
lib/base/object.cpp
lib/base/object.h
lib/base/process-unix.cpp
lib/base/process-windows.cpp
lib/base/process.cpp
lib/base/process.h
lib/base/script.cpp
lib/base/script.h
lib/base/scriptfunction.cpp
lib/base/scriptfunction.h
lib/base/scriptinterpreter.cpp
lib/base/scriptinterpreter.h
lib/base/scriptlanguage.cpp
lib/base/scriptlanguage.h
lib/base/socket.cpp
lib/base/socket.h
lib/base/stream.cpp
lib/base/stream.h
lib/base/timer.cpp
lib/base/timer.h
lib/base/utility.cpp
lib/base/utility.h
lib/config/configitem.cpp
lib/config/configitem.h
lib/config/configtype.cpp
lib/config/expression.cpp
lib/icinga/api.cpp
lib/icinga/cib.cpp
lib/icinga/cib.h
lib/icinga/externalcommandprocessor.cpp
lib/icinga/externalcommandprocessor.h
lib/icinga/host.cpp
lib/icinga/hostgroup.cpp
lib/icinga/macroprocessor.cpp
lib/icinga/pluginchecktask.cpp
lib/icinga/pluginnotificationtask.cpp
lib/icinga/service-check.cpp
lib/icinga/service-comment.cpp
lib/icinga/service-downtime.cpp
lib/icinga/service.cpp
lib/icinga/service.h
lib/icinga/servicegroup.cpp
lib/python/pythonlanguage.cpp
lib/remoting/endpoint.cpp
lib/remoting/endpoint.h
lib/remoting/endpointmanager.cpp
lib/remoting/endpointmanager.h
lib/remoting/jsonrpcconnection.h

index 12b3bad69d7f51a53a759b85162a3ef31c14ec4a..f084368909bd93e9e3db70284b51c679b0bb3735 100644 (file)
@@ -53,27 +53,44 @@ void CheckerComponent::Stop(void)
 
 void CheckerComponent::CheckTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        double now = Utility::GetTime();
        long tasks = 0;
 
        int missedServices = 0, missedChecks = 0;
 
-       while (!m_IdleServices.empty()) {
-               typedef nth_index<ServiceSet, 1>::type CheckTimeView;
-               CheckTimeView& idx = boost::get<1>(m_IdleServices);
+       for (;;) {
+               Service::Ptr service;
+
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+                       CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+                       if (idx.begin() == idx.end())
+                               break;
 
-               CheckTimeView::iterator it = idx.begin();
-               Service::Ptr service = it->lock();
+                       CheckTimeView::iterator it = idx.begin();
+                       service = it->lock();
+
+                       if (!service) {
+                               idx.erase(it);
+                               continue;
+                       }
+
+                       {
+                               ObjectLock olock(service);
+
+                               if (service->GetNextCheck() > now)
+                                       break;
+                       }
 
-               if (!service) {
                        idx.erase(it);
-                       continue;
                }
 
-               if (service->GetNextCheck() > now)
-                       break;
-
-               idx.erase(it);
+               ObjectLock olock(service);
 
                /* reschedule the service if checks are currently disabled
                 * for it and this is not a forced check */
@@ -83,7 +100,14 @@ void CheckerComponent::CheckTimerHandler(void)
 
                                service->UpdateNextCheck();
 
-                               idx.insert(service);
+                               {
+                                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                                       typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+                                       CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+                                       idx.insert(service);
+                               }
 
                                continue;
                        }
@@ -136,19 +160,26 @@ void CheckerComponent::CheckTimerHandler(void)
 
 void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
 {
-       /* remove the service from the list of pending services; if it's not in the
-        * list this was a manual (i.e. forced) check and we must not re-add the
-        * service to the services list because it's already there. */
-       CheckerComponent::ServiceSet::iterator it;
-       it = m_PendingServices.find(service);
-       if (it != m_PendingServices.end()) {
-               m_PendingServices.erase(it);
-               m_IdleServices.insert(service);
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               /* remove the service from the list of pending services; if it's not in the
+                * list this was a manual (i.e. forced) check and we must not re-add the
+                * service to the services list because it's already there. */
+               CheckerComponent::ServiceSet::iterator it;
+               it = m_PendingServices.find(service);
+               if (it != m_PendingServices.end()) {
+                       m_PendingServices.erase(it);
+                       m_IdleServices.insert(service);
+               }
        }
 
        RescheduleCheckTimer();
 
-       Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
+       {
+               ObjectLock olock(service);
+               Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
+       }
 }
 
 void CheckerComponent::ResultTimerHandler(void)
@@ -156,20 +187,35 @@ void CheckerComponent::ResultTimerHandler(void)
        Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
 
        stringstream msgbuf;
-       msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
+
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
+       }
+
        Logger::Write(LogInformation, "checker", msgbuf.str());
 }
 
 void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
 {
-       String checker = service->GetChecker();
+       String checker;
+
+       {
+               ObjectLock olock(service);
+               checker = service->GetChecker();
+       }
 
        if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
                if (m_PendingServices.find(service) != m_PendingServices.end())
                        return;
 
                m_IdleServices.insert(service);
        } else {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
                m_IdleServices.erase(service);
                m_PendingServices.erase(service);
        }
@@ -177,16 +223,20 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
 
 void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
 {
-       /* remove and re-insert the service from the set in order to force an index update */
-       typedef nth_index<ServiceSet, 0>::type ServiceView;
-       ServiceView& idx = boost::get<0>(m_IdleServices);
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
 
-       ServiceView::iterator it = idx.find(service);
-       if (it == idx.end())
-               return;
+               /* remove and re-insert the service from the set in order to force an index update */
+               typedef nth_index<ServiceSet, 0>::type ServiceView;
+               ServiceView& idx = boost::get<0>(m_IdleServices);
+
+               ServiceView::iterator it = idx.find(service);
+               if (it == idx.end())
+                       return;
 
-       idx.erase(it);
-       idx.insert(service);
+               idx.erase(it);
+               idx.insert(service);
+       }
 
        RescheduleCheckTimer();
 }
@@ -199,31 +249,40 @@ void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
        if (!service)
                return;
 
-       m_IdleServices.erase(service);
-       m_PendingServices.erase(service);
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               m_IdleServices.erase(service);
+               m_PendingServices.erase(service);
+       }
 }
 
 void CheckerComponent::RescheduleCheckTimer(void)
 {
-       if (m_IdleServices.empty())
-               return;
-
-       typedef nth_index<ServiceSet, 1>::type CheckTimeView;
-       CheckTimeView& idx = boost::get<1>(m_IdleServices);
-
        Service::Ptr service;
 
-       do {
-               CheckTimeView::iterator it = idx.begin();
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
 
-               if (it == idx.end())
+               if (m_IdleServices.empty())
                        return;
 
-               service = it->lock();
+               typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+               CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+               do {
+                       CheckTimeView::iterator it = idx.begin();
 
-               if (!service)
-                       idx.erase(it);
-       } while (!service);
+                       if (it == idx.end())
+                               return;
+
+                       service = it->lock();
+
+                       if (!service)
+                               idx.erase(it);
+               } while (!service);
+       }
 
+       ObjectLock olock(service);
        m_CheckTimer->Reschedule(service->GetNextCheck());
 }
index f930fa04bd9b8c61ca13e9923379ca985f11e935..4e4134cded1af86ff8eef7c1baa3e36ae081787d 100644 (file)
@@ -37,7 +37,10 @@ struct ServiceNextCheckExtractor
                if (!service)
                        return 0;
 
-               return service->GetNextCheck();
+               {
+                       ObjectLock olock(service);
+                       return service->GetNextCheck();
+               }
        }
 };
 
@@ -64,6 +67,8 @@ public:
 private:
        Endpoint::Ptr m_Endpoint;
 
+       boost::mutex m_Mutex;
+
        ServiceSet m_IdleServices;
        ServiceSet m_PendingServices;
 
index 9b7a7f49e4b579a2fee9f7cf04085e256c4af9da..f21721915f27e0d4f07b747b1b6529afce18e6f0 100644 (file)
 #include <i2-base.h>
 #include <i2-icinga.h>
 
-#include <boost/multi_index_container.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-#include <boost/multi_index/key_extractors.hpp>
-
-using boost::multi_index_container;
-using boost::multi_index::indexed_by;
-using boost::multi_index::identity;
-using boost::multi_index::ordered_unique;
-using boost::multi_index::ordered_non_unique;
-using boost::multi_index::nth_index;
-
 #include "checkercomponent.h"
 
 #endif /* I2CHECKER_H */
index ff080e9ac6dff3d5f93787db2403f0277dffd142..360a2af89d5125cd30020f31909c870f9596dacd 100644 (file)
@@ -124,7 +124,6 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
                }
        }
 
-
        if (!fifo_ok && mkfifo(commandPath.CStr(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) < 0)
                BOOST_THROW_EXCEPTION(PosixException("mkfifo() failed", errno));
 
@@ -154,7 +153,12 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
                                line[strlen(line) - 1] = '\0';
 
                        String command = line;
-                       Application::GetEQ().Post(boost::bind(&CompatComponent::ProcessCommand, this, command));
+
+                       {
+                               recursive_mutex::scoped_lock lock(Application::GetMutex());
+
+                               ProcessCommand(command);
+                       }
                }
 
                fclose(fp);
@@ -412,6 +416,8 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
  */
 void CompatComponent::StatusTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        Logger::Write(LogInformation, "compat", "Writing compat status information");
 
        String statuspath = GetStatusPath();
index fb00b433574bef45bf2b5cc62f32c54a81ad91b8..4e32271a64ca1beb682f594af0fd015649d0b7bb 100644 (file)
@@ -82,6 +82,8 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
 
 void DelegationComponent::DelegationTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        map<Endpoint::Ptr, int> histogram;
 
        DynamicObject::Ptr object;
index e2657d078f07b6fb92e32b68e782313d2de76361..07272ad970b9bf645dd47abb64d685f89540fe09 100644 (file)
@@ -54,6 +54,8 @@ void DemoComponent::Stop(void)
  */
 void DemoComponent::DemoTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
            " world' message.");
 
index d4de1111fd9aa42abd2b7b5b9de498b546301ef8..a529ce5364cfa18ef2dd6f180108fc33d56764b2 100644 (file)
@@ -53,6 +53,8 @@ void NotificationComponent::Stop(void)
  */
 void NotificationComponent::NotificationTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        // TODO: implement
 }
 
index e35a6e3e58f3513db07f6284c0a2a76ed3771219..218ca7adaaab16c559a9d751f80c045ba59c212e 100644 (file)
@@ -32,7 +32,7 @@ void ReplicationComponent::Start(void)
 
        DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1));
        DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1));
-       DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _1));
+       DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2));
 
        Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));
 
index 08af5900bb993e3bed865f0a2792b59951a64d6b..1dafdfe9517a8274c15681e65dbbf34ae3e3fd60 100644 (file)
@@ -66,23 +66,6 @@ static bool LoadConfigFiles(bool validateOnly)
        if (hasError)
                return false;
 
-/*                     Logger::Write(LogInformation, "icinga-app", "Validating config items...");
-                       DynamicType::Ptr type;
-                       BOOST_FOREACH(tie(tuples::ignore, type), DynamicType::GetTypes()) {
-                               ConfigType::Ptr ctype = ConfigType::GetByName(type->GetName());
-
-                               if (!ctype) {
-                                       Logger::Write(LogWarning, "icinga-app", "No config type found for type '" + type->GetName() + "'");
-
-                                       continue;
-                               }
-
-                               DynamicObject::Ptr object;
-                               BOOST_FOREACH(tie(tuples::ignore, object), type->GetObjects()) {
-                                       ctype->ValidateObject(object);
-                               }
-                       }*/
-
        if (validateOnly)
                return true;
 
@@ -105,8 +88,13 @@ static bool LoadConfigFiles(bool validateOnly)
 static void ReloadConfigTimerHandler(void)
 {
        if (g_ReloadConfig) {
-               Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
-               LoadConfigFiles(false);
+               {
+                       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
+                       Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
+                       LoadConfigFiles(false);
+               }
+
                g_ReloadConfig = false;
        }
 }
@@ -136,10 +124,6 @@ int main(int argc, char **argv)
        lt_dlinit();
 #endif /* _WIN32 */
 
-       /* This must be done before calling any other functions
-        * in the base library. */
-       Application::SetMainThread();
-
        /* Set command-line arguments. */
        Application::SetArgC(argc);
        Application::SetArgV(argv);
@@ -252,14 +236,14 @@ int main(int argc, char **argv)
                return EXIT_FAILURE;
        }
 
-       DynamicObject::BeginTx();
+       DynamicObject::NewTx();
 
        bool validateOnly = g_AppParams.count("validate");
 
        if (!LoadConfigFiles(validateOnly))
                return EXIT_FAILURE;
 
-       DynamicObject::FinishTx();
+       DynamicObject::NewTx();
 
        if (validateOnly) {
                Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate.");
@@ -290,4 +274,3 @@ int main(int argc, char **argv)
 
        return app->Run();
 }
-
index 531a8168de19b6412947a251bcf9953a61da5fbf..ab91ce35b7b0da07f732a441d892ed6fa9a79f2b 100644 (file)
@@ -21,7 +21,7 @@
 
 using namespace icinga;
 
-boost::mutex Application::m_Mutex;
+recursive_mutex Application::m_Mutex;
 Application *Application::m_Instance = NULL;
 bool Application::m_ShuttingDown = false;
 bool Application::m_Debugging = false;
@@ -110,25 +110,26 @@ void Application::SetArgV(char **argv)
        m_ArgV = argv;
 }
 
-/**
- * Runs one iteration of the event loop.
- *
- * @returns false if we're shutting down, true otherwise.
- */
-bool Application::ProcessEvents(void)
+void Application::NewTxTimerHandler(void)
 {
-       Object::ClearHeldObjects();
-
-       double sleep = Timer::ProcessTimers();
-
-       if (m_ShuttingDown)
-               return false;
+       DynamicObject::NewTx();
+}
 
-       GetEQ().ProcessEvents(m_Mutex, boost::posix_time::milliseconds(sleep * 1000));
+#ifdef _DEBUG
+void Application::ProfileTimerHandler(void)
+{
+       stringstream msgbuf;
+       msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
+       Logger::Write(LogInformation, "base", msgbuf.str());
 
-       DynamicObject::FlushTx();
+       Object::PrintMemoryProfile();
+}
+#endif /* _DEBUG */
 
-       return true;
+void Application::ShutdownTimerHandler(void)
+{
+       if (m_ShuttingDown)
+               m_EQ.Stop();
 }
 
 /**
@@ -137,32 +138,31 @@ bool Application::ProcessEvents(void)
  */
 void Application::RunEventLoop(void) const
 {
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-#ifdef _DEBUG
-       double nextProfile = 0;
-#endif /* _DEBUG */
-
        /* Start the system time watch thread. */
        thread t(&Application::TimeWatchThreadProc);
        t.detach();
 
-       while (!m_ShuttingDown) {
-               if (!ProcessEvents())
-                       break;
+       /* Set up a timer to periodically flush the tx. */
+       Timer::Ptr newTxTimer = boost::make_shared<Timer>();
+       newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
+       newTxTimer->SetInterval(0.5);
+       newTxTimer->Start();
 
-#ifdef _DEBUG
-               if (nextProfile < Utility::GetTime()) {
-                       stringstream msgbuf;
-                       msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
-                       Logger::Write(LogInformation, "base", msgbuf.str());
-
-                       Object::PrintMemoryProfile();
+       /* Set up a timer that watches the m_Shutdown flag. */
+       Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
+       shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
+       shutdownTimer->SetInterval(0.5);
+       shutdownTimer->Start();
 
-                       nextProfile = Utility::GetTime() + 15.0;
-               }
+#ifdef _DEBUG
+       /* Set up a timer that periodically prints some information about the object system. */
+       Timer::Ptr profileTimer = boost::make_shared<Timer>();
+       profileTimer->OnTimerExpired.connect(boost::bind(&Application::ProfileTimerHandler));
+       flushTxTimer->SetInterval(15);
+       flushTxTimer->Start();
 #endif /* _DEBUG */
-       }
+
+       GetEQ().Run();
 }
 
 /**
@@ -186,12 +186,7 @@ void Application::TimeWatchThreadProc(void)
                               << " in time: " << abs(timeDiff) << " seconds";
                        Logger::Write(LogInformation, "base", msgbuf.str());
 
-                       /* in addition to rescheduling the timers this
-                        * causes the event loop to wake up thereby
-                        * solving the problem that timed_wait()
-                        * uses an absolute timestamp for the timeout */
-                       GetEQ().Post(boost::bind(&Timer::AdjustTimers,
-                           -timeDiff));
+                       Timer::AdjustTimers(-timeDiff);
                }
 
                lastLoop = now;
@@ -302,25 +297,6 @@ bool Application::IsDebugging(void)
        return m_Debugging;
 }
 
-/**
- * Checks whether we're currently on the main thread.
- *
- * @returns true if this is the main thread, false otherwise
- */
-bool Application::IsMainThread(void)
-{
-       return (boost::this_thread::get_id() == m_MainThreadID);
-}
-
-/**
- * Sets the main thread to the currently running thread.
- */
-void Application::SetMainThread(void)
-{
-       m_MainThreadID = boost::this_thread::get_id();
-       m_EQ.SetOwner(m_MainThreadID);
-}
-
 /**
  * Displays a message that tells users what to do when they encounter a bug.
  */
@@ -455,11 +431,11 @@ int Application::Run(void)
        SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE);
 #endif /* _WIN32 */
 
-       DynamicObject::BeginTx();
+       DynamicObject::NewTx();
 
        result = Main();
 
-       DynamicObject::FinishTx();
+       DynamicObject::NewTx();
        DynamicObject::DeactivateObjects();
 
        return result;
@@ -594,11 +570,11 @@ void Application::SetPkgDataDir(const String& path)
 }
 
 /**
- * Returns the global mutex for the main thread.
+ * Returns the global mutex.
  *
  * @returns The mutex.
  */
-boost::mutex& Application::GetMutex(void)
+recursive_mutex& Application::GetMutex(void)
 {
        return m_Mutex;
 }
index 358a8ee2e9a454cb082b6b3557a85fb58168766c..f816f7b0e2995b987e8cbcfa9e53325e32b4fbfc 100644 (file)
@@ -62,9 +62,6 @@ public:
        static void SetDebugging(bool debug);
        static bool IsDebugging(void);
 
-       static bool IsMainThread(void);
-       static void SetMainThread(void);
-
        void UpdatePidFile(const String& filename);
        void ClosePidFile(void);
 
@@ -82,9 +79,7 @@ public:
        static String GetPkgDataDir(void);
        static void SetPkgDataDir(const String& path);
 
-       static bool ProcessEvents(void);
-
-       static boost::mutex& GetMutex(void);
+       static recursive_mutex& GetMutex(void);
 
        static EventQueue& GetEQ(void);
 
@@ -92,7 +87,7 @@ protected:
        void RunEventLoop(void) const;
 
 private:
-       static boost::mutex m_Mutex; /**< The main thread mutex. */
+       static recursive_mutex m_Mutex; /**< The global mutex. */
        static Application *m_Instance; /**< The application instance. */
 
        static bool m_ShuttingDown; /**< Whether the application is in the process of
@@ -120,6 +115,11 @@ private:
        static void ExceptionHandler(void);
 
        static void TimeWatchThreadProc(void);
+       static void NewTxTimerHandler(void);
+#ifdef _DEBUG
+       static void ProfileTimerHandler(void)
+#endif /* _DEBUG */
+       static void ShutdownTimerHandler(void);
 };
 
 }
index ca0f8b3d668f3856a3ecc1a72ffad80d54bd617e..a1941866b4fc05723b1e50bb597d6cb650316721 100644 (file)
@@ -18,7 +18,7 @@
  ******************************************************************************/
 
 #ifndef ASYNCTASK_H
-#define ASYNCTASK_H 
+#define ASYNCTASK_H
 
 namespace icinga
 {
@@ -79,6 +79,7 @@ public:
         */
        bool IsFinished(void) const
        {
+               boost::mutex::scoped_lock lock(m_Mutex);
                return m_Finished;
        }
 
@@ -133,7 +134,9 @@ public:
         */
        void Wait(void)
        {
-               Utility::WaitUntil(boost::bind(&AsyncTask<TClass, TResult>::IsFinished, this));
+               boost::mutex::scoped_lock lock(m_Mutex);
+               while (!m_Finished)
+                       m_CV.wait(lock);
        }
 
 protected:
@@ -151,9 +154,14 @@ private:
         */
        void FinishInternal(void)
        {
-               assert(!m_Finished);
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+                       assert(!m_Finished);
 
-               m_Finished = true;
+                       m_Finished = true;
+
+                       m_CV.notify_all();
+               }
 
                if (!m_CompletionCallback.empty()) {
                        m_CompletionCallback(GetSelf());
@@ -164,6 +172,8 @@ private:
                }
        }
 
+       mutable boost::mutex m_Mutex;
+       boost::condition_variable m_CV;
        CompletionCallback m_CompletionCallback; /**< The completion callback. */
        TResult m_Result; /**< The task's result. */
        boost::exception_ptr m_Exception; /**< The task's exception. */
index f60ed4f6c5608ee0b59ba4b1a6b2adecff7e5937..86074b7c70a83f4919393e9e8b0737b798defb84 100644 (file)
@@ -29,8 +29,6 @@ REGISTER_TYPE(Component, NULL);
 Component::Component(const Dictionary::Ptr& properties)
        : DynamicObject(properties)
 {
-       assert(Application::IsMainThread());
-
        if (!IsLocal())
                BOOST_THROW_EXCEPTION(runtime_error("Component objects must be local."));
 
index 4e48d408153ddbc65626c8e2b5b3aa9f23ab772d..8167e9a73227e398e697f8a2626cd4c06e21fe5e 100644 (file)
@@ -35,7 +35,7 @@ public:
 
        void Close(void);
 
-       boost::signal<void (const Connection::Ptr&)> OnClosed;
+       signals2::signal<void (const Connection::Ptr&)> OnClosed;
 
 protected:
        virtual void ProcessData(void) = 0;
index 10ae87196554f540775607f8c9562abfa68ada6c..4583805c74b78eb5ec056dab321d6a3b48bd65a7 100644 (file)
@@ -23,10 +23,11 @@ using namespace icinga;
 
 double DynamicObject::m_CurrentTx = 0;
 set<DynamicObject *> DynamicObject::m_ModifiedObjects;
+boost::mutex DynamicObject::m_ModifiedObjectsMutex;
 
-boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
-boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
-boost::signal<void (const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
+signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
+signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
+signals2::signal<void (double, const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
 
 DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
        : m_ConfigTx(0)
@@ -47,8 +48,12 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
        ApplyUpdate(serializedObject, Attribute_Config);
 }
 
+/*
+ * @threadsafety Always.
+ */
 DynamicObject::~DynamicObject(void)
 {
+       boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
        m_ModifiedObjects.erase(this);
 }
 
@@ -193,7 +198,10 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
        if (tt.first->second.Type & Attribute_Config)
                m_ConfigTx = tx;
 
-       m_ModifiedObjects.insert(this);
+       {
+               boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+               m_ModifiedObjects.insert(this);
+       }
 
        /* Use insert() rather than [] so we don't overwrite
         * an existing oldValue if the attribute was previously
@@ -272,7 +280,7 @@ String DynamicObject::GetSource(void) const
 
 void DynamicObject::Register(void)
 {
-       assert(Application::IsMainThread());
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
 
        DynamicType::Ptr dtype = GetType();
 
@@ -294,7 +302,7 @@ void DynamicObject::Start(void)
 
 void DynamicObject::Unregister(void)
 {
-       assert(Application::IsMainThread());
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
 
        DynamicType::Ptr dtype = GetType();
 
@@ -331,8 +339,13 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
        return task;
 }
 
+/*
+ * @threadsafety Always.
+ */
 void DynamicObject::DumpObjects(const String& filename)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
 
        String tempFilename = filename + ".tmp";
@@ -391,8 +404,13 @@ void DynamicObject::DumpObjects(const String& filename)
                BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
 }
 
+/*
+ * @threadsafety Always.
+ */
 void DynamicObject::RestoreObjects(const String& filename)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
 
        std::fstream fp;
@@ -437,8 +455,13 @@ void DynamicObject::RestoreObjects(const String& filename)
        Logger::Write(LogDebug, "base", msgbuf.str());
 }
 
+/*
+ * @threadsafety Always.
+ */
 void DynamicObject::DeactivateObjects(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        DynamicType::TypeMap::iterator tt;
        for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
                DynamicType::NameMap::iterator nt;
@@ -451,34 +474,42 @@ void DynamicObject::DeactivateObjects(void)
        }
 }
 
+/*
+ * @threadsafety Always.
+ */
 double DynamicObject::GetCurrentTx(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        assert(m_CurrentTx != 0);
 
        return m_CurrentTx;
 }
 
-void DynamicObject::BeginTx(void)
+/*
+ * @threadsafety Always.
+ */
+void DynamicObject::NewTx(void)
 {
-       m_CurrentTx = Utility::GetTime();
-}
+       set<DynamicObject *> objects;
 
-void DynamicObject::FinishTx(void)
-{
-       BOOST_FOREACH(DynamicObject *object, m_ModifiedObjects) {
-               object->SendLocalUpdateEvents();
+       {
+               boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+
+               /* Some objects may accidentally bleed into the next transaction because
+                * we're not holding the global mutex while "stealing" the modified objects,
+                * but that's entirely ok. */
+               m_ModifiedObjects.swap(objects);
        }
 
-       OnTransactionClosing(m_ModifiedObjects);
-       m_ModifiedObjects.clear();
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
 
-       m_CurrentTx = 0;
-}
+       BOOST_FOREACH(DynamicObject *object, objects) {
+               object->SendLocalUpdateEvents();
+       }
 
-void DynamicObject::FlushTx(void)
-{
-       FinishTx();
-       BeginTx();
+       OnTransactionClosing(m_CurrentTx, objects);
+       m_CurrentTx = Utility::GetTime();
 }
 
 void DynamicObject::OnInitCompleted(void)
@@ -487,8 +518,13 @@ void DynamicObject::OnInitCompleted(void)
 void DynamicObject::OnAttributeChanged(const String&, const Value&)
 { }
 
+/*
+ * @threadsafety Always.
+ */
 DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        DynamicType::Ptr dtype = DynamicType::GetByName(type);
        return dtype->GetObject(name);
 }
index 8e02d075da4df3197eafa47442eb9ce52b203545..24377f3bd2eb017c6551d64ffa5a4520ae241bf4 100644 (file)
@@ -94,9 +94,9 @@ public:
 
        void ClearAttributesByType(DynamicAttributeType type);
 
-       static boost::signal<void (const DynamicObject::Ptr&)> OnRegistered;
-       static boost::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
-       static boost::signal<void (const set<DynamicObject *>&)> OnTransactionClosing;
+       static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
+       static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
+       static signals2::signal<void (double, const set<DynamicObject *>&)> OnTransactionClosing;
 
        ScriptTask::Ptr InvokeMethod(const String& method,
            const vector<Value>& arguments, ScriptTask::CompletionCallback callback);
@@ -127,9 +127,7 @@ public:
        static void DeactivateObjects(void);
 
        static double GetCurrentTx(void);
-       static void BeginTx(void);
-       static void FinishTx(void);
-       static void FlushTx(void);
+       static void NewTx(void);
 
 protected:
        virtual void OnInitCompleted(void);
@@ -149,6 +147,7 @@ private:
        /* This has to be a set of raw pointers because the DynamicObject
         * constructor has to be able to insert objects into this list. */
        static set<DynamicObject *> m_ModifiedObjects;
+       static boost::mutex m_ModifiedObjectsMutex;
 
        friend class DynamicType; /* for OnInitCompleted. */
 };
index 75ec7fae619c1b1d8815abb4a4ba3f233b49ece4..1e40c0681aa4f5ddb5896d93bb45b7358c0cb0f6 100644 (file)
 
 using namespace icinga;
 
+boost::mutex DynamicType::m_Mutex;
+
 DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
        : m_Name(name), m_ObjectFactory(factory)
 { }
 
+/**
+ * @threadsafety Always.
+ */
 DynamicType::Ptr DynamicType::GetByName(const String& name)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
+
        DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
 
        if (tt == GetTypes().end())
@@ -35,12 +42,18 @@ DynamicType::Ptr DynamicType::GetByName(const String& name)
        return tt->second;
 }
 
+/**
+ * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ */
 DynamicType::TypeMap& DynamicType::GetTypes(void)
 {
        static DynamicType::TypeMap types;
        return types;
 }
 
+/**
+ * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ */
 DynamicType::NameMap& DynamicType::GetObjects(void)
 {
        return m_Objects;
@@ -71,9 +84,16 @@ DynamicObject::Ptr DynamicType::GetObject(const String& name) const
        return nt->second;
 }
 
+/**
+ * @threadsafety Always.
+ */
 void DynamicType::RegisterType(const DynamicType::Ptr& type)
 {
-       if (GetByName(type->GetName()))
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
+
+       if (tt != GetTypes().end())
                BOOST_THROW_EXCEPTION(runtime_error("Cannot register class for type '" +
                    type->GetName() + "': Objects of this type already exist."));
 
@@ -99,6 +119,9 @@ DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUp
        return obj;
 }
 
+/**
+ * @threadsafety Always.
+ */
 bool DynamicType::TypeExists(const String& name)
 {
        return (GetByName(name));
index b2bbd6e6b7603aca97e98e574c6b2914235c8d03..d97a6c6602e6b985844c46a0a2de79a8a5bdb436 100644 (file)
@@ -47,15 +47,15 @@ public:
 
        static void RegisterType(const DynamicType::Ptr& type);
        static bool TypeExists(const String& name);
-       
+
        DynamicObject::Ptr CreateObject(const Dictionary::Ptr& serializedUpdate) const;
        DynamicObject::Ptr GetObject(const String& name) const;
 
        void RegisterObject(const DynamicObject::Ptr& object);
        void UnregisterObject(const DynamicObject::Ptr& object);
 
-       static TypeMap& GetTypes(void);
-       NameMap& GetObjects(void);
+       /* TODO(thread) make private */ static TypeMap& GetTypes(void);
+       /* TODO(thread) make private */ NameMap& GetObjects(void);
 
        void AddAttribute(const String& name, DynamicAttributeType type);
        void RemoveAttribute(const String& name);
@@ -64,6 +64,7 @@ public:
        void AddAttributes(const AttributeDescription *attributes, int attributeCount);
 
 private:
+       static boost::mutex m_Mutex;
        String m_Name;
        ObjectFactory m_ObjectFactory;
        map<String, DynamicAttributeType> m_Attributes;
index a236e1cf1f0cd96619d02fe68080c7e03d66a472..0dc4f40b82c5832c049832f3559f4d7a61b0598d 100644 (file)
 
 using namespace icinga;
 
+/**
+ * @threadsafety Always.
+ */
 EventQueue::EventQueue(void)
        : m_Stopped(false)
 { }
 
-boost::thread::id EventQueue::GetOwner(void) const
-{
-       return m_Owner;
-}
-
-void EventQueue::SetOwner(boost::thread::id owner)
+/**
+ * @threadsafety Always.
+ */
+EventQueue::~EventQueue(void)
 {
-       m_Owner = owner;
+       Stop();
 }
 
+/**
+ * @threadsafety Always.
+ */
 void EventQueue::Stop(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
        m_Stopped = true;
-       m_EventAvailable.notify_all();
+       m_CV.notify_all();
 }
 
 /**
- * Waits for events using the specified timeout value and processes
- * them.
+ * Spawns worker threads and waits for them to complete.
  *
- * @param mtx The mutex that should be unlocked while waiting. Caller
- *           must have this mutex locked.
- * @param timeout The wait timeout.
- * @returns false if the queue has been stopped, true otherwise.
+ * @threadsafety Always.
  */
-bool EventQueue::ProcessEvents(boost::mutex& mtx, millisec timeout)
+void EventQueue::Run(void)
 {
-       vector<Callback> events;
+       thread_group threads;
 
-       mtx.unlock();
+       int cpus = thread::hardware_concurrency();
 
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
+       if (cpus == 0)
+               cpus = 4;
 
-               while (m_Events.empty() && !m_Stopped) {
-                       if (!m_EventAvailable.timed_wait(lock, timeout)) {
-                               mtx.lock();
+       for (int i = 0; i < cpus * 4; i++)
+               threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
 
-                               return !m_Stopped;
-                       }
-               }
+       threads.join_all();
+}
 
-               events.swap(m_Events);
-       }
+/**
+ * Waits for events and processes them.
+ *
+ * @threadsafety Always.
+ */
+void EventQueue::QueueThreadProc(void)
+{
+       while (!m_Stopped) {
+               vector<Callback> events;
 
-       mtx.lock();
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       while (m_Events.empty() && !m_Stopped)
+                               m_CV.wait(lock);
+
+                       events.swap(m_Events);
+               }
 
-       BOOST_FOREACH(const Callback& ev, events) {
-               double st = Utility::GetTime();
+               BOOST_FOREACH(const Callback& ev, events) {
+                       double st = Utility::GetTime();
 
-               ev();
+                       ev();
 
-               double et = Utility::GetTime();
+                       double et = Utility::GetTime();
 
-               if (et - st > 1.0) {
-                       stringstream msgbuf;
-                       msgbuf << "Event call took " << et - st << " seconds.";
-                       Logger::Write(LogWarning, "base", msgbuf.str());
+                       if (et - st > 1.0) {
+                               stringstream msgbuf;
+                               msgbuf << "Event call took " << et - st << " seconds.";
+                               Logger::Write(LogWarning, "base", msgbuf.str());
+                       }
                }
        }
-
-       return !m_Stopped;
 }
 
 /**
- * Appends an event to the event queue. Events will be processed in FIFO
- * order on the main thread.
+ * Appends an event to the event queue. Events will be processed in FIFO order.
  *
  * @param callback The callback function for the event.
+ * @threadsafety Always.
  */
 void EventQueue::Post(const EventQueue::Callback& callback)
 {
-       if (boost::this_thread::get_id() == m_Owner) {
-               callback();
-               return;
-       }
-
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-               m_Events.push_back(callback);
-               m_EventAvailable.notify_all();
-       }
+       boost::mutex::scoped_lock lock(m_Mutex);
+       m_Events.push_back(callback);
+       m_CV.notify_all();
 }
index 422746482c6102919595e4bfb78584a140a7c229..dec61d21c7133e5fed9948ffa79d9f5512aa0216 100644 (file)
@@ -34,24 +34,23 @@ public:
        typedef function<void ()> Callback;
 
        EventQueue(void);
+       ~EventQueue(void);
 
-       bool ProcessEvents(boost::mutex& mtx, millisec timeout = boost::posix_time::milliseconds(30000));
+       void Run(void);
        void Post(const Callback& callback);
 
        void Stop(void);
 
-       boost::thread::id GetOwner(void) const;
-       void SetOwner(boost::thread::id owner);
-
-       boost::mutex& GetMutex(void);
-
 private:
        boost::thread::id m_Owner;
 
        boost::mutex m_Mutex;
+       condition_variable m_CV;
+
        bool m_Stopped;
        vector<Callback> m_Events;
-       condition_variable m_EventAvailable;
+
+       void QueueThreadProc(void);
 };
 
 }
index 8d10d41cf4de6a35122b5611ebc553cd63fc8cac..04e9684f97f5c8b01a2d7810f61327e27fd4d88b 100644 (file)
@@ -125,7 +125,7 @@ using std::type_info;
 #include <boost/make_shared.hpp>
 #include <boost/bind.hpp>
 #include <boost/function.hpp>
-#include <boost/signal.hpp>
+#include <boost/signals2.hpp>
 #include <boost/algorithm/string/trim.hpp>
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string/compare.hpp>
@@ -139,6 +139,9 @@ using std::type_info;
 #include <boost/uuid/uuid_io.hpp>
 #include <boost/program_options.hpp>
 #include <boost/exception/diagnostic_information.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
 
 using boost::shared_ptr;
 using boost::weak_ptr;
@@ -148,6 +151,7 @@ using boost::static_pointer_cast;
 using boost::function;
 using boost::thread;
 using boost::thread_group;
+using boost::recursive_mutex;
 using boost::condition_variable;
 using boost::system_time;
 using boost::posix_time::millisec;
@@ -155,11 +159,18 @@ using boost::tie;
 using boost::rethrow_exception;
 using boost::current_exception;
 using boost::diagnostic_information;
+using boost::multi_index_container;
+using boost::multi_index::indexed_by;
+using boost::multi_index::identity;
+using boost::multi_index::ordered_unique;
+using boost::multi_index::ordered_non_unique;
+using boost::multi_index::nth_index;
 
 namespace tuples = boost::tuples;
+namespace signals2 = boost::signals2;
 
 #if defined(__APPLE__) && defined(__MACH__)
-#      pragma GCC diagnostic ignored "-Wdeprecated-declarations" 
+#      pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #endif
 
 #include <openssl/bio.h>
index 42b0551dbe1aeaaf7e701aecf1d94a511f2137a3..b59bbf79ef2d2892163eead5fd81356bc938575a 100644 (file)
@@ -81,7 +81,10 @@ void Logger::Write(LogSeverity severity, const String& facility,
        entry.Facility = facility;
        entry.Message = message;
 
-       Application::GetEQ().Post(boost::bind(&Logger::ForwardLogEntry, entry));
+       {
+               recursive_mutex::scoped_lock lock(Application::GetMutex());
+               ForwardLogEntry(entry);
+       }
 }
 
 /**
@@ -182,4 +185,3 @@ DynamicObject::Ptr ILogger::GetConfig(void) const
 {
        return m_Config->GetSelf();
 }
-
index 692c7754ef0c9518b37306447aaa57f01ae8153a..dc95cf4c39ffaf609692e0e63b4aeb5a6cfb7377 100644 (file)
@@ -29,6 +29,7 @@ using namespace icinga;
  * @returns true if a complete String was read from the IOQueue, false otherwise.
  * @exception invalid_argument The input stream is invalid.
  * @see https://github.com/PeterScott/netString-c/blob/master/netString.c
+ * @threadsafety Always.
  */
 bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
 {
@@ -110,6 +111,7 @@ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
  *
  * @param stream The stream.
  * @param str The String that is to be written.
+ * @threadsafety Always.
  */
 void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
 {
index 10b591b5244ffbfe35126d30726e59a8f0481fa3..aca652d884d56dca36eb0e7b8024e1268f5115fd 100644 (file)
@@ -25,43 +25,13 @@ using namespace icinga;
  * Default constructor for the Object class.
  */
 Object::Object(void)
-{
-#ifdef _DEBUG
-       boost::mutex::scoped_lock lock(*GetMutex());
-       GetAliveObjects()->insert(this);
-#endif /* _DEBUG */
-}
+{ }
 
 /**
  * Destructor for the Object class.
  */
 Object::~Object(void)
-{
-#ifdef _DEBUG
-       boost::mutex::scoped_lock lock(*GetMutex());
-       GetAliveObjects()->erase(this);
-#endif /* _DEBUG */
-}
-
-/**
- * Temporarily holds onto a reference for an object. This can
- * be used to safely clear the last reference to an object
- * in an event handler.
- */
-void Object::Hold(void)
-{
-       boost::mutex::scoped_lock lock(*GetMutex());
-       GetHeldObjects().push_back(GetSelf());
-}
-
-/**
- * Clears all temporarily held objects.
- */
-void Object::ClearHeldObjects(void)
-{
-       boost::mutex::scoped_lock lock(*GetMutex());
-       GetHeldObjects().clear();
-}
+{ }
 
 /**
  * Returns a reference-counted pointer to this object.
@@ -73,91 +43,14 @@ Object::SharedPtrHolder Object::GetSelf(void)
        return Object::SharedPtrHolder(shared_from_this());
 }
 
-#ifdef _DEBUG
 /**
- * Retrieves the number of currently alive objects.
+ * Returns the mutex that must be held while calling non-static methods
+ * which have not been explicitly marked as thread-safe.
  *
- * @returns The number of alive objects.
+ * @returns The object's mutex.
+ * @threadsafety Always.
  */
-int Object::GetAliveObjectsCount(void)
+recursive_mutex& Object::GetMutex(void)
 {
-       boost::mutex::scoped_lock lock(*GetMutex());
-       return GetAliveObjects()->size();
+       return m_Mutex;
 }
-
-/**
- * Dumps a memory histogram to the "dictionaries.dump" file.
- */
-void Object::PrintMemoryProfile(void)
-{
-       map<String, int> types;
-
-       ofstream dictfp("dictionaries.dump.tmp");
-
-       {
-               boost::mutex::scoped_lock lock(*GetMutex());
-               set<Object *>::iterator it;
-               BOOST_FOREACH(Object *obj, *GetAliveObjects()) {
-                       pair<map<String, int>::iterator, bool> tt;
-                       tt = types.insert(make_pair(Utility::GetTypeName(typeid(*obj)), 1));
-                       if (!tt.second)
-                               tt.first->second++;
-
-                       if (typeid(*obj) == typeid(Dictionary)) {
-                               Dictionary::Ptr dict = obj->GetSelf();
-                               dictfp << Value(dict).Serialize() << std::endl;
-                       }
-               }
-       }
-
-#ifdef _WIN32
-       _unlink("dictionaries.dump");
-#endif /* _WIN32 */
-
-       dictfp.close();
-       if (rename("dictionaries.dump.tmp", "dictionaries.dump") < 0)
-               BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
-
-       String type;
-       int count;
-       BOOST_FOREACH(tie(type, count), types) {
-               std::cerr << type << ": " << count << std::endl;
-       }
-}
-
-/**
- * Returns currently active objects.
- *
- * @returns currently active objects
- */
-set<Object *> *Object::GetAliveObjects(void)
-{
-       static set<Object *> *aliveObjects = new set<Object *>();
-       return aliveObjects;
-}
-#endif /* _DEBUG */
-
-/**
- * Returns the mutex used for accessing static members.
- *
- * @returns a mutex
- */
-boost::mutex *Object::GetMutex(void)
-{
-       static boost::mutex *mutex = new boost::mutex();
-       return mutex;
-}
-
-/**
- * Returns currently held objects. The caller must be
- * holding the mutex returned by GetMutex().
- *
- * @returns currently held objects
- */
-vector<Object::Ptr>& Object::GetHeldObjects(void)
-{
-       static vector<Object::Ptr> heldObjects;
-       return heldObjects;
-}
-
-
index fa0fb4c9500408208b4171a269f35e42ce80c6fc..c51c05fb622dcca4a1d9137340e81337ec8dc02a 100644 (file)
@@ -31,15 +31,12 @@ class SharedPtrHolder;
  *
  * @ingroup base
  */
-class I2_BASE_API Object : public enable_shared_from_this<Object>, public boost::signals::trackable
+class I2_BASE_API Object : public enable_shared_from_this<Object>
 {
 public:
        typedef shared_ptr<Object> Ptr;
        typedef weak_ptr<Object> WeakPtr;
 
-       void Hold(void);
-       static void ClearHeldObjects(void);
-
        /**
         * Holds a shared pointer and provides support for implicit upcasts.
         *
@@ -96,10 +93,7 @@ public:
 
        SharedPtrHolder GetSelf(void);
 
-#ifdef _DEBUG
-       static int GetAliveObjectsCount(void);
-       static void PrintMemoryProfile(void);
-#endif /* _DEBUG */
+       recursive_mutex& GetMutex(void);
 
 protected:
        Object(void);
@@ -109,9 +103,24 @@ private:
        Object(const Object& other);
        Object& operator=(const Object& rhs);
 
-       static boost::mutex *GetMutex(void);
-       static set<Object *> *GetAliveObjects(void);
-       static vector<Object::Ptr>& GetHeldObjects(void);
+       recursive_mutex m_Mutex;
+};
+
+/**
+ * A scoped lock for Objects.
+ */
+struct ObjectLock {
+public:
+       ObjectLock(const Object::Ptr& object)
+               : m_Lock(object->GetMutex())
+       { }
+
+       ObjectLock(Object *object)
+               : m_Lock(object->GetMutex())
+       { }
+
+private:
+       recursive_mutex::scoped_lock m_Lock;
 };
 
 /**
index d3306e7ee63f9b8b49a0445c8cb412993e4bd139..2331e37c970b85273d4377b194daaccd9f0387ac 100644 (file)
@@ -26,7 +26,7 @@ using namespace icinga;
 int Process::m_TaskFd;
 extern char **environ;
 
-void Process::CreateWorkers(void)
+void Process::Initialize(void)
 {
        int fds[2];
 
index d6db2df03b587af0dd0620a723d1cc15f3f69aca..3dace62fe3ae1aeb22dd782ddc066a1fe5ed21a2 100644 (file)
@@ -22,7 +22,7 @@
 
 using namespace icinga;
 
-void Process::CreateWorkers(void)
+void Process::Initialize(void)
 {
        // TODO: implement
 }
index 229a4d7d5654388c9df207aec8fb0534bdbdbae5..d959c0102651d2324a5833e8c7bedd1b3ab902f5 100644 (file)
 
 using namespace icinga;
 
-bool Process::m_WorkersCreated = false;
+boost::once_flag Process::m_ThreadOnce;
 boost::mutex Process::m_Mutex;
 deque<Process::Ptr> Process::m_Tasks;
 
 Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
        : AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
 {
-       assert(Application::IsMainThread());
-
-       if (!m_WorkersCreated) {
-               CreateWorkers();
-
-               m_WorkersCreated = true;
-       }
+       boost::call_once(&Process::Initialize, m_ThreadOnce);
 
 #ifndef _WIN32
        m_FD = -1;
index 2a14c16fca600f1156eb31f7c7be6a1f5db26735..618acfaa42437819ac659f29ef26c41ef3e99e80 100644 (file)
@@ -54,8 +54,6 @@ public:
 
        static vector<String> SplitCommand(const Value& command);
 private:
-       static bool m_WorkersCreated;
-
        vector<String> m_Arguments;
        Dictionary::Ptr m_ExtraEnvironment;
 
@@ -76,7 +74,6 @@ private:
        static int m_TaskFd;
 #endif /* _WIN32 */
 
-       static void CreateWorkers(void);
        static void NotifyWorker(void);
 
        void SpawnTask(void);
@@ -89,6 +86,9 @@ private:
 
        void InitTask(void);
        bool RunTask(void);
+
+       static boost::once_flag m_ThreadOnce;
+       static void Initialize(void);
 };
 
 }
index abf1a6c977d3cfa5787a93c179ded84fd8b2b11e..0bcc5ca77380f50c63f74b60e7883d2652a1b067 100644 (file)
@@ -32,12 +32,6 @@ Script::Script(const Dictionary::Ptr& properties)
        : DynamicObject(properties)
 { }
 
-Script::~Script(void)
-{
-       if (m_Interpreter)
-               m_Interpreter->Stop();
-}
-
 void Script::OnInitCompleted(void)
 {
        SpawnInterpreter();
@@ -63,10 +57,6 @@ void Script::SpawnInterpreter(void)
 {
        Logger::Write(LogInformation, "base", "Reloading script '" + GetName() + "'");
 
-       if (m_Interpreter)
-               m_Interpreter->Stop();
-
        ScriptLanguage::Ptr language = ScriptLanguage::GetByName(GetLanguage());
        m_Interpreter = language->CreateInterpreter(GetSelf());
-       m_Interpreter->Start();
 }
index 3cae28e3a4011e1aa3e763595b73da0c4a0243d2..daaf288e1059fe336bf219d2be3a63d1b5f36e7a 100644 (file)
@@ -37,7 +37,6 @@ public:
        typedef weak_ptr<Script> WeakPtr;
 
        Script(const Dictionary::Ptr& properties);
-       ~Script(void);
 
        String GetLanguage(void) const;
        String GetCode(void) const;
index 3a8f5fafa4aa042a77475b3c4c4dffc595e61cc5..06ac2429466771ea10dcc919848e84404d0482c7 100644 (file)
@@ -21,8 +21,8 @@
 
 using namespace icinga;
 
-boost::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
-boost::signal<void (const String&)> ScriptFunction::OnUnregistered;
+signals2::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
+signals2::signal<void (const String&)> ScriptFunction::OnUnregistered;
 
 ScriptFunction::ScriptFunction(const Callback& function)
        : m_Callback(function)
@@ -31,13 +31,13 @@ ScriptFunction::ScriptFunction(const Callback& function)
 void ScriptFunction::Register(const String& name, const ScriptFunction::Ptr& function)
 {
        GetFunctions()[name] = function;
-       Application::GetEQ().Post(boost::bind(boost::ref(OnRegistered), name, function));
+       OnRegistered(name, function);
 }
 
 void ScriptFunction::Unregister(const String& name)
 {
        GetFunctions().erase(name);
-       Application::GetEQ().Post(boost::bind(boost::ref(OnUnregistered), name));
+       OnUnregistered(name);
 }
 
 ScriptFunction::Ptr ScriptFunction::GetByName(const String& name)
index 4597acde6bfff443c2a06ac71d9bc5e0bf569e6e..babb62f4acd3bbaf2dca8e7b23531c89404f3450 100644 (file)
@@ -46,10 +46,10 @@ public:
 
        void Invoke(const shared_ptr<ScriptTask>& task, const vector<Value>& arguments);
 
-       static map<String, ScriptFunction::Ptr>& GetFunctions(void);
+       /* TODO(thread) make private */ static map<String, ScriptFunction::Ptr>& GetFunctions(void);
 
-       static boost::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
-       static boost::signal<void (const String&)> OnUnregistered;
+       static signals2::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
+       static signals2::signal<void (const String&)> OnUnregistered;
 
 private:
        Callback m_Callback;
index fcfd2ac15098933222e1c92ac93f0b4832e52e12..df1ce9c35b955e628ad8e4eaa2cf3d538829c045 100644 (file)
@@ -26,55 +26,16 @@ ScriptInterpreter::ScriptInterpreter(const Script::Ptr& script)
 
 ScriptInterpreter::~ScriptInterpreter(void)
 {
-       Stop();
-}
-
-void ScriptInterpreter::Start(void)
-{
-       /* We can't start the thread in the constructor because
-        * the worker thread might end up calling one of the virtual
-        * methods before the object is fully constructed. */
-
-       m_Thread = boost::thread(&ScriptInterpreter::ThreadWorkerProc, this);
-}
-
-void ScriptInterpreter::Stop(void)
-{
-       assert(Application::IsMainThread());
-
-       m_EQ.Stop();
-
        BOOST_FOREACH(const String& function, m_SubscribedFunctions) {
                ScriptFunction::Unregister(function);
        }
-
-       m_Thread.join();
-}
-
-void ScriptInterpreter::ThreadWorkerProc(void)
-{
-       m_EQ.SetOwner(boost::this_thread::get_id());
-
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-
-               while (m_EQ.ProcessEvents(m_Mutex))
-                       ; /* empty loop */
-       }
-}
-
-void ScriptInterpreter::ScriptFunctionThunk(const ScriptTask::Ptr& task,
-    const String& function, const vector<Value>& arguments)
-{
-       m_EQ.Post(boost::bind(&ScriptInterpreter::ProcessCall, this,
-           task, function, arguments));
 }
 
 void ScriptInterpreter::SubscribeFunction(const String& name)
 {
        m_SubscribedFunctions.insert(name);
 
-       ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ScriptFunctionThunk, this, _1, name, _2));
+       ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2));
        ScriptFunction::Register(name, sf);
 }
 
index 0ac0cdb58fa2df5c58b7d082a7687785ae33ba51..6ff25f18c54d8fb03b4ace4a1e61dd47e8462f92 100644 (file)
@@ -36,9 +36,6 @@ public:
 
        ~ScriptInterpreter(void);
 
-       void Start(void);
-       void Stop(void);
-
 protected:
        ScriptInterpreter(const Script::Ptr& script);
 
@@ -49,15 +46,7 @@ protected:
        void UnsubscribeFunction(const String& name);
 
 private:
-       boost::mutex m_Mutex;
-       EventQueue m_EQ;
        set<String> m_SubscribedFunctions;
-       boost::thread m_Thread;
-
-       void ThreadWorkerProc(void);
-
-       void ScriptFunctionThunk(const ScriptTask::Ptr& task, const String& function,
-           const vector<Value>& arguments);
 };
 
 }
index 9c131807b8d0361bee49c288c63df99aafa5ba03..7fb44de309dfc726e71a042a44670f5e83810325 100644 (file)
@@ -24,18 +24,33 @@ using namespace icinga;
 ScriptLanguage::ScriptLanguage(void)
 { }
 
+/**
+ * @threadsafety Always.
+ */
 void ScriptLanguage::Register(const String& name, const ScriptLanguage::Ptr& language)
 {
+       boost::mutex::scoped_lock lock(GetMutex());
+
        GetLanguages()[name] = language;
 }
 
+/**
+ * @threadsafety Always.
+ */
 void ScriptLanguage::Unregister(const String& name)
 {
+       boost::mutex::scoped_lock lock(GetMutex());
+
        GetLanguages().erase(name);
 }
 
+/**
+ * @threadsafety Always.
+ */
 ScriptLanguage::Ptr ScriptLanguage::GetByName(const String& name)
 {
+       boost::mutex::scoped_lock lock(GetMutex());
+
        map<String, ScriptLanguage::Ptr>::iterator it;
 
        it = GetLanguages().find(name);
@@ -46,6 +61,12 @@ ScriptLanguage::Ptr ScriptLanguage::GetByName(const String& name)
        return it->second;
 }
 
+boost::mutex& ScriptLanguage::GetMutex(void)
+{
+       static boost::mutex mutex;
+       return mutex;
+}
+
 map<String, ScriptLanguage::Ptr>& ScriptLanguage::GetLanguages(void)
 {
        static map<String, ScriptLanguage::Ptr> languages;
index 4ab9ec9124537b7971fb936facd70fc74ab8d825..4c3c903ca2c1086c7f2323357d89b320c8886209 100644 (file)
@@ -47,6 +47,7 @@ protected:
        ScriptLanguage(void);
 
 private:
+       static boost::mutex& GetMutex(void);
        static map<String, ScriptLanguage::Ptr>& GetLanguages(void);
 };
 
index f0490577e19d5a47d7a0a8eb6879c22e7a99aaa7..d860bf8cffaec11ce2d8a5b4e9af66697c267fe5 100644 (file)
@@ -532,7 +532,7 @@ void Socket::HandleReadableClient(void)
        }
 
        if (new_data)
-               Application::GetEQ().Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+               OnDataAvailable(GetSelf());
 }
 
 void Socket::HandleWritableServer(void)
@@ -557,7 +557,7 @@ void Socket::HandleReadableServer(void)
 
        TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
        client->SetFD(fd);
-       Application::GetEQ().Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
+       OnNewClient(GetSelf(), client);
 }
 
 /**
index 7d495987ebd4c8bdb675a3945db2ce23ea83bbf1..4d586024399bec301e3c59493a99f5eb29249ad9 100644 (file)
@@ -51,7 +51,7 @@ public:
 
        void Listen(void);
 
-       boost::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
+       signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
 
 protected:
        Socket(void);
index 48d0d5b1c3abf8f3a7a4d7457288dddded8c4a16..b6e7a03a0245da9875bd69e84a446c066e536f69 100644 (file)
@@ -40,9 +40,9 @@ void Stream::SetConnected(bool connected)
        m_Connected = connected;
 
        if (m_Connected)
-               Application::GetEQ().Post(boost::bind(boost::ref(OnConnected), GetSelf()));
+               OnConnected(GetSelf());
        else
-               Application::GetEQ().Post(boost::bind(boost::ref(OnClosed), GetSelf()));
+               OnClosed(GetSelf());
 }
 
 /**
index cdb01f3144e12c1410b9ef1cfa73f74ceb922401..1e868c9016f49317caa9d5bc7f961843a21ee7a8 100644 (file)
@@ -85,9 +85,9 @@ public:
        boost::exception_ptr GetException(void);
        void CheckException(void);
 
-       boost::signal<void (const Stream::Ptr&)> OnConnected;
-       boost::signal<void (const Stream::Ptr&)> OnDataAvailable;
-       boost::signal<void (const Stream::Ptr&)> OnClosed;
+       signals2::signal<void (const Stream::Ptr&)> OnConnected;
+       signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
+       signals2::signal<void (const Stream::Ptr&)> OnClosed;
 
 protected:
        void SetConnected(bool connected);
index e023404887812058dd206f6b914d01a7b41e7781..963e25209eab38765e02da8424a1b64882e7b045 100644 (file)
 
 using namespace icinga;
 
-Timer::CollectionType Timer::m_Timers;
+Timer::TimerSet Timer::m_Timers;
+boost::mutex Timer::m_Mutex;
+boost::condition_variable Timer::m_CV;
+boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT;
+
+/**
+ * Extracts the next timestamp from a Timer.
+ *
+ * @param wtimer Weak pointer to the timer.
+ * @returns The next timestamp
+ * @threadsafety Caller must hold Timer::m_Mutex.
+ */
+double TimerNextExtractor::operator()(const Timer::WeakPtr& wtimer)
+{
+       Timer::Ptr timer = wtimer.lock();
+
+       if (!timer)
+               return 0;
+
+       return timer->m_Next;
+}
 
 /**
  * Constructor for the Timer class.
+ *
+ * @threadsafety Always.
  */
 Timer::Timer(void)
-       : m_Interval(0)
+       : m_Interval(0), m_Next(0)
 { }
 
 /**
- * Calls expired timers and returned when the next wake-up should happen.
+ * Initializes the timer sub-system.
  *
- * @returns Time when the next timer is due.
+ * @threadsafety Always.
  */
-double Timer::ProcessTimers(void)
+void Timer::Initialize(void)
 {
-       double wakeup = 30; /* wake up at least once after this many seconds */
-
-       double st = Utility::GetTime();
-
-       Timer::CollectionType::iterator prev, i;
-       for (i = m_Timers.begin(); i != m_Timers.end(); ) {
-               Timer::Ptr timer = i->lock();
-
-               prev = i;
-               i++;
-
-               if (!timer) {
-                       m_Timers.erase(prev);
-                       continue;
-               }
-
-               double now = Utility::GetTime();
-
-               if (timer->m_Next <= now) {
-                       timer->Call();
-
-                       /* time may have changed depending on how long the
-                        * timer call took - we need to fetch the current time */
-                       now = Utility::GetTime();
-
-                       double next = now + timer->GetInterval();
-
-                       if (timer->m_Next <= now || next < timer->m_Next)
-                               timer->Reschedule(next);
-               }
-
-               assert(timer->m_Next > now);
-
-               if (timer->m_Next - now < wakeup)
-                       wakeup = timer->m_Next - now;
-       }
-
-       assert(wakeup > 0);
-
-       double et = Utility::GetTime();
-
-       if (et - st > 0.01) {
-               stringstream msgbuf;
-               msgbuf << "Timers took " << et - st << " seconds";
-               Logger::Write(LogDebug, "base", msgbuf.str());
-       }
-
-       return wakeup;
+       thread worker(boost::bind(&Timer::TimerThreadProc));
+       worker.detach();
 }
 
 /**
- * Calls this timer. Note: the timer delegate must not call
- * Disable() on any other timers than the timer that originally
- * invoked the delegate.
+ * Calls this timer.
+ *
+ * @threadsafety Always.
  */
 void Timer::Call(void)
 {
@@ -105,15 +81,19 @@ void Timer::Call(void)
                msgbuf << "Timer call took " << et - st << " seconds.";
                Logger::Write(LogWarning, "base", msgbuf.str());
        }
+
+       Reschedule();
 }
 
 /**
  * Sets the interval for this timer.
  *
  * @param interval The new interval.
+ * @threadsafety Always.
  */
 void Timer::SetInterval(double interval)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        m_Interval = interval;
 }
 
@@ -121,44 +101,81 @@ void Timer::SetInterval(double interval)
  * Retrieves the interval for this timer.
  *
  * @returns The interval.
+ * @threadsafety Always.
  */
 double Timer::GetInterval(void) const
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        return m_Interval;
 }
 
 /**
  * Registers the timer and starts processing events for it.
+ *
+ * @threadsafety Always.
  */
 void Timer::Start(void)
 {
-       assert(Application::IsMainThread());
-
-       Stop();
-
-       m_Timers.push_back(GetSelf());
+       boost::call_once(&Timer::Initialize, m_ThreadOnce);
 
-       Reschedule(Utility::GetTime() + m_Interval);
+       Reschedule();
 }
 
 /**
  * Unregisters the timer and stops processing events for it.
+ *
+ * @threadsafety Always.
  */
 void Timer::Stop(void)
 {
-       assert(Application::IsMainThread());
+       boost::mutex::scoped_lock lock(m_Mutex);
+       m_Timers.erase(GetSelf());
 
-       m_Timers.remove_if(WeakPtrEqual<Timer>(this));
+       /* Notify the worker thread that we've disabled a timer. */
+       m_CV.notify_all();
 }
 
 /**
  * Reschedules this timer.
  *
- * @param next The time when this timer should be called again.
+ * @param next The time when this timer should be called again. Use -1 to let
+ *            the timer figure out a suitable time based on the interval.
+ * @threadsafety Always.
  */
 void Timer::Reschedule(double next)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       if (next < 0) {
+               double now = Utility::GetTime();
+               next = m_Next + m_Interval;
+
+               if (next < now)
+                       next = now + m_Interval;
+               else
+                       next = next;
+       }
+
        m_Next = next;
+
+       /* Remove and re-add the timer to update the index. */
+       m_Timers.erase(GetSelf());
+       m_Timers.insert(GetSelf());
+
+       /* Notify the worker that we've rescheduled a timer. */
+       m_CV.notify_all();
+}
+
+/**
+ * Retrieves when the timer is next due.
+ *
+ * @returns The timestamp.
+ * @threadsafety Always.
+ */
+double Timer::GetNext(void) const
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+       return m_Next;
 }
 
 /**
@@ -166,17 +183,76 @@ void Timer::Reschedule(double next)
  * next scheduled timestamp.
  *
  * @param adjustment The adjustment.
+ * @threadsafety Always.
  */
 void Timer::AdjustTimers(double adjustment)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
+
        double now = Utility::GetTime();
 
-       Timer::CollectionType::iterator i;
-        for (i = m_Timers.begin(); i != m_Timers.end(); i++) {
-               Timer::Ptr timer = i->lock();
+       typedef nth_index<TimerSet, 1>::type TimerView;
+       TimerView& idx = boost::get<1>(m_Timers);
+
+       TimerView::iterator it;
+       for (it = idx.begin(); it != idx.end(); it++) {
+               Timer::Ptr timer = it->lock();
 
                if (abs(now - (timer->m_Next + adjustment)) <
-                   abs(now - timer->m_Next))
+                   abs(now - timer->m_Next)) {
                        timer->m_Next += adjustment;
+                       m_Timers.erase(timer);
+                       m_Timers.insert(timer);
+                   }
+       }
+
+       /* Notify the worker that we've rescheduled some timers. */
+       m_CV.notify_all();
+}
+
+/**
+ * Worker thread proc for Timer objects.
+ *
+ * @threadsafety Always.
+ */
+void Timer::TimerThreadProc(void)
+{
+       for (;;) {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               typedef nth_index<TimerSet, 1>::type NextTimerView;
+               NextTimerView& idx = boost::get<1>(m_Timers);
+
+               /* Wait until there is at least one timer. */
+               while (idx.empty())
+                       m_CV.wait(lock);
+
+               NextTimerView::iterator it = idx.begin();
+               Timer::Ptr timer = it->lock();
+
+               if (!timer) {
+                       /* Remove the timer from the list if it's not alive anymore. */
+                       idx.erase(it);
+                       continue;
+               }
+
+               double wait = timer->m_Next - Utility::GetTime();
+
+               if (wait > 0) {
+                       /* Make sure the timer we just examined can be destroyed while we're waiting. */
+                       timer.reset();
+
+                       /* Wait for the next timer. */
+                       m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
+
+                       continue;
+               }
+
+               /* Remove the timer from the list so it doesn't get called again
+                * until the current call is completed. */
+               m_Timers.erase(timer);
+
+               /* Asynchronously call the timer. */
+               Application::GetEQ().Post(boost::bind(&Timer::Call, timer));
        }
 }
index ab32c624cb8cdd84e81294f2f90641b9922d33ba..87f55455270e5d21add9c4e9e4e43a38fc2e0333 100644 (file)
 
 namespace icinga {
 
+class Timer;
+
+/**
+ * @ingroup base
+ */
+struct TimerNextExtractor
+{
+       typedef double result_type;
+
+       double operator()(const weak_ptr<Timer>& wtimer);
+};
+
 /**
  * A timer that periodically triggers an event.
  *
@@ -42,23 +54,40 @@ public:
        void SetInterval(double interval);
        double GetInterval(void) const;
 
-       static double ProcessTimers(void);
        static void AdjustTimers(double adjustment);
 
        void Start(void);
        void Stop(void);
 
-       void Reschedule(double next);
+       void Reschedule(double next = -1);
+       double GetNext(void) const;
 
-       boost::signal<void(const Timer::Ptr&)> OnTimerExpired;
+       signals2::signal<void(const Timer::Ptr&)> OnTimerExpired;
 
 private:
        double m_Interval; /**< The interval of the timer. */
        double m_Next; /**< When the next event should happen. */
 
-       static Timer::CollectionType m_Timers;
+       typedef multi_index_container<
+               Timer::WeakPtr,
+               indexed_by<
+                       ordered_unique<identity<Timer::WeakPtr> >,
+                       ordered_non_unique<TimerNextExtractor>
+               >
+       > TimerSet;
+
+       static boost::mutex m_Mutex;
+       static boost::condition_variable m_CV;
+       static TimerSet m_Timers;
 
        void Call(void);
+
+       static boost::once_flag m_ThreadOnce;
+       static void Initialize(void);
+
+       static void TimerThreadProc(void);
+
+       friend struct TimerNextExtractor;
 };
 
 }
index 0b0a4fa50c705bcbc14b0d27ff36b6a887c20a78..2434392c42449e794712b29711f2e9ce47de1669 100644 (file)
@@ -398,17 +398,11 @@ pid_t Utility::GetPid(void)
  */
 void Utility::Sleep(double timeout)
 {
-       if (Application::IsMainThread())
-               Application::GetMutex().unlock();
-
 #ifndef _WIN32
        usleep(timeout * 1000 * 1000);
 #else /* _WIN32 */
        ::Sleep(timeout * 1000);
 #endif /* _WIN32 */
-
-       if (Application::IsMainThread())
-               Application::GetMutex().lock();
 }
 
 /**
@@ -521,17 +515,6 @@ bool Utility::Glob(const String& pathSpec, const function<void (const String&)>&
 #endif /* _WIN32 */
 }
 
-/**
- * Waits until the given predicate is true. Executes events while waiting.
- *
- * @param predicate The predicate.
- */
-void Utility::WaitUntil(const function<bool (void)>& predicate)
-{
-       while (!predicate())
-               Application::ProcessEvents();
-}
-
 #ifndef _WIN32
 void Utility::SetNonBlocking(int fd)
 {
index fabd99fce041768c4f551173a5dbd7002bfcf703..9d0a1f198c7fcdfbf6ced669704688845847ede1 100644 (file)
@@ -58,8 +58,6 @@ public:
 
        static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
 
-       static void WaitUntil(const function<bool (void)>& predicate);
-
        static
 #ifdef _WIN32
        HMODULE
index 2487a8e27696f02e23b5056614a7bb79031c911a..f5174507b2f80d13c7bdaed5b2ef17a213f95172 100644 (file)
 
 using namespace icinga;
 
+boost::mutex ConfigItem::m_Mutex;
 ConfigItem::ItemMap ConfigItem::m_Items;
-boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
-boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
+signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
+signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
 
 /**
  * Constructor for the ConfigItem class.
@@ -295,29 +296,38 @@ DynamicObject::Ptr ConfigItem::GetDynamicObject(void) const
  * @param type The type of the ConfigItem that is to be looked up.
  * @param name The name of the ConfigItem that is to be looked up.
  * @returns The configuration item.
+ * @threadsafety Always.
  */
 ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
 {
-       ConfigItem::ItemMap::iterator it;
+       {
+               recursive_mutex::scoped_lock lockg(Application::GetMutex());
 
-       ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+               ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
 
-       if (context) {
-               ConfigItem::Ptr item = context->GetItem(type, name);
+               if (context) {
+                       ConfigItem::Ptr item = context->GetItem(type, name);
 
-               if (item)
-                       return item;
+                       if (item)
+                               return item;
 
-               /* ignore already active objects while we're in the compiler
-                * context and linking to existing items is disabled. */
-               if ((context->GetFlags() & CompilerLinkExisting) == 0)
-                       return ConfigItem::Ptr();
+                       /* ignore already active objects while we're in the compiler
+                        * context and linking to existing items is disabled. */
+                       if ((context->GetFlags() & CompilerLinkExisting) == 0)
+                               return ConfigItem::Ptr();
+               }
        }
 
-       it = m_Items.find(make_pair(type, name));
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
 
-       if (it != m_Items.end())
-               return it->second;
+               ConfigItem::ItemMap::iterator it;
+
+               it = m_Items.find(make_pair(type, name));
+
+               if (it != m_Items.end())
+                       return it->second;
+       }
 
        return ConfigItem::Ptr();
 }
@@ -351,8 +361,13 @@ void ConfigItem::Dump(ostream& fp) const
        fp << "}" << "\n";
 }
 
+/**
+ * @threadsafety Caller must hold the global mutex.
+ */
 void ConfigItem::UnloadUnit(const String& unit)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
+
        Logger::Write(LogInformation, "config", "Unloading config items from compilation unit '" + unit + "'");
 
        vector<ConfigItem::Ptr> obsoleteItems;
index 14ea0965cde28b9ff7b3f954d06dab47546c36cd..77912962f768ee013c6cf3dfe09dd1fe09247810 100644 (file)
@@ -62,8 +62,8 @@ public:
 
        static void UnloadUnit(const String& unit);
 
-       static boost::signal<void (const ConfigItem::Ptr&)> OnCommitted;
-       static boost::signal<void (const ConfigItem::Ptr&)> OnRemoved;
+       static signals2::signal<void (const ConfigItem::Ptr&)> OnCommitted;
+       static signals2::signal<void (const ConfigItem::Ptr&)> OnRemoved;
 
 private:
        void InternalLink(const Dictionary::Ptr& dictionary) const;
@@ -89,6 +89,8 @@ private:
         set<ConfigItem::WeakPtr> m_ChildObjects; /**< Instantiated items
                                                      * that inherit from this item */
 
+       static boost::mutex m_Mutex;
+
        typedef map<pair<String, String>, ConfigItem::Ptr> ItemMap;
        static ItemMap m_Items; /**< All registered configuration items. */
 };
index 9fba1d09de638f05d6222045e8252a4ae5639584..0f6abe805235d845bb066278060bfa7ea13bb15d 100644 (file)
@@ -78,6 +78,9 @@ void ConfigType::ValidateItem(const ConfigItem::Ptr& item) const
        ValidateDictionary(attrs, ruleLists, locations);
 }
 
+/**
+ * @threadsafety Always.
+ */
 String ConfigType::LocationToString(const vector<String>& locations)
 {
        bool first = true;
@@ -94,6 +97,9 @@ String ConfigType::LocationToString(const vector<String>& locations)
        return stack;
 }
 
+/**
+ * @threadsafety Always.
+ */
 void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
     const vector<TypeRuleList::Ptr>& ruleLists, vector<String>& locations)
 {
@@ -168,4 +174,3 @@ void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
                locations.pop_back();
        }
 }
-
index 90e6a48c799d36682348c821e0451a903523b193..8b48f40b253ac914a26a188ac01cd4335e6714f7 100644 (file)
@@ -160,6 +160,9 @@ void Expression::DumpValue(ostream& fp, int indent, const Value& value, bool inl
        BOOST_THROW_EXCEPTION(runtime_error("Encountered unknown type while dumping value."));
 }
 
+/**
+ * @threadsafety Always.
+ */
 void Expression::Dump(ostream& fp, int indent) const
 {
        if (m_Operator == OperatorExecute) {
index 7938094a9b6e342c3daf91f81dc0bef6d71f6ecd..4317969064f8830cd654c8763902ea8a565730bc 100644 (file)
@@ -23,6 +23,9 @@ using namespace icinga;
 
 REGISTER_SCRIPTFUNCTION("GetAnswerToEverything", &API::GetAnswerToEverything);
 
+/**
+ * @threadsafety Always.
+ */
 void API::GetAnswerToEverything(const ScriptTask::Ptr& task, const vector<Value>& arguments)
 {
        if (arguments.size() < 1)
index 6502786f6749c5e1978e7e36af2f519c7760f553..3d1f94a5c97fd9806b112c5caf52e20896ea3d8a 100644 (file)
 
 using namespace icinga;
 
+boost::mutex CIB::m_Mutex;
 RingBuffer CIB::m_ActiveChecksStatistics(15 * 60);
 RingBuffer CIB::m_PassiveChecksStatistics(15 * 60);
 
+/**
+ * @threadsafety Always.
+ */
 void CIB::UpdateActiveChecksStatistics(long tv, int num)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        m_ActiveChecksStatistics.InsertValue(tv, num);
 }
 
+/**
+ * @threadsafety Always.
+ */
 int CIB::GetActiveChecksStatistics(long timespan)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        return m_ActiveChecksStatistics.GetValues(timespan);
 }
 
+/**
+ * @threadsafety Always.
+ */
 void CIB::UpdatePassiveChecksStatistics(long tv, int num)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        m_PassiveChecksStatistics.InsertValue(tv, num);
 }
 
+/**
+ * @threadsafety Always.
+ */
 int CIB::GetPassiveChecksStatistics(long timespan)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        return m_PassiveChecksStatistics.GetValues(timespan);
 }
index a9eaa907cd566a23eba1ad3450434cafafcaa085..e7d5903dc3201902b3ef0ee8234ea60da53654db 100644 (file)
@@ -39,6 +39,7 @@ public:
        static int GetPassiveChecksStatistics(long timespan);
 
 private:
+       static boost::mutex m_Mutex;
        static RingBuffer m_ActiveChecksStatistics;
        static RingBuffer m_PassiveChecksStatistics;
 };
index 69c4a0b91606f90be9721efd346d9724f76df043..e8cc80a0b3f1716fe145e42b8c31d0222f15bb41 100644 (file)
 
 using namespace icinga;
 
-bool I2_EXPORT ExternalCommandProcessor::m_Initialized;
-map<String, ExternalCommandProcessor::Callback> I2_EXPORT ExternalCommandProcessor::m_Commands;
+boost::once_flag ExternalCommandProcessor::m_InitializeOnce;
+boost::mutex ExternalCommandProcessor::m_Mutex;
+map<String, ExternalCommandProcessor::Callback> ExternalCommandProcessor::m_Commands;
 
+/**
+ * @threadsafety Always.
+ */
 void ExternalCommandProcessor::Execute(const String& line)
 {
        if (line.IsEmpty())
@@ -54,68 +58,91 @@ void ExternalCommandProcessor::Execute(const String& line)
        Execute(ts, argv[0], argvExtra);
 }
 
+/**
+ * @threadsafety Always.
+ */
 void ExternalCommandProcessor::Execute(double time, const String& command, const vector<String>& arguments)
 {
-       if (!m_Initialized) {
-               RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
-               RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
-               RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
-               RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
-               RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
-               RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
-               RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
-               RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
-               RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
-               RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
-               RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
-               RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
-               RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
-               RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
-               RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
-               RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
-               RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
-               RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
-               RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
-               RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
-               RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
-               RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
-               RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
-               RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
-               RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
-               RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
-               RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
-               RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
-               RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
-               RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
-               RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
-               RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
-               RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
-               RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
-               RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
-               RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
-               RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
-               RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
-               RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
-               RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
-               RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
-               RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
-               RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
-               RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
-
-               m_Initialized = true;
-       }
+       boost::call_once(m_InitializeOnce, &ExternalCommandProcessor::Initialize);
+
+       Callback callback;
+
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               map<String, ExternalCommandProcessor::Callback>::iterator it;
+               it = m_Commands.find(command);
 
-       map<String, ExternalCommandProcessor::Callback>::iterator it;
-       it = m_Commands.find(command);
+               if (it == m_Commands.end())
+                       BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
 
-       if (it == m_Commands.end())
-               BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
+               callback = it->second;
+       }
+
+       {
+               recursive_mutex::scoped_lock lock(Application::GetMutex());
+               callback(time, arguments);
+       }
 
-       it->second(time, arguments);
 }
 
+/**
+ * @threadsafety Always.
+ */
+void ExternalCommandProcessor::Initialize(void)
+{
+       RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
+       RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
+       RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
+       RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
+       RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
+       RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
+       RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
+       RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
+       RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
+       RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
+       RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
+       RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
+       RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
+       RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
+       RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
+       RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
+       RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
+       RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
+       RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
+       RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
+       RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
+       RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
+       RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
+       RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
+       RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
+       RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
+       RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
+       RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
+       RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
+       RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
+       RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
+       RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
+       RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
+       RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
+       RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
+       RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
+       RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
+       RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
+       RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
+       RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
+       RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
+       RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
+       RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
+       RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
+}
+
+/**
+ * @threadsafety Always.
+ */
 void ExternalCommandProcessor::RegisterCommand(const String& command, const ExternalCommandProcessor::Callback& callback)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
        m_Commands[command] = callback;
 }
 
index 755c36b27a9cc4be7e34ba897849fcb6e0c863f5..965e2ecf40bc5a6596870ba13c72556d5b186c70 100644 (file)
@@ -28,6 +28,19 @@ public:
        static void Execute(const String& line);
        static void Execute(double time, const String& command, const vector<String>& arguments);
 
+private:
+       typedef function<void (double time, const vector<String>& arguments)> Callback;
+
+       static boost::once_flag m_InitializeOnce;
+       static boost::mutex m_Mutex;
+       static map<String, Callback> m_Commands;
+
+       ExternalCommandProcessor(void);
+
+       static void Initialize(void);
+
+       static void RegisterCommand(const String& command, const Callback& callback);
+
        static void ProcessServiceCheckResult(double time, const vector<String>& arguments);
        static void ScheduleSvcCheck(double time, const vector<String>& arguments);
        static void ScheduleForcedSvcCheck(double time, const vector<String>& arguments);
@@ -72,16 +85,6 @@ public:
        static void DelAllSvcComments(double time, const vector<String>& arguments);
        static void SendCustomHostNotification(double time, const vector<String>& arguments);
        static void SendCustomSvcNotification(double time, const vector<String>& arguments);
-
-private:
-       typedef function<void (double time, const vector<String>& arguments)> Callback;
-
-       static bool m_Initialized;
-       static map<String, Callback> m_Commands;
-
-       ExternalCommandProcessor(void);
-
-       static void RegisterCommand(const String& command, const Callback& callback);
 };
 
 }
index 5adcb455ec5260fc77b37f8871609e7e8ed593d5..b01e4b8a56e92019c6959de717b0f049262c8a46 100644 (file)
@@ -59,11 +59,17 @@ String Host::GetDisplayName(void) const
                return GetName();
 }
 
+/**
+ * @threadsafety Always.
+ */
 bool Host::Exists(const String& name)
 {
        return (DynamicObject::GetObject("Host", name));
 }
 
+/**
+ * @threadsafety Always.
+ */
 Host::Ptr Host::GetByName(const String& name)
 {
        DynamicObject::Ptr configObject = DynamicObject::GetObject("Host", name);
index 7cb5d7d740eb8f8cd173754a155efa56f33f2e0f..48b6cdfb8f5460961fbfab081ded58aacaacdf44 100644 (file)
@@ -50,11 +50,17 @@ String HostGroup::GetActionUrl(void) const
        return Get("action_url");
 }
 
+/**
+ * @threadsafety Always.
+ */
 bool HostGroup::Exists(const String& name)
 {
        return (DynamicObject::GetObject("HostGroup", name));
 }
 
+/**
+ * @threadsafety Always.
+ */
 HostGroup::Ptr HostGroup::GetByName(const String& name)
 {
        DynamicObject::Ptr configObject = DynamicObject::GetObject("HostGroup", name);
index 8e45be9a0ec267f0d8082af241c47f5122f4fe14..431843ddc5f2d9aa39fa19b5edd46244e2836b82 100644 (file)
@@ -27,7 +27,7 @@ Value MacroProcessor::ResolveMacros(const Value& cmd, const Dictionary::Ptr& mac
 
        if (cmd.IsScalar()) {
                result = InternalResolveMacros(cmd, macros);
-       } else {
+       } else if (cmd.IsObjectType<Dictionary>()) {
                Dictionary::Ptr resultDict = boost::make_shared<Dictionary>();
                Dictionary::Ptr dict = cmd;
 
@@ -37,6 +37,8 @@ Value MacroProcessor::ResolveMacros(const Value& cmd, const Dictionary::Ptr& mac
                }
 
                result = resultDict;
+       } else {
+               BOOST_THROW_EXCEPTION(invalid_argument("Command is not a string or dictionary."));
        }
 
        return result;
index fb8880f487206914025670629a8049ba3b0d1bc1..9e6570ae6d50339709a37bb413c273b381780f4a 100644 (file)
@@ -27,9 +27,12 @@ PluginCheckTask::PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr
        : m_Task(task), m_Process(process)
 { }
 
+/**
+ * @threadsafety Always.
+ */
 void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
 {
-       assert(Application::IsMainThread());
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
 
        if (arguments.size() < 1)
                BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
@@ -57,8 +60,13 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
        process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
 }
 
+/**
+ * @threadsafety Always.
+ */
 void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        ProcessResult pr;
 
        try {
index 7ca023ed694ab329a6ff4f41c3b81306050aacb7..0168de29f4994ecfbc6a3ad4ebb711c8488e5327 100644 (file)
@@ -28,9 +28,12 @@ PluginNotificationTask::PluginNotificationTask(const ScriptTask::Ptr& task, cons
        : m_Task(task), m_Process(process), m_ServiceName(service), m_Command(command)
 { }
 
+/**
+ * @threadsafety Always.
+ */
 void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
 {
-       assert(Application::IsMainThread());
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
 
        if (arguments.size() < 1)
                BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
@@ -62,8 +65,13 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
        process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
 }
 
+/**
+ * @threadsafety Always.
+ */
 void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        ProcessResult pr;
 
        try {
index b9cc4cd4798aa56bcdf8cab8d3cfbd58bf3a48a5..6038a4a088307156ae3f0a12d044e139cddd8b86 100644 (file)
@@ -25,8 +25,8 @@ const int Service::DefaultMaxCheckAttempts = 3;
 const int Service::DefaultCheckInterval = 5 * 60;
 const int Service::CheckIntervalDivisor = 5;
 
-boost::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
-boost::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
+signals2::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
+signals2::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
 
 Value Service::GetCheckCommand(void) const
 {
@@ -350,7 +350,7 @@ void Service::ApplyCheckResult(const Dictionary::Ptr& cr)
 
                /* Make sure the notification component sees the updated
                 * state/state_type attributes. */
-               DynamicObject::FlushTx();
+               DynamicObject::NewTx();
 
                if (IsReachable() && !IsInDowntime() && !IsAcknowledged())
                        RequestNotifications(NotificationStateChange);
@@ -458,6 +458,8 @@ void Service::BeginExecuteCheck(const function<void (void)>& callback)
 void Service::CheckCompletedHandler(const Dictionary::Ptr& scheduleInfo,
     const ScriptTask::Ptr& task, const function<void (void)>& callback)
 {
+       ObjectLock olock(this);
+
        Set("current_task", Empty);
 
        scheduleInfo->Set("execution_end", Utility::GetTime());
@@ -521,7 +523,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
 
        /* flush the current transaction so other instances see the service's
         * new state when they receive the CheckResult message */
-       DynamicObject::FlushTx();
+       DynamicObject::NewTx();
 
        RequestMessage rm;
        rm.SetMethod("checker::CheckResult");
index dfeed2388770cf95026bd9d0ce7e87e9f5b1d6de..e3ff4fd6a9b2f262415e03c4965e0e5f1aa2f43b 100644 (file)
@@ -210,6 +210,8 @@ void Service::RemoveExpiredComments(void)
 
 void Service::CommentsExpireTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        DynamicObject::Ptr object;
        BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
                Service::Ptr service = dynamic_pointer_cast<Service>(object);
index 2c7df0cfe5c9e1464618d472b52fedd6b5f3a6ac..fa12a93fc101437b06346e67d5013f9c3f2122cf 100644 (file)
@@ -275,6 +275,8 @@ void Service::RemoveExpiredDowntimes(void)
 
 void Service::DowntimesExpireTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        DynamicObject::Ptr object;
        BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
                Service::Ptr service = dynamic_pointer_cast<Service>(object);
index 72a09e8c9c0dad8bef47ecf3aa9157a46142932e..bf4e72bf12cb606fa3b1a6760e34fe19193dd703 100644 (file)
@@ -67,11 +67,17 @@ String Service::GetDisplayName(void) const
        return GetName();
 }
 
+/**
+ * @threadsafety Always.
+ */
 bool Service::Exists(const String& name)
 {
        return (DynamicObject::GetObject("Service", name));
 }
 
+/**
+ * @threadsafety Always.
+ */
 Service::Ptr Service::GetByName(const String& name)
 {
        DynamicObject::Ptr configObject = DynamicObject::GetObject("Service", name);
@@ -82,9 +88,14 @@ Service::Ptr Service::GetByName(const String& name)
        return dynamic_pointer_cast<Service>(configObject);
 }
 
+/**
+ * @threadsafety Always.
+ */
 Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
 {
        if (!hostName.IsEmpty()) {
+               recursive_mutex::scoped_lock lock(Application::GetMutex());
+
                Host::Ptr host = Host::GetByName(hostName);
                return host->GetServiceByShortName(serviceName);
        } else {
index ba76f6cc51e3639d49520a8fc97f2384b9dd2bd8..2c3d9e99daf473a4653cbccca51141bd2b772e0e 100644 (file)
@@ -180,8 +180,8 @@ public:
        static ServiceStateType StateTypeFromString(const String& state);
        static String StateTypeToString(ServiceStateType state);
 
-       static boost::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
-       static boost::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
+       static signals2::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
+       static signals2::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
 
        /* Downtimes */
        static int GetNextDowntimeID(void);
index e63c134271b8ec3a0cc43366024da3c983fb7b4d..c07e940cfea53ddf6f4d0937370c391f38138fba 100644 (file)
@@ -50,11 +50,17 @@ String ServiceGroup::GetActionUrl(void) const
        return Get("action_url");
 }
 
+/**
+ * @threadsafety Always.
+ */
 bool ServiceGroup::Exists(const String& name)
 {
        return (DynamicObject::GetObject("ServiceGroup", name));
 }
 
+/**
+ * @threadsafety Always.
+ */
 ServiceGroup::Ptr ServiceGroup::GetByName(const String& name)
 {
        DynamicObject::Ptr configObject = DynamicObject::GetObject("ServiceGroup", name);
index 62ab582cfc290248a3186db45a4b0fb2a1623f93..5c0f0de6a0beb2eb1baa34813aa04bedfc2f0d26 100644 (file)
@@ -366,7 +366,7 @@ PyObject *PythonLanguage::PyRegisterFunction(PyObject *self, PyObject *args)
        }
 
        {
-               boost::mutex::scoped_lock lock(Application::GetMutex());
+               recursive_mutex::scoped_lock lock(Application::GetMutex());
                interp->RegisterPythonFunction(name, object);
        }
 
index 32659adb7995b86d8ca8a31afb8a24a2acdb330a..d5631cc4e380ad720cbd1bab438c775a6ea0227a 100644 (file)
@@ -30,10 +30,10 @@ static AttributeDescription endpointAttributes[] = {
 
 REGISTER_TYPE(Endpoint, endpointAttributes);
 
-boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
-boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
-boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
-boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
+signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
+signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
+signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
+signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
 
 /**
  * Constructor for the Endpoint class.
@@ -201,13 +201,13 @@ void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
 
 void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
 {
-       map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+       map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
        it = m_TopicHandlers.find(topic);
 
-       shared_ptr<boost::signal<Endpoint::Callback> > sig;
+       shared_ptr<signals2::signal<Endpoint::Callback> > sig;
 
        if (it == m_TopicHandlers.end()) {
-               sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
+               sig = boost::make_shared<signals2::signal<Endpoint::Callback> >();
                m_TopicHandlers.insert(make_pair(topic, sig));
        } else {
                sig = it->second;
@@ -271,7 +271,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
                if (!request.GetMethod(&method))
                        return;
 
-               map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+               map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
                it = m_TopicHandlers.find(method);
 
                if (it == m_TopicHandlers.end())
index 9b8e22eb81fb92616f254030224d2b1e667b3b63..2d337f16fa714c5507aa308ccbbbe982e8602af1 100644 (file)
@@ -71,11 +71,11 @@ public:
 
        static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
 
-       static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
-       static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
+       static signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
+       static signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
 
-       static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
-       static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
+       static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
+       static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
 
 private:
        bool m_ReceivedWelcome; /**< Have we received a welcome message
@@ -83,7 +83,7 @@ private:
        bool m_SentWelcome; /**< Have we sent a welcome message to this
                                 endpoint? */
 
-       map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
+       map<String, shared_ptr<signals2::signal<Callback> > > m_TopicHandlers;
 
        void NewMessageHandler(const MessagePart& message);
        void ClientClosedHandler(void);
index 3d122a7cf6aec72db0b434e5a6894f854541d202..e7d004bbaaa8c1c5075443f748545c909d1f177c 100644 (file)
@@ -325,6 +325,8 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
 
 void EndpointManager::SubscriptionTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
 
        DynamicObject::Ptr object;
@@ -349,6 +351,8 @@ void EndpointManager::SubscriptionTimerHandler(void)
 
 void EndpointManager::ReconnectTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        DynamicObject::Ptr object;
        BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
                Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
@@ -373,6 +377,8 @@ void EndpointManager::ReconnectTimerHandler(void)
 
 void EndpointManager::RequestTimerHandler(void)
 {
+       recursive_mutex::scoped_lock lock(Application::GetMutex());
+
        map<String, PendingRequest>::iterator it;
        for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
                if (it->second.HasTimedOut()) {
index 120f9f57b5913c468fa7be0d07a0df9310d6fe4e..bcdb14c149b815009e9283bf4ea237795c7bc801 100644 (file)
@@ -60,7 +60,7 @@ public:
 
        void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
 
-       boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
+       signals2::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
 
 private:
        String m_Identity;
index 84589e3bc81b8e679ae18ed6db8ef1ee2f06e376..9473b89514a6f4bd6b8e1c99f3840e0cb32c1c05 100644 (file)
@@ -38,7 +38,7 @@ public:
 
        void SendMessage(const MessagePart& message);
 
-       boost::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
+       signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
 
 protected:
        virtual void ProcessData(void);