void CheckerComponent::CheckTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
double now = Utility::GetTime();
long tasks = 0;
int missedServices = 0, missedChecks = 0;
- while (!m_IdleServices.empty()) {
- typedef nth_index<ServiceSet, 1>::type CheckTimeView;
- CheckTimeView& idx = boost::get<1>(m_IdleServices);
+ for (;;) {
+ Service::Ptr service;
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+ CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+ if (idx.begin() == idx.end())
+ break;
- CheckTimeView::iterator it = idx.begin();
- Service::Ptr service = it->lock();
+ CheckTimeView::iterator it = idx.begin();
+ service = it->lock();
+
+ if (!service) {
+ idx.erase(it);
+ continue;
+ }
+
+ {
+ ObjectLock olock(service);
+
+ if (service->GetNextCheck() > now)
+ break;
+ }
- if (!service) {
idx.erase(it);
- continue;
}
- if (service->GetNextCheck() > now)
- break;
-
- idx.erase(it);
+ ObjectLock olock(service);
/* reschedule the service if checks are currently disabled
* for it and this is not a forced check */
service->UpdateNextCheck();
- idx.insert(service);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+ CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+ idx.insert(service);
+ }
continue;
}
void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
{
- /* remove the service from the list of pending services; if it's not in the
- * list this was a manual (i.e. forced) check and we must not re-add the
- * service to the services list because it's already there. */
- CheckerComponent::ServiceSet::iterator it;
- it = m_PendingServices.find(service);
- if (it != m_PendingServices.end()) {
- m_PendingServices.erase(it);
- m_IdleServices.insert(service);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ /* remove the service from the list of pending services; if it's not in the
+ * list this was a manual (i.e. forced) check and we must not re-add the
+ * service to the services list because it's already there. */
+ CheckerComponent::ServiceSet::iterator it;
+ it = m_PendingServices.find(service);
+ if (it != m_PendingServices.end()) {
+ m_PendingServices.erase(it);
+ m_IdleServices.insert(service);
+ }
}
RescheduleCheckTimer();
- Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
+ {
+ ObjectLock olock(service);
+ Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
+ }
}
void CheckerComponent::ResultTimerHandler(void)
Logger::Write(LogDebug, "checker", "ResultTimerHandler entered.");
stringstream msgbuf;
- msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ msgbuf << "Pending services: " << m_PendingServices.size() << "; Idle services: " << m_IdleServices.size();
+ }
+
Logger::Write(LogInformation, "checker", msgbuf.str());
}
void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
- String checker = service->GetChecker();
+ String checker;
+
+ {
+ ObjectLock olock(service);
+ checker = service->GetChecker();
+ }
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
if (m_PendingServices.find(service) != m_PendingServices.end())
return;
m_IdleServices.insert(service);
} else {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
m_IdleServices.erase(service);
m_PendingServices.erase(service);
}
void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
{
- /* remove and re-insert the service from the set in order to force an index update */
- typedef nth_index<ServiceSet, 0>::type ServiceView;
- ServiceView& idx = boost::get<0>(m_IdleServices);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
- ServiceView::iterator it = idx.find(service);
- if (it == idx.end())
- return;
+ /* remove and re-insert the service from the set in order to force an index update */
+ typedef nth_index<ServiceSet, 0>::type ServiceView;
+ ServiceView& idx = boost::get<0>(m_IdleServices);
+
+ ServiceView::iterator it = idx.find(service);
+ if (it == idx.end())
+ return;
- idx.erase(it);
- idx.insert(service);
+ idx.erase(it);
+ idx.insert(service);
+ }
RescheduleCheckTimer();
}
if (!service)
return;
- m_IdleServices.erase(service);
- m_PendingServices.erase(service);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ m_IdleServices.erase(service);
+ m_PendingServices.erase(service);
+ }
}
void CheckerComponent::RescheduleCheckTimer(void)
{
- if (m_IdleServices.empty())
- return;
-
- typedef nth_index<ServiceSet, 1>::type CheckTimeView;
- CheckTimeView& idx = boost::get<1>(m_IdleServices);
-
Service::Ptr service;
- do {
- CheckTimeView::iterator it = idx.begin();
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
- if (it == idx.end())
+ if (m_IdleServices.empty())
return;
- service = it->lock();
+ typedef nth_index<ServiceSet, 1>::type CheckTimeView;
+ CheckTimeView& idx = boost::get<1>(m_IdleServices);
+
+ do {
+ CheckTimeView::iterator it = idx.begin();
- if (!service)
- idx.erase(it);
- } while (!service);
+ if (it == idx.end())
+ return;
+
+ service = it->lock();
+
+ if (!service)
+ idx.erase(it);
+ } while (!service);
+ }
+ ObjectLock olock(service);
m_CheckTimer->Reschedule(service->GetNextCheck());
}
if (!service)
return 0;
- return service->GetNextCheck();
+ {
+ ObjectLock olock(service);
+ return service->GetNextCheck();
+ }
}
};
private:
Endpoint::Ptr m_Endpoint;
+ boost::mutex m_Mutex;
+
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
#include <i2-base.h>
#include <i2-icinga.h>
-#include <boost/multi_index_container.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-#include <boost/multi_index/key_extractors.hpp>
-
-using boost::multi_index_container;
-using boost::multi_index::indexed_by;
-using boost::multi_index::identity;
-using boost::multi_index::ordered_unique;
-using boost::multi_index::ordered_non_unique;
-using boost::multi_index::nth_index;
-
#include "checkercomponent.h"
#endif /* I2CHECKER_H */
}
}
-
if (!fifo_ok && mkfifo(commandPath.CStr(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP) < 0)
BOOST_THROW_EXCEPTION(PosixException("mkfifo() failed", errno));
line[strlen(line) - 1] = '\0';
String command = line;
- Application::GetEQ().Post(boost::bind(&CompatComponent::ProcessCommand, this, command));
+
+ {
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
+ ProcessCommand(command);
+ }
}
fclose(fp);
*/
void CompatComponent::StatusTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Logger::Write(LogInformation, "compat", "Writing compat status information");
String statuspath = GetStatusPath();
void DelegationComponent::DelegationTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
map<Endpoint::Ptr, int> histogram;
DynamicObject::Ptr object;
*/
void DemoComponent::DemoTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
" world' message.");
*/
void NotificationComponent::NotificationTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
// TODO: implement
}
DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1));
- DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _1));
+ DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2));
Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));
if (hasError)
return false;
-/* Logger::Write(LogInformation, "icinga-app", "Validating config items...");
- DynamicType::Ptr type;
- BOOST_FOREACH(tie(tuples::ignore, type), DynamicType::GetTypes()) {
- ConfigType::Ptr ctype = ConfigType::GetByName(type->GetName());
-
- if (!ctype) {
- Logger::Write(LogWarning, "icinga-app", "No config type found for type '" + type->GetName() + "'");
-
- continue;
- }
-
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), type->GetObjects()) {
- ctype->ValidateObject(object);
- }
- }*/
-
if (validateOnly)
return true;
static void ReloadConfigTimerHandler(void)
{
if (g_ReloadConfig) {
- Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
- LoadConfigFiles(false);
+ {
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
+ Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
+ LoadConfigFiles(false);
+ }
+
g_ReloadConfig = false;
}
}
lt_dlinit();
#endif /* _WIN32 */
- /* This must be done before calling any other functions
- * in the base library. */
- Application::SetMainThread();
-
/* Set command-line arguments. */
Application::SetArgC(argc);
Application::SetArgV(argv);
return EXIT_FAILURE;
}
- DynamicObject::BeginTx();
+ DynamicObject::NewTx();
bool validateOnly = g_AppParams.count("validate");
if (!LoadConfigFiles(validateOnly))
return EXIT_FAILURE;
- DynamicObject::FinishTx();
+ DynamicObject::NewTx();
if (validateOnly) {
Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate.");
return app->Run();
}
-
using namespace icinga;
-boost::mutex Application::m_Mutex;
+recursive_mutex Application::m_Mutex;
Application *Application::m_Instance = NULL;
bool Application::m_ShuttingDown = false;
bool Application::m_Debugging = false;
m_ArgV = argv;
}
-/**
- * Runs one iteration of the event loop.
- *
- * @returns false if we're shutting down, true otherwise.
- */
-bool Application::ProcessEvents(void)
+void Application::NewTxTimerHandler(void)
{
- Object::ClearHeldObjects();
-
- double sleep = Timer::ProcessTimers();
-
- if (m_ShuttingDown)
- return false;
+ DynamicObject::NewTx();
+}
- GetEQ().ProcessEvents(m_Mutex, boost::posix_time::milliseconds(sleep * 1000));
+#ifdef _DEBUG
+void Application::ProfileTimerHandler(void)
+{
+ stringstream msgbuf;
+ msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
+ Logger::Write(LogInformation, "base", msgbuf.str());
- DynamicObject::FlushTx();
+ Object::PrintMemoryProfile();
+}
+#endif /* _DEBUG */
- return true;
+void Application::ShutdownTimerHandler(void)
+{
+ if (m_ShuttingDown)
+ m_EQ.Stop();
}
/**
*/
void Application::RunEventLoop(void) const
{
- boost::mutex::scoped_lock lock(m_Mutex);
-
-#ifdef _DEBUG
- double nextProfile = 0;
-#endif /* _DEBUG */
-
/* Start the system time watch thread. */
thread t(&Application::TimeWatchThreadProc);
t.detach();
- while (!m_ShuttingDown) {
- if (!ProcessEvents())
- break;
+ /* Set up a timer to periodically flush the tx. */
+ Timer::Ptr newTxTimer = boost::make_shared<Timer>();
+ newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
+ newTxTimer->SetInterval(0.5);
+ newTxTimer->Start();
-#ifdef _DEBUG
- if (nextProfile < Utility::GetTime()) {
- stringstream msgbuf;
- msgbuf << "Active objects: " << Object::GetAliveObjectsCount();
- Logger::Write(LogInformation, "base", msgbuf.str());
-
- Object::PrintMemoryProfile();
+ /* Set up a timer that watches the m_Shutdown flag. */
+ Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
+ shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
+ shutdownTimer->SetInterval(0.5);
+ shutdownTimer->Start();
- nextProfile = Utility::GetTime() + 15.0;
- }
+#ifdef _DEBUG
+ /* Set up a timer that periodically prints some information about the object system. */
+ Timer::Ptr profileTimer = boost::make_shared<Timer>();
+ profileTimer->OnTimerExpired.connect(boost::bind(&Application::ProfileTimerHandler));
+ flushTxTimer->SetInterval(15);
+ flushTxTimer->Start();
#endif /* _DEBUG */
- }
+
+ GetEQ().Run();
}
/**
<< " in time: " << abs(timeDiff) << " seconds";
Logger::Write(LogInformation, "base", msgbuf.str());
- /* in addition to rescheduling the timers this
- * causes the event loop to wake up thereby
- * solving the problem that timed_wait()
- * uses an absolute timestamp for the timeout */
- GetEQ().Post(boost::bind(&Timer::AdjustTimers,
- -timeDiff));
+ Timer::AdjustTimers(-timeDiff);
}
lastLoop = now;
return m_Debugging;
}
-/**
- * Checks whether we're currently on the main thread.
- *
- * @returns true if this is the main thread, false otherwise
- */
-bool Application::IsMainThread(void)
-{
- return (boost::this_thread::get_id() == m_MainThreadID);
-}
-
-/**
- * Sets the main thread to the currently running thread.
- */
-void Application::SetMainThread(void)
-{
- m_MainThreadID = boost::this_thread::get_id();
- m_EQ.SetOwner(m_MainThreadID);
-}
-
/**
* Displays a message that tells users what to do when they encounter a bug.
*/
SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE);
#endif /* _WIN32 */
- DynamicObject::BeginTx();
+ DynamicObject::NewTx();
result = Main();
- DynamicObject::FinishTx();
+ DynamicObject::NewTx();
DynamicObject::DeactivateObjects();
return result;
}
/**
- * Returns the global mutex for the main thread.
+ * Returns the global mutex.
*
* @returns The mutex.
*/
-boost::mutex& Application::GetMutex(void)
+recursive_mutex& Application::GetMutex(void)
{
return m_Mutex;
}
static void SetDebugging(bool debug);
static bool IsDebugging(void);
- static bool IsMainThread(void);
- static void SetMainThread(void);
-
void UpdatePidFile(const String& filename);
void ClosePidFile(void);
static String GetPkgDataDir(void);
static void SetPkgDataDir(const String& path);
- static bool ProcessEvents(void);
-
- static boost::mutex& GetMutex(void);
+ static recursive_mutex& GetMutex(void);
static EventQueue& GetEQ(void);
void RunEventLoop(void) const;
private:
- static boost::mutex m_Mutex; /**< The main thread mutex. */
+ static recursive_mutex m_Mutex; /**< The global mutex. */
static Application *m_Instance; /**< The application instance. */
static bool m_ShuttingDown; /**< Whether the application is in the process of
static void ExceptionHandler(void);
static void TimeWatchThreadProc(void);
+ static void NewTxTimerHandler(void);
+#ifdef _DEBUG
+ static void ProfileTimerHandler(void)
+#endif /* _DEBUG */
+ static void ShutdownTimerHandler(void);
};
}
******************************************************************************/
#ifndef ASYNCTASK_H
-#define ASYNCTASK_H
+#define ASYNCTASK_H
namespace icinga
{
*/
bool IsFinished(void) const
{
+ boost::mutex::scoped_lock lock(m_Mutex);
return m_Finished;
}
*/
void Wait(void)
{
- Utility::WaitUntil(boost::bind(&AsyncTask<TClass, TResult>::IsFinished, this));
+ boost::mutex::scoped_lock lock(m_Mutex);
+ while (!m_Finished)
+ m_CV.wait(lock);
}
protected:
*/
void FinishInternal(void)
{
- assert(!m_Finished);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ assert(!m_Finished);
- m_Finished = true;
+ m_Finished = true;
+
+ m_CV.notify_all();
+ }
if (!m_CompletionCallback.empty()) {
m_CompletionCallback(GetSelf());
}
}
+ mutable boost::mutex m_Mutex;
+ boost::condition_variable m_CV;
CompletionCallback m_CompletionCallback; /**< The completion callback. */
TResult m_Result; /**< The task's result. */
boost::exception_ptr m_Exception; /**< The task's exception. */
Component::Component(const Dictionary::Ptr& properties)
: DynamicObject(properties)
{
- assert(Application::IsMainThread());
-
if (!IsLocal())
BOOST_THROW_EXCEPTION(runtime_error("Component objects must be local."));
void Close(void);
- boost::signal<void (const Connection::Ptr&)> OnClosed;
+ signals2::signal<void (const Connection::Ptr&)> OnClosed;
protected:
virtual void ProcessData(void) = 0;
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject *> DynamicObject::m_ModifiedObjects;
+boost::mutex DynamicObject::m_ModifiedObjectsMutex;
-boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
-boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
-boost::signal<void (const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
+signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
+signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
+signals2::signal<void (double, const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
: m_ConfigTx(0)
ApplyUpdate(serializedObject, Attribute_Config);
}
+/*
+ * @threadsafety Always.
+ */
DynamicObject::~DynamicObject(void)
{
+ boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
m_ModifiedObjects.erase(this);
}
if (tt.first->second.Type & Attribute_Config)
m_ConfigTx = tx;
- m_ModifiedObjects.insert(this);
+ {
+ boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+ m_ModifiedObjects.insert(this);
+ }
/* Use insert() rather than [] so we don't overwrite
* an existing oldValue if the attribute was previously
void DynamicObject::Register(void)
{
- assert(Application::IsMainThread());
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
void DynamicObject::Unregister(void)
{
- assert(Application::IsMainThread());
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
return task;
}
+/*
+ * @threadsafety Always.
+ */
void DynamicObject::DumpObjects(const String& filename)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
String tempFilename = filename + ".tmp";
BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
}
+/*
+ * @threadsafety Always.
+ */
void DynamicObject::RestoreObjects(const String& filename)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
std::fstream fp;
Logger::Write(LogDebug, "base", msgbuf.str());
}
+/*
+ * @threadsafety Always.
+ */
void DynamicObject::DeactivateObjects(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
DynamicType::TypeMap::iterator tt;
for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
DynamicType::NameMap::iterator nt;
}
}
+/*
+ * @threadsafety Always.
+ */
double DynamicObject::GetCurrentTx(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
assert(m_CurrentTx != 0);
return m_CurrentTx;
}
-void DynamicObject::BeginTx(void)
+/*
+ * @threadsafety Always.
+ */
+void DynamicObject::NewTx(void)
{
- m_CurrentTx = Utility::GetTime();
-}
+ set<DynamicObject *> objects;
-void DynamicObject::FinishTx(void)
-{
- BOOST_FOREACH(DynamicObject *object, m_ModifiedObjects) {
- object->SendLocalUpdateEvents();
+ {
+ boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+
+ /* Some objects may accidentally bleed into the next transaction because
+ * we're not holding the global mutex while "stealing" the modified objects,
+ * but that's entirely ok. */
+ m_ModifiedObjects.swap(objects);
}
- OnTransactionClosing(m_ModifiedObjects);
- m_ModifiedObjects.clear();
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
- m_CurrentTx = 0;
-}
+ BOOST_FOREACH(DynamicObject *object, objects) {
+ object->SendLocalUpdateEvents();
+ }
-void DynamicObject::FlushTx(void)
-{
- FinishTx();
- BeginTx();
+ OnTransactionClosing(m_CurrentTx, objects);
+ m_CurrentTx = Utility::GetTime();
}
void DynamicObject::OnInitCompleted(void)
void DynamicObject::OnAttributeChanged(const String&, const Value&)
{ }
+/*
+ * @threadsafety Always.
+ */
DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
DynamicType::Ptr dtype = DynamicType::GetByName(type);
return dtype->GetObject(name);
}
void ClearAttributesByType(DynamicAttributeType type);
- static boost::signal<void (const DynamicObject::Ptr&)> OnRegistered;
- static boost::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
- static boost::signal<void (const set<DynamicObject *>&)> OnTransactionClosing;
+ static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
+ static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
+ static signals2::signal<void (double, const set<DynamicObject *>&)> OnTransactionClosing;
ScriptTask::Ptr InvokeMethod(const String& method,
const vector<Value>& arguments, ScriptTask::CompletionCallback callback);
static void DeactivateObjects(void);
static double GetCurrentTx(void);
- static void BeginTx(void);
- static void FinishTx(void);
- static void FlushTx(void);
+ static void NewTx(void);
protected:
virtual void OnInitCompleted(void);
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject *> m_ModifiedObjects;
+ static boost::mutex m_ModifiedObjectsMutex;
friend class DynamicType; /* for OnInitCompleted. */
};
using namespace icinga;
+boost::mutex DynamicType::m_Mutex;
+
DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
: m_Name(name), m_ObjectFactory(factory)
{ }
+/**
+ * @threadsafety Always.
+ */
DynamicType::Ptr DynamicType::GetByName(const String& name)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
if (tt == GetTypes().end())
return tt->second;
}
+/**
+ * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ */
DynamicType::TypeMap& DynamicType::GetTypes(void)
{
static DynamicType::TypeMap types;
return types;
}
+/**
+ * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ */
DynamicType::NameMap& DynamicType::GetObjects(void)
{
return m_Objects;
return nt->second;
}
+/**
+ * @threadsafety Always.
+ */
void DynamicType::RegisterType(const DynamicType::Ptr& type)
{
- if (GetByName(type->GetName()))
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
+
+ if (tt != GetTypes().end())
BOOST_THROW_EXCEPTION(runtime_error("Cannot register class for type '" +
type->GetName() + "': Objects of this type already exist."));
return obj;
}
+/**
+ * @threadsafety Always.
+ */
bool DynamicType::TypeExists(const String& name)
{
return (GetByName(name));
static void RegisterType(const DynamicType::Ptr& type);
static bool TypeExists(const String& name);
-
+
DynamicObject::Ptr CreateObject(const Dictionary::Ptr& serializedUpdate) const;
DynamicObject::Ptr GetObject(const String& name) const;
void RegisterObject(const DynamicObject::Ptr& object);
void UnregisterObject(const DynamicObject::Ptr& object);
- static TypeMap& GetTypes(void);
- NameMap& GetObjects(void);
+ /* TODO(thread) make private */ static TypeMap& GetTypes(void);
+ /* TODO(thread) make private */ NameMap& GetObjects(void);
void AddAttribute(const String& name, DynamicAttributeType type);
void RemoveAttribute(const String& name);
void AddAttributes(const AttributeDescription *attributes, int attributeCount);
private:
+ static boost::mutex m_Mutex;
String m_Name;
ObjectFactory m_ObjectFactory;
map<String, DynamicAttributeType> m_Attributes;
using namespace icinga;
+/**
+ * @threadsafety Always.
+ */
EventQueue::EventQueue(void)
: m_Stopped(false)
{ }
-boost::thread::id EventQueue::GetOwner(void) const
-{
- return m_Owner;
-}
-
-void EventQueue::SetOwner(boost::thread::id owner)
+/**
+ * @threadsafety Always.
+ */
+EventQueue::~EventQueue(void)
{
- m_Owner = owner;
+ Stop();
}
+/**
+ * @threadsafety Always.
+ */
void EventQueue::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
- m_EventAvailable.notify_all();
+ m_CV.notify_all();
}
/**
- * Waits for events using the specified timeout value and processes
- * them.
+ * Spawns worker threads and waits for them to complete.
*
- * @param mtx The mutex that should be unlocked while waiting. Caller
- * must have this mutex locked.
- * @param timeout The wait timeout.
- * @returns false if the queue has been stopped, true otherwise.
+ * @threadsafety Always.
*/
-bool EventQueue::ProcessEvents(boost::mutex& mtx, millisec timeout)
+void EventQueue::Run(void)
{
- vector<Callback> events;
+ thread_group threads;
- mtx.unlock();
+ int cpus = thread::hardware_concurrency();
- {
- boost::mutex::scoped_lock lock(m_Mutex);
+ if (cpus == 0)
+ cpus = 4;
- while (m_Events.empty() && !m_Stopped) {
- if (!m_EventAvailable.timed_wait(lock, timeout)) {
- mtx.lock();
+ for (int i = 0; i < cpus * 4; i++)
+ threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
- return !m_Stopped;
- }
- }
+ threads.join_all();
+}
- events.swap(m_Events);
- }
+/**
+ * Waits for events and processes them.
+ *
+ * @threadsafety Always.
+ */
+void EventQueue::QueueThreadProc(void)
+{
+ while (!m_Stopped) {
+ vector<Callback> events;
- mtx.lock();
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ while (m_Events.empty() && !m_Stopped)
+ m_CV.wait(lock);
+
+ events.swap(m_Events);
+ }
- BOOST_FOREACH(const Callback& ev, events) {
- double st = Utility::GetTime();
+ BOOST_FOREACH(const Callback& ev, events) {
+ double st = Utility::GetTime();
- ev();
+ ev();
- double et = Utility::GetTime();
+ double et = Utility::GetTime();
- if (et - st > 1.0) {
- stringstream msgbuf;
- msgbuf << "Event call took " << et - st << " seconds.";
- Logger::Write(LogWarning, "base", msgbuf.str());
+ if (et - st > 1.0) {
+ stringstream msgbuf;
+ msgbuf << "Event call took " << et - st << " seconds.";
+ Logger::Write(LogWarning, "base", msgbuf.str());
+ }
}
}
-
- return !m_Stopped;
}
/**
- * Appends an event to the event queue. Events will be processed in FIFO
- * order on the main thread.
+ * Appends an event to the event queue. Events will be processed in FIFO order.
*
* @param callback The callback function for the event.
+ * @threadsafety Always.
*/
void EventQueue::Post(const EventQueue::Callback& callback)
{
- if (boost::this_thread::get_id() == m_Owner) {
- callback();
- return;
- }
-
- {
- boost::mutex::scoped_lock lock(m_Mutex);
- m_Events.push_back(callback);
- m_EventAvailable.notify_all();
- }
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_Events.push_back(callback);
+ m_CV.notify_all();
}
typedef function<void ()> Callback;
EventQueue(void);
+ ~EventQueue(void);
- bool ProcessEvents(boost::mutex& mtx, millisec timeout = boost::posix_time::milliseconds(30000));
+ void Run(void);
void Post(const Callback& callback);
void Stop(void);
- boost::thread::id GetOwner(void) const;
- void SetOwner(boost::thread::id owner);
-
- boost::mutex& GetMutex(void);
-
private:
boost::thread::id m_Owner;
boost::mutex m_Mutex;
+ condition_variable m_CV;
+
bool m_Stopped;
vector<Callback> m_Events;
- condition_variable m_EventAvailable;
+
+ void QueueThreadProc(void);
};
}
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
-#include <boost/signal.hpp>
+#include <boost/signals2.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/compare.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/program_options.hpp>
#include <boost/exception/diagnostic_information.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
using boost::shared_ptr;
using boost::weak_ptr;
using boost::function;
using boost::thread;
using boost::thread_group;
+using boost::recursive_mutex;
using boost::condition_variable;
using boost::system_time;
using boost::posix_time::millisec;
using boost::rethrow_exception;
using boost::current_exception;
using boost::diagnostic_information;
+using boost::multi_index_container;
+using boost::multi_index::indexed_by;
+using boost::multi_index::identity;
+using boost::multi_index::ordered_unique;
+using boost::multi_index::ordered_non_unique;
+using boost::multi_index::nth_index;
namespace tuples = boost::tuples;
+namespace signals2 = boost::signals2;
#if defined(__APPLE__) && defined(__MACH__)
-# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <openssl/bio.h>
entry.Facility = facility;
entry.Message = message;
- Application::GetEQ().Post(boost::bind(&Logger::ForwardLogEntry, entry));
+ {
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+ ForwardLogEntry(entry);
+ }
}
/**
{
return m_Config->GetSelf();
}
-
* @returns true if a complete String was read from the IOQueue, false otherwise.
* @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netString-c/blob/master/netString.c
+ * @threadsafety Always.
*/
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
{
*
* @param stream The stream.
* @param str The String that is to be written.
+ * @threadsafety Always.
*/
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
{
* Default constructor for the Object class.
*/
Object::Object(void)
-{
-#ifdef _DEBUG
- boost::mutex::scoped_lock lock(*GetMutex());
- GetAliveObjects()->insert(this);
-#endif /* _DEBUG */
-}
+{ }
/**
* Destructor for the Object class.
*/
Object::~Object(void)
-{
-#ifdef _DEBUG
- boost::mutex::scoped_lock lock(*GetMutex());
- GetAliveObjects()->erase(this);
-#endif /* _DEBUG */
-}
-
-/**
- * Temporarily holds onto a reference for an object. This can
- * be used to safely clear the last reference to an object
- * in an event handler.
- */
-void Object::Hold(void)
-{
- boost::mutex::scoped_lock lock(*GetMutex());
- GetHeldObjects().push_back(GetSelf());
-}
-
-/**
- * Clears all temporarily held objects.
- */
-void Object::ClearHeldObjects(void)
-{
- boost::mutex::scoped_lock lock(*GetMutex());
- GetHeldObjects().clear();
-}
+{ }
/**
* Returns a reference-counted pointer to this object.
return Object::SharedPtrHolder(shared_from_this());
}
-#ifdef _DEBUG
/**
- * Retrieves the number of currently alive objects.
+ * Returns the mutex that must be held while calling non-static methods
+ * which have not been explicitly marked as thread-safe.
*
- * @returns The number of alive objects.
+ * @returns The object's mutex.
+ * @threadsafety Always.
*/
-int Object::GetAliveObjectsCount(void)
+recursive_mutex& Object::GetMutex(void)
{
- boost::mutex::scoped_lock lock(*GetMutex());
- return GetAliveObjects()->size();
+ return m_Mutex;
}
-
-/**
- * Dumps a memory histogram to the "dictionaries.dump" file.
- */
-void Object::PrintMemoryProfile(void)
-{
- map<String, int> types;
-
- ofstream dictfp("dictionaries.dump.tmp");
-
- {
- boost::mutex::scoped_lock lock(*GetMutex());
- set<Object *>::iterator it;
- BOOST_FOREACH(Object *obj, *GetAliveObjects()) {
- pair<map<String, int>::iterator, bool> tt;
- tt = types.insert(make_pair(Utility::GetTypeName(typeid(*obj)), 1));
- if (!tt.second)
- tt.first->second++;
-
- if (typeid(*obj) == typeid(Dictionary)) {
- Dictionary::Ptr dict = obj->GetSelf();
- dictfp << Value(dict).Serialize() << std::endl;
- }
- }
- }
-
-#ifdef _WIN32
- _unlink("dictionaries.dump");
-#endif /* _WIN32 */
-
- dictfp.close();
- if (rename("dictionaries.dump.tmp", "dictionaries.dump") < 0)
- BOOST_THROW_EXCEPTION(PosixException("rename() failed", errno));
-
- String type;
- int count;
- BOOST_FOREACH(tie(type, count), types) {
- std::cerr << type << ": " << count << std::endl;
- }
-}
-
-/**
- * Returns currently active objects.
- *
- * @returns currently active objects
- */
-set<Object *> *Object::GetAliveObjects(void)
-{
- static set<Object *> *aliveObjects = new set<Object *>();
- return aliveObjects;
-}
-#endif /* _DEBUG */
-
-/**
- * Returns the mutex used for accessing static members.
- *
- * @returns a mutex
- */
-boost::mutex *Object::GetMutex(void)
-{
- static boost::mutex *mutex = new boost::mutex();
- return mutex;
-}
-
-/**
- * Returns currently held objects. The caller must be
- * holding the mutex returned by GetMutex().
- *
- * @returns currently held objects
- */
-vector<Object::Ptr>& Object::GetHeldObjects(void)
-{
- static vector<Object::Ptr> heldObjects;
- return heldObjects;
-}
-
-
*
* @ingroup base
*/
-class I2_BASE_API Object : public enable_shared_from_this<Object>, public boost::signals::trackable
+class I2_BASE_API Object : public enable_shared_from_this<Object>
{
public:
typedef shared_ptr<Object> Ptr;
typedef weak_ptr<Object> WeakPtr;
- void Hold(void);
- static void ClearHeldObjects(void);
-
/**
* Holds a shared pointer and provides support for implicit upcasts.
*
SharedPtrHolder GetSelf(void);
-#ifdef _DEBUG
- static int GetAliveObjectsCount(void);
- static void PrintMemoryProfile(void);
-#endif /* _DEBUG */
+ recursive_mutex& GetMutex(void);
protected:
Object(void);
Object(const Object& other);
Object& operator=(const Object& rhs);
- static boost::mutex *GetMutex(void);
- static set<Object *> *GetAliveObjects(void);
- static vector<Object::Ptr>& GetHeldObjects(void);
+ recursive_mutex m_Mutex;
+};
+
+/**
+ * A scoped lock for Objects.
+ */
+struct ObjectLock {
+public:
+ ObjectLock(const Object::Ptr& object)
+ : m_Lock(object->GetMutex())
+ { }
+
+ ObjectLock(Object *object)
+ : m_Lock(object->GetMutex())
+ { }
+
+private:
+ recursive_mutex::scoped_lock m_Lock;
};
/**
int Process::m_TaskFd;
extern char **environ;
-void Process::CreateWorkers(void)
+void Process::Initialize(void)
{
int fds[2];
using namespace icinga;
-void Process::CreateWorkers(void)
+void Process::Initialize(void)
{
// TODO: implement
}
using namespace icinga;
-bool Process::m_WorkersCreated = false;
+boost::once_flag Process::m_ThreadOnce;
boost::mutex Process::m_Mutex;
deque<Process::Ptr> Process::m_Tasks;
Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
: AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
{
- assert(Application::IsMainThread());
-
- if (!m_WorkersCreated) {
- CreateWorkers();
-
- m_WorkersCreated = true;
- }
+ boost::call_once(&Process::Initialize, m_ThreadOnce);
#ifndef _WIN32
m_FD = -1;
static vector<String> SplitCommand(const Value& command);
private:
- static bool m_WorkersCreated;
-
vector<String> m_Arguments;
Dictionary::Ptr m_ExtraEnvironment;
static int m_TaskFd;
#endif /* _WIN32 */
- static void CreateWorkers(void);
static void NotifyWorker(void);
void SpawnTask(void);
void InitTask(void);
bool RunTask(void);
+
+ static boost::once_flag m_ThreadOnce;
+ static void Initialize(void);
};
}
: DynamicObject(properties)
{ }
-Script::~Script(void)
-{
- if (m_Interpreter)
- m_Interpreter->Stop();
-}
-
void Script::OnInitCompleted(void)
{
SpawnInterpreter();
{
Logger::Write(LogInformation, "base", "Reloading script '" + GetName() + "'");
- if (m_Interpreter)
- m_Interpreter->Stop();
-
ScriptLanguage::Ptr language = ScriptLanguage::GetByName(GetLanguage());
m_Interpreter = language->CreateInterpreter(GetSelf());
- m_Interpreter->Start();
}
typedef weak_ptr<Script> WeakPtr;
Script(const Dictionary::Ptr& properties);
- ~Script(void);
String GetLanguage(void) const;
String GetCode(void) const;
using namespace icinga;
-boost::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
-boost::signal<void (const String&)> ScriptFunction::OnUnregistered;
+signals2::signal<void (const String&, const ScriptFunction::Ptr&)> ScriptFunction::OnRegistered;
+signals2::signal<void (const String&)> ScriptFunction::OnUnregistered;
ScriptFunction::ScriptFunction(const Callback& function)
: m_Callback(function)
void ScriptFunction::Register(const String& name, const ScriptFunction::Ptr& function)
{
GetFunctions()[name] = function;
- Application::GetEQ().Post(boost::bind(boost::ref(OnRegistered), name, function));
+ OnRegistered(name, function);
}
void ScriptFunction::Unregister(const String& name)
{
GetFunctions().erase(name);
- Application::GetEQ().Post(boost::bind(boost::ref(OnUnregistered), name));
+ OnUnregistered(name);
}
ScriptFunction::Ptr ScriptFunction::GetByName(const String& name)
void Invoke(const shared_ptr<ScriptTask>& task, const vector<Value>& arguments);
- static map<String, ScriptFunction::Ptr>& GetFunctions(void);
+ /* TODO(thread) make private */ static map<String, ScriptFunction::Ptr>& GetFunctions(void);
- static boost::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
- static boost::signal<void (const String&)> OnUnregistered;
+ static signals2::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
+ static signals2::signal<void (const String&)> OnUnregistered;
private:
Callback m_Callback;
ScriptInterpreter::~ScriptInterpreter(void)
{
- Stop();
-}
-
-void ScriptInterpreter::Start(void)
-{
- /* We can't start the thread in the constructor because
- * the worker thread might end up calling one of the virtual
- * methods before the object is fully constructed. */
-
- m_Thread = boost::thread(&ScriptInterpreter::ThreadWorkerProc, this);
-}
-
-void ScriptInterpreter::Stop(void)
-{
- assert(Application::IsMainThread());
-
- m_EQ.Stop();
-
BOOST_FOREACH(const String& function, m_SubscribedFunctions) {
ScriptFunction::Unregister(function);
}
-
- m_Thread.join();
-}
-
-void ScriptInterpreter::ThreadWorkerProc(void)
-{
- m_EQ.SetOwner(boost::this_thread::get_id());
-
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- while (m_EQ.ProcessEvents(m_Mutex))
- ; /* empty loop */
- }
-}
-
-void ScriptInterpreter::ScriptFunctionThunk(const ScriptTask::Ptr& task,
- const String& function, const vector<Value>& arguments)
-{
- m_EQ.Post(boost::bind(&ScriptInterpreter::ProcessCall, this,
- task, function, arguments));
}
void ScriptInterpreter::SubscribeFunction(const String& name)
{
m_SubscribedFunctions.insert(name);
- ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ScriptFunctionThunk, this, _1, name, _2));
+ ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2));
ScriptFunction::Register(name, sf);
}
~ScriptInterpreter(void);
- void Start(void);
- void Stop(void);
-
protected:
ScriptInterpreter(const Script::Ptr& script);
void UnsubscribeFunction(const String& name);
private:
- boost::mutex m_Mutex;
- EventQueue m_EQ;
set<String> m_SubscribedFunctions;
- boost::thread m_Thread;
-
- void ThreadWorkerProc(void);
-
- void ScriptFunctionThunk(const ScriptTask::Ptr& task, const String& function,
- const vector<Value>& arguments);
};
}
ScriptLanguage::ScriptLanguage(void)
{ }
+/**
+ * @threadsafety Always.
+ */
void ScriptLanguage::Register(const String& name, const ScriptLanguage::Ptr& language)
{
+ boost::mutex::scoped_lock lock(GetMutex());
+
GetLanguages()[name] = language;
}
+/**
+ * @threadsafety Always.
+ */
void ScriptLanguage::Unregister(const String& name)
{
+ boost::mutex::scoped_lock lock(GetMutex());
+
GetLanguages().erase(name);
}
+/**
+ * @threadsafety Always.
+ */
ScriptLanguage::Ptr ScriptLanguage::GetByName(const String& name)
{
+ boost::mutex::scoped_lock lock(GetMutex());
+
map<String, ScriptLanguage::Ptr>::iterator it;
it = GetLanguages().find(name);
return it->second;
}
+boost::mutex& ScriptLanguage::GetMutex(void)
+{
+ static boost::mutex mutex;
+ return mutex;
+}
+
map<String, ScriptLanguage::Ptr>& ScriptLanguage::GetLanguages(void)
{
static map<String, ScriptLanguage::Ptr> languages;
ScriptLanguage(void);
private:
+ static boost::mutex& GetMutex(void);
static map<String, ScriptLanguage::Ptr>& GetLanguages(void);
};
}
if (new_data)
- Application::GetEQ().Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+ OnDataAvailable(GetSelf());
}
void Socket::HandleWritableServer(void)
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->SetFD(fd);
- Application::GetEQ().Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
+ OnNewClient(GetSelf(), client);
}
/**
void Listen(void);
- boost::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
+ signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
protected:
Socket(void);
m_Connected = connected;
if (m_Connected)
- Application::GetEQ().Post(boost::bind(boost::ref(OnConnected), GetSelf()));
+ OnConnected(GetSelf());
else
- Application::GetEQ().Post(boost::bind(boost::ref(OnClosed), GetSelf()));
+ OnClosed(GetSelf());
}
/**
boost::exception_ptr GetException(void);
void CheckException(void);
- boost::signal<void (const Stream::Ptr&)> OnConnected;
- boost::signal<void (const Stream::Ptr&)> OnDataAvailable;
- boost::signal<void (const Stream::Ptr&)> OnClosed;
+ signals2::signal<void (const Stream::Ptr&)> OnConnected;
+ signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
+ signals2::signal<void (const Stream::Ptr&)> OnClosed;
protected:
void SetConnected(bool connected);
using namespace icinga;
-Timer::CollectionType Timer::m_Timers;
+Timer::TimerSet Timer::m_Timers;
+boost::mutex Timer::m_Mutex;
+boost::condition_variable Timer::m_CV;
+boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT;
+
+/**
+ * Extracts the next timestamp from a Timer.
+ *
+ * @param wtimer Weak pointer to the timer.
+ * @returns The next timestamp
+ * @threadsafety Caller must hold Timer::m_Mutex.
+ */
+double TimerNextExtractor::operator()(const Timer::WeakPtr& wtimer)
+{
+ Timer::Ptr timer = wtimer.lock();
+
+ if (!timer)
+ return 0;
+
+ return timer->m_Next;
+}
/**
* Constructor for the Timer class.
+ *
+ * @threadsafety Always.
*/
Timer::Timer(void)
- : m_Interval(0)
+ : m_Interval(0), m_Next(0)
{ }
/**
- * Calls expired timers and returned when the next wake-up should happen.
+ * Initializes the timer sub-system.
*
- * @returns Time when the next timer is due.
+ * @threadsafety Always.
*/
-double Timer::ProcessTimers(void)
+void Timer::Initialize(void)
{
- double wakeup = 30; /* wake up at least once after this many seconds */
-
- double st = Utility::GetTime();
-
- Timer::CollectionType::iterator prev, i;
- for (i = m_Timers.begin(); i != m_Timers.end(); ) {
- Timer::Ptr timer = i->lock();
-
- prev = i;
- i++;
-
- if (!timer) {
- m_Timers.erase(prev);
- continue;
- }
-
- double now = Utility::GetTime();
-
- if (timer->m_Next <= now) {
- timer->Call();
-
- /* time may have changed depending on how long the
- * timer call took - we need to fetch the current time */
- now = Utility::GetTime();
-
- double next = now + timer->GetInterval();
-
- if (timer->m_Next <= now || next < timer->m_Next)
- timer->Reschedule(next);
- }
-
- assert(timer->m_Next > now);
-
- if (timer->m_Next - now < wakeup)
- wakeup = timer->m_Next - now;
- }
-
- assert(wakeup > 0);
-
- double et = Utility::GetTime();
-
- if (et - st > 0.01) {
- stringstream msgbuf;
- msgbuf << "Timers took " << et - st << " seconds";
- Logger::Write(LogDebug, "base", msgbuf.str());
- }
-
- return wakeup;
+ thread worker(boost::bind(&Timer::TimerThreadProc));
+ worker.detach();
}
/**
- * Calls this timer. Note: the timer delegate must not call
- * Disable() on any other timers than the timer that originally
- * invoked the delegate.
+ * Calls this timer.
+ *
+ * @threadsafety Always.
*/
void Timer::Call(void)
{
msgbuf << "Timer call took " << et - st << " seconds.";
Logger::Write(LogWarning, "base", msgbuf.str());
}
+
+ Reschedule();
}
/**
* Sets the interval for this timer.
*
* @param interval The new interval.
+ * @threadsafety Always.
*/
void Timer::SetInterval(double interval)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
m_Interval = interval;
}
* Retrieves the interval for this timer.
*
* @returns The interval.
+ * @threadsafety Always.
*/
double Timer::GetInterval(void) const
{
+ boost::mutex::scoped_lock lock(m_Mutex);
return m_Interval;
}
/**
* Registers the timer and starts processing events for it.
+ *
+ * @threadsafety Always.
*/
void Timer::Start(void)
{
- assert(Application::IsMainThread());
-
- Stop();
-
- m_Timers.push_back(GetSelf());
+ boost::call_once(&Timer::Initialize, m_ThreadOnce);
- Reschedule(Utility::GetTime() + m_Interval);
+ Reschedule();
}
/**
* Unregisters the timer and stops processing events for it.
+ *
+ * @threadsafety Always.
*/
void Timer::Stop(void)
{
- assert(Application::IsMainThread());
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_Timers.erase(GetSelf());
- m_Timers.remove_if(WeakPtrEqual<Timer>(this));
+ /* Notify the worker thread that we've disabled a timer. */
+ m_CV.notify_all();
}
/**
* Reschedules this timer.
*
- * @param next The time when this timer should be called again.
+ * @param next The time when this timer should be called again. Use -1 to let
+ * the timer figure out a suitable time based on the interval.
+ * @threadsafety Always.
*/
void Timer::Reschedule(double next)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ if (next < 0) {
+ double now = Utility::GetTime();
+ next = m_Next + m_Interval;
+
+ if (next < now)
+ next = now + m_Interval;
+ else
+ next = next;
+ }
+
m_Next = next;
+
+ /* Remove and re-add the timer to update the index. */
+ m_Timers.erase(GetSelf());
+ m_Timers.insert(GetSelf());
+
+ /* Notify the worker that we've rescheduled a timer. */
+ m_CV.notify_all();
+}
+
+/**
+ * Retrieves when the timer is next due.
+ *
+ * @returns The timestamp.
+ * @threadsafety Always.
+ */
+double Timer::GetNext(void) const
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+ return m_Next;
}
/**
* next scheduled timestamp.
*
* @param adjustment The adjustment.
+ * @threadsafety Always.
*/
void Timer::AdjustTimers(double adjustment)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
double now = Utility::GetTime();
- Timer::CollectionType::iterator i;
- for (i = m_Timers.begin(); i != m_Timers.end(); i++) {
- Timer::Ptr timer = i->lock();
+ typedef nth_index<TimerSet, 1>::type TimerView;
+ TimerView& idx = boost::get<1>(m_Timers);
+
+ TimerView::iterator it;
+ for (it = idx.begin(); it != idx.end(); it++) {
+ Timer::Ptr timer = it->lock();
if (abs(now - (timer->m_Next + adjustment)) <
- abs(now - timer->m_Next))
+ abs(now - timer->m_Next)) {
timer->m_Next += adjustment;
+ m_Timers.erase(timer);
+ m_Timers.insert(timer);
+ }
+ }
+
+ /* Notify the worker that we've rescheduled some timers. */
+ m_CV.notify_all();
+}
+
+/**
+ * Worker thread proc for Timer objects.
+ *
+ * @threadsafety Always.
+ */
+void Timer::TimerThreadProc(void)
+{
+ for (;;) {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ typedef nth_index<TimerSet, 1>::type NextTimerView;
+ NextTimerView& idx = boost::get<1>(m_Timers);
+
+ /* Wait until there is at least one timer. */
+ while (idx.empty())
+ m_CV.wait(lock);
+
+ NextTimerView::iterator it = idx.begin();
+ Timer::Ptr timer = it->lock();
+
+ if (!timer) {
+ /* Remove the timer from the list if it's not alive anymore. */
+ idx.erase(it);
+ continue;
+ }
+
+ double wait = timer->m_Next - Utility::GetTime();
+
+ if (wait > 0) {
+ /* Make sure the timer we just examined can be destroyed while we're waiting. */
+ timer.reset();
+
+ /* Wait for the next timer. */
+ m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
+
+ continue;
+ }
+
+ /* Remove the timer from the list so it doesn't get called again
+ * until the current call is completed. */
+ m_Timers.erase(timer);
+
+ /* Asynchronously call the timer. */
+ Application::GetEQ().Post(boost::bind(&Timer::Call, timer));
}
}
namespace icinga {
+class Timer;
+
+/**
+ * @ingroup base
+ */
+struct TimerNextExtractor
+{
+ typedef double result_type;
+
+ double operator()(const weak_ptr<Timer>& wtimer);
+};
+
/**
* A timer that periodically triggers an event.
*
void SetInterval(double interval);
double GetInterval(void) const;
- static double ProcessTimers(void);
static void AdjustTimers(double adjustment);
void Start(void);
void Stop(void);
- void Reschedule(double next);
+ void Reschedule(double next = -1);
+ double GetNext(void) const;
- boost::signal<void(const Timer::Ptr&)> OnTimerExpired;
+ signals2::signal<void(const Timer::Ptr&)> OnTimerExpired;
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
- static Timer::CollectionType m_Timers;
+ typedef multi_index_container<
+ Timer::WeakPtr,
+ indexed_by<
+ ordered_unique<identity<Timer::WeakPtr> >,
+ ordered_non_unique<TimerNextExtractor>
+ >
+ > TimerSet;
+
+ static boost::mutex m_Mutex;
+ static boost::condition_variable m_CV;
+ static TimerSet m_Timers;
void Call(void);
+
+ static boost::once_flag m_ThreadOnce;
+ static void Initialize(void);
+
+ static void TimerThreadProc(void);
+
+ friend struct TimerNextExtractor;
};
}
*/
void Utility::Sleep(double timeout)
{
- if (Application::IsMainThread())
- Application::GetMutex().unlock();
-
#ifndef _WIN32
usleep(timeout * 1000 * 1000);
#else /* _WIN32 */
::Sleep(timeout * 1000);
#endif /* _WIN32 */
-
- if (Application::IsMainThread())
- Application::GetMutex().lock();
}
/**
#endif /* _WIN32 */
}
-/**
- * Waits until the given predicate is true. Executes events while waiting.
- *
- * @param predicate The predicate.
- */
-void Utility::WaitUntil(const function<bool (void)>& predicate)
-{
- while (!predicate())
- Application::ProcessEvents();
-}
-
#ifndef _WIN32
void Utility::SetNonBlocking(int fd)
{
static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
- static void WaitUntil(const function<bool (void)>& predicate);
-
static
#ifdef _WIN32
HMODULE
using namespace icinga;
+boost::mutex ConfigItem::m_Mutex;
ConfigItem::ItemMap ConfigItem::m_Items;
-boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
-boost::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
+signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnCommitted;
+signals2::signal<void (const ConfigItem::Ptr&)> ConfigItem::OnRemoved;
/**
* Constructor for the ConfigItem class.
* @param type The type of the ConfigItem that is to be looked up.
* @param name The name of the ConfigItem that is to be looked up.
* @returns The configuration item.
+ * @threadsafety Always.
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
- ConfigItem::ItemMap::iterator it;
+ {
+ recursive_mutex::scoped_lock lockg(Application::GetMutex());
- ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+ ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
- if (context) {
- ConfigItem::Ptr item = context->GetItem(type, name);
+ if (context) {
+ ConfigItem::Ptr item = context->GetItem(type, name);
- if (item)
- return item;
+ if (item)
+ return item;
- /* ignore already active objects while we're in the compiler
- * context and linking to existing items is disabled. */
- if ((context->GetFlags() & CompilerLinkExisting) == 0)
- return ConfigItem::Ptr();
+ /* ignore already active objects while we're in the compiler
+ * context and linking to existing items is disabled. */
+ if ((context->GetFlags() & CompilerLinkExisting) == 0)
+ return ConfigItem::Ptr();
+ }
}
- it = m_Items.find(make_pair(type, name));
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
- if (it != m_Items.end())
- return it->second;
+ ConfigItem::ItemMap::iterator it;
+
+ it = m_Items.find(make_pair(type, name));
+
+ if (it != m_Items.end())
+ return it->second;
+ }
return ConfigItem::Ptr();
}
fp << "}" << "\n";
}
+/**
+ * @threadsafety Caller must hold the global mutex.
+ */
void ConfigItem::UnloadUnit(const String& unit)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
Logger::Write(LogInformation, "config", "Unloading config items from compilation unit '" + unit + "'");
vector<ConfigItem::Ptr> obsoleteItems;
static void UnloadUnit(const String& unit);
- static boost::signal<void (const ConfigItem::Ptr&)> OnCommitted;
- static boost::signal<void (const ConfigItem::Ptr&)> OnRemoved;
+ static signals2::signal<void (const ConfigItem::Ptr&)> OnCommitted;
+ static signals2::signal<void (const ConfigItem::Ptr&)> OnRemoved;
private:
void InternalLink(const Dictionary::Ptr& dictionary) const;
set<ConfigItem::WeakPtr> m_ChildObjects; /**< Instantiated items
* that inherit from this item */
+ static boost::mutex m_Mutex;
+
typedef map<pair<String, String>, ConfigItem::Ptr> ItemMap;
static ItemMap m_Items; /**< All registered configuration items. */
};
ValidateDictionary(attrs, ruleLists, locations);
}
+/**
+ * @threadsafety Always.
+ */
String ConfigType::LocationToString(const vector<String>& locations)
{
bool first = true;
return stack;
}
+/**
+ * @threadsafety Always.
+ */
void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
const vector<TypeRuleList::Ptr>& ruleLists, vector<String>& locations)
{
locations.pop_back();
}
}
-
BOOST_THROW_EXCEPTION(runtime_error("Encountered unknown type while dumping value."));
}
+/**
+ * @threadsafety Always.
+ */
void Expression::Dump(ostream& fp, int indent) const
{
if (m_Operator == OperatorExecute) {
REGISTER_SCRIPTFUNCTION("GetAnswerToEverything", &API::GetAnswerToEverything);
+/**
+ * @threadsafety Always.
+ */
void API::GetAnswerToEverything(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
if (arguments.size() < 1)
using namespace icinga;
+boost::mutex CIB::m_Mutex;
RingBuffer CIB::m_ActiveChecksStatistics(15 * 60);
RingBuffer CIB::m_PassiveChecksStatistics(15 * 60);
+/**
+ * @threadsafety Always.
+ */
void CIB::UpdateActiveChecksStatistics(long tv, int num)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
m_ActiveChecksStatistics.InsertValue(tv, num);
}
+/**
+ * @threadsafety Always.
+ */
int CIB::GetActiveChecksStatistics(long timespan)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
return m_ActiveChecksStatistics.GetValues(timespan);
}
+/**
+ * @threadsafety Always.
+ */
void CIB::UpdatePassiveChecksStatistics(long tv, int num)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
m_PassiveChecksStatistics.InsertValue(tv, num);
}
+/**
+ * @threadsafety Always.
+ */
int CIB::GetPassiveChecksStatistics(long timespan)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
return m_PassiveChecksStatistics.GetValues(timespan);
}
static int GetPassiveChecksStatistics(long timespan);
private:
+ static boost::mutex m_Mutex;
static RingBuffer m_ActiveChecksStatistics;
static RingBuffer m_PassiveChecksStatistics;
};
using namespace icinga;
-bool I2_EXPORT ExternalCommandProcessor::m_Initialized;
-map<String, ExternalCommandProcessor::Callback> I2_EXPORT ExternalCommandProcessor::m_Commands;
+boost::once_flag ExternalCommandProcessor::m_InitializeOnce;
+boost::mutex ExternalCommandProcessor::m_Mutex;
+map<String, ExternalCommandProcessor::Callback> ExternalCommandProcessor::m_Commands;
+/**
+ * @threadsafety Always.
+ */
void ExternalCommandProcessor::Execute(const String& line)
{
if (line.IsEmpty())
Execute(ts, argv[0], argvExtra);
}
+/**
+ * @threadsafety Always.
+ */
void ExternalCommandProcessor::Execute(double time, const String& command, const vector<String>& arguments)
{
- if (!m_Initialized) {
- RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
- RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
- RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
- RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
- RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
- RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
- RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
- RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
- RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
- RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
- RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
- RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
- RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
- RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
- RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
- RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
- RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
- RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
- RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
- RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
- RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
- RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
- RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
- RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
- RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
- RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
- RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
- RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
- RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
- RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
- RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
- RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
- RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
- RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
- RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
- RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
- RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
- RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
- RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
- RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
- RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
- RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
- RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
- RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
-
- m_Initialized = true;
- }
+ boost::call_once(m_InitializeOnce, &ExternalCommandProcessor::Initialize);
+
+ Callback callback;
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ map<String, ExternalCommandProcessor::Callback>::iterator it;
+ it = m_Commands.find(command);
- map<String, ExternalCommandProcessor::Callback>::iterator it;
- it = m_Commands.find(command);
+ if (it == m_Commands.end())
+ BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
- if (it == m_Commands.end())
- BOOST_THROW_EXCEPTION(invalid_argument("The external command '" + command + "' does not exist."));
+ callback = it->second;
+ }
+
+ {
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+ callback(time, arguments);
+ }
- it->second(time, arguments);
}
+/**
+ * @threadsafety Always.
+ */
+void ExternalCommandProcessor::Initialize(void)
+{
+ RegisterCommand("PROCESS_SERVICE_CHECK_RESULT", &ExternalCommandProcessor::ProcessServiceCheckResult);
+ RegisterCommand("SCHEDULE_SVC_CHECK", &ExternalCommandProcessor::ScheduleSvcCheck);
+ RegisterCommand("SCHEDULE_FORCED_SVC_CHECK", &ExternalCommandProcessor::ScheduleForcedSvcCheck);
+ RegisterCommand("ENABLE_SVC_CHECK", &ExternalCommandProcessor::EnableSvcCheck);
+ RegisterCommand("DISABLE_SVC_CHECK", &ExternalCommandProcessor::DisableSvcCheck);
+ RegisterCommand("SHUTDOWN_PROCESS", &ExternalCommandProcessor::ShutdownProcess);
+ RegisterCommand("SCHEDULE_FORCED_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleForcedHostSvcChecks);
+ RegisterCommand("SCHEDULE_HOST_SVC_CHECKS", &ExternalCommandProcessor::ScheduleHostSvcChecks);
+ RegisterCommand("ENABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::EnableHostSvcChecks);
+ RegisterCommand("DISABLE_HOST_SVC_CHECKS", &ExternalCommandProcessor::DisableHostSvcChecks);
+ RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM", &ExternalCommandProcessor::AcknowledgeSvcProblem);
+ RegisterCommand("ACKNOWLEDGE_SVC_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeSvcProblemExpire);
+ RegisterCommand("REMOVE_SVC_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
+ RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM", &ExternalCommandProcessor::AcknowledgeHostProblem);
+ RegisterCommand("ACKNOWLEDGE_HOST_PROBLEM_EXPIRE", &ExternalCommandProcessor::AcknowledgeHostProblemExpire);
+ RegisterCommand("REMOVE_HOST_ACKNOWLEDGEMENT", &ExternalCommandProcessor::RemoveHostAcknowledgement);
+ RegisterCommand("ENABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupSvcChecks);
+ RegisterCommand("DISABLE_HOSTGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupSvcChecks);
+ RegisterCommand("ENABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupSvcChecks);
+ RegisterCommand("DISABLE_SERVICEGROUP_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupSvcChecks);
+ RegisterCommand("ENABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnablePassiveSvcChecks);
+ RegisterCommand("DISABLE_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisablePassiveSvcChecks);
+ RegisterCommand("ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableServicegroupPassiveSvcChecks);
+ RegisterCommand("DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableServicegroupPassiveSvcChecks);
+ RegisterCommand("ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::EnableHostgroupPassiveSvcChecks);
+ RegisterCommand("DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS", &ExternalCommandProcessor::DisableHostgroupPassiveSvcChecks);
+ RegisterCommand("PROCESS_FILE", &ExternalCommandProcessor::ProcessFile);
+ RegisterCommand("SCHEDULE_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleSvcDowntime);
+ RegisterCommand("DEL_SVC_DOWNTIME", &ExternalCommandProcessor::DelSvcDowntime);
+ RegisterCommand("SCHEDULE_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostDowntime);
+ RegisterCommand("DEL_HOST_DOWNTIME", &ExternalCommandProcessor::DelHostDowntime);
+ RegisterCommand("SCHEDULE_HOST_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostSvcDowntime);
+ RegisterCommand("SCHEDULE_HOSTGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupHostDowntime);
+ RegisterCommand("SCHEDULE_HOSTGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleHostgroupSvcDowntime);
+ RegisterCommand("SCHEDULE_SERVICEGROUP_HOST_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupHostDowntime);
+ RegisterCommand("SCHEDULE_SERVICEGROUP_SVC_DOWNTIME", &ExternalCommandProcessor::ScheduleServicegroupSvcDowntime);
+ RegisterCommand("ADD_HOST_COMMENT", &ExternalCommandProcessor::AddHostComment);
+ RegisterCommand("DEL_HOST_COMMENT", &ExternalCommandProcessor::DelHostComment);
+ RegisterCommand("ADD_SVC_COMMENT", &ExternalCommandProcessor::AddSvcComment);
+ RegisterCommand("DEL_SVC_COMMENT", &ExternalCommandProcessor::DelSvcComment);
+ RegisterCommand("DEL_ALL_HOST_COMMENTS", &ExternalCommandProcessor::DelAllHostComments);
+ RegisterCommand("DEL_ALL_SVC_COMMENTS", &ExternalCommandProcessor::DelAllSvcComments);
+ RegisterCommand("SEND_CUSTOM_HOST_NOTIFICATION", &ExternalCommandProcessor::SendCustomHostNotification);
+ RegisterCommand("SEND_CUSTOM_SVC_NOTIFICATION", &ExternalCommandProcessor::SendCustomSvcNotification);
+}
+
+/**
+ * @threadsafety Always.
+ */
void ExternalCommandProcessor::RegisterCommand(const String& command, const ExternalCommandProcessor::Callback& callback)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
m_Commands[command] = callback;
}
static void Execute(const String& line);
static void Execute(double time, const String& command, const vector<String>& arguments);
+private:
+ typedef function<void (double time, const vector<String>& arguments)> Callback;
+
+ static boost::once_flag m_InitializeOnce;
+ static boost::mutex m_Mutex;
+ static map<String, Callback> m_Commands;
+
+ ExternalCommandProcessor(void);
+
+ static void Initialize(void);
+
+ static void RegisterCommand(const String& command, const Callback& callback);
+
static void ProcessServiceCheckResult(double time, const vector<String>& arguments);
static void ScheduleSvcCheck(double time, const vector<String>& arguments);
static void ScheduleForcedSvcCheck(double time, const vector<String>& arguments);
static void DelAllSvcComments(double time, const vector<String>& arguments);
static void SendCustomHostNotification(double time, const vector<String>& arguments);
static void SendCustomSvcNotification(double time, const vector<String>& arguments);
-
-private:
- typedef function<void (double time, const vector<String>& arguments)> Callback;
-
- static bool m_Initialized;
- static map<String, Callback> m_Commands;
-
- ExternalCommandProcessor(void);
-
- static void RegisterCommand(const String& command, const Callback& callback);
};
}
return GetName();
}
+/**
+ * @threadsafety Always.
+ */
bool Host::Exists(const String& name)
{
return (DynamicObject::GetObject("Host", name));
}
+/**
+ * @threadsafety Always.
+ */
Host::Ptr Host::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("Host", name);
return Get("action_url");
}
+/**
+ * @threadsafety Always.
+ */
bool HostGroup::Exists(const String& name)
{
return (DynamicObject::GetObject("HostGroup", name));
}
+/**
+ * @threadsafety Always.
+ */
HostGroup::Ptr HostGroup::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("HostGroup", name);
if (cmd.IsScalar()) {
result = InternalResolveMacros(cmd, macros);
- } else {
+ } else if (cmd.IsObjectType<Dictionary>()) {
Dictionary::Ptr resultDict = boost::make_shared<Dictionary>();
Dictionary::Ptr dict = cmd;
}
result = resultDict;
+ } else {
+ BOOST_THROW_EXCEPTION(invalid_argument("Command is not a string or dictionary."));
}
return result;
: m_Task(task), m_Process(process)
{ }
+/**
+ * @threadsafety Always.
+ */
void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
- assert(Application::IsMainThread());
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
}
+/**
+ * @threadsafety Always.
+ */
void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
ProcessResult pr;
try {
: m_Task(task), m_Process(process), m_ServiceName(service), m_Command(command)
{ }
+/**
+ * @threadsafety Always.
+ */
void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
- assert(Application::IsMainThread());
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
}
+/**
+ * @threadsafety Always.
+ */
void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
ProcessResult pr;
try {
const int Service::DefaultCheckInterval = 5 * 60;
const int Service::CheckIntervalDivisor = 5;
-boost::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
-boost::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
+signals2::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
+signals2::signal<void (const Service::Ptr&, const Value&)> Service::OnNextCheckChanged;
Value Service::GetCheckCommand(void) const
{
/* Make sure the notification component sees the updated
* state/state_type attributes. */
- DynamicObject::FlushTx();
+ DynamicObject::NewTx();
if (IsReachable() && !IsInDowntime() && !IsAcknowledged())
RequestNotifications(NotificationStateChange);
void Service::CheckCompletedHandler(const Dictionary::Ptr& scheduleInfo,
const ScriptTask::Ptr& task, const function<void (void)>& callback)
{
+ ObjectLock olock(this);
+
Set("current_task", Empty);
scheduleInfo->Set("execution_end", Utility::GetTime());
/* flush the current transaction so other instances see the service's
* new state when they receive the CheckResult message */
- DynamicObject::FlushTx();
+ DynamicObject::NewTx();
RequestMessage rm;
rm.SetMethod("checker::CheckResult");
void Service::CommentsExpireTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
void Service::DowntimesExpireTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
return GetName();
}
+/**
+ * @threadsafety Always.
+ */
bool Service::Exists(const String& name)
{
return (DynamicObject::GetObject("Service", name));
}
+/**
+ * @threadsafety Always.
+ */
Service::Ptr Service::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("Service", name);
return dynamic_pointer_cast<Service>(configObject);
}
+/**
+ * @threadsafety Always.
+ */
Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
{
if (!hostName.IsEmpty()) {
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Host::Ptr host = Host::GetByName(hostName);
return host->GetServiceByShortName(serviceName);
} else {
static ServiceStateType StateTypeFromString(const String& state);
static String StateTypeToString(ServiceStateType state);
- static boost::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
- static boost::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
+ static signals2::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
+ static signals2::signal<void (const Service::Ptr&, const Value&)> OnNextCheckChanged;
/* Downtimes */
static int GetNextDowntimeID(void);
return Get("action_url");
}
+/**
+ * @threadsafety Always.
+ */
bool ServiceGroup::Exists(const String& name)
{
return (DynamicObject::GetObject("ServiceGroup", name));
}
+/**
+ * @threadsafety Always.
+ */
ServiceGroup::Ptr ServiceGroup::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("ServiceGroup", name);
}
{
- boost::mutex::scoped_lock lock(Application::GetMutex());
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
interp->RegisterPythonFunction(name, object);
}
REGISTER_TYPE(Endpoint, endpointAttributes);
-boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
-boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
-boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
-boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
+signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
+signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
+signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
+signals2::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
/**
* Constructor for the Endpoint class.
void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
- map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+ map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(topic);
- shared_ptr<boost::signal<Endpoint::Callback> > sig;
+ shared_ptr<signals2::signal<Endpoint::Callback> > sig;
if (it == m_TopicHandlers.end()) {
- sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
+ sig = boost::make_shared<signals2::signal<Endpoint::Callback> >();
m_TopicHandlers.insert(make_pair(topic, sig));
} else {
sig = it->second;
if (!request.GetMethod(&method))
return;
- map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
+ map<String, shared_ptr<signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(method);
if (it == m_TopicHandlers.end())
static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
- static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
- static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
+ static signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
+ static signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
- static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
- static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
+ static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
+ static signals2::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
private:
bool m_ReceivedWelcome; /**< Have we received a welcome message
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
- map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
+ map<String, shared_ptr<signals2::signal<Callback> > > m_TopicHandlers;
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void EndpointManager::SubscriptionTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
DynamicObject::Ptr object;
void EndpointManager::ReconnectTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
void EndpointManager::RequestTimerHandler(void)
{
+ recursive_mutex::scoped_lock lock(Application::GetMutex());
+
map<String, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
if (it->second.HasTimedOut()) {
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
- boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
+ signals2::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
private:
String m_Identity;
void SendMessage(const MessagePart& message);
- boost::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
+ signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
protected:
virtual void ProcessData(void);