#include "base/logger_fwd.h"
#include "base/convert.h"
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
using namespace icinga;
WorkQueue::WorkQueue(size_t maxItems)
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false),
- m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
+ m_Processing(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
{
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
/**
* Enqueues a work item. Work items are guaranteed to be executed in the order
- * they were enqueued in.
+ * they were enqueued in except when allowInterleaved is true in which case
+ * the new work item might be run immediately if it's being enqueued from
+ * within the WorkQueue thread.
*/
void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{
+ bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
+
+ if (wq_thread && allowInterleaved) {
+ callback();
+
+ return;
+ }
+
WorkItem item;
item.Callback = callback;
item.AllowInterleaved = allowInterleaved;
- bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
-
boost::mutex::scoped_lock lock(m_Mutex);
if (!wq_thread) {
m_Items.push_back(item);
- if (wq_thread)
- ProcessItems(lock, true);
- else if (m_Items.size() == 1)
+ if (m_Items.size() == 1)
m_CVEmpty.notify_all();
}
{
boost::mutex::scoped_lock lock(m_Mutex);
- while (!m_Items.empty())
+ while (m_Processing || !m_Items.empty())
m_CVStarved.wait(lock);
if (stop) {
Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size()));
}
-void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
+void WorkQueue::WorkerThreadProc(void)
{
- while (!m_Items.empty()) {
- WorkItem wi = m_Items.front();
-
- if (interleaved && !wi.AllowInterleaved)
- return;
+ std::ostringstream idbuf;
+ idbuf << "WQ #" << m_ID;
+ Utility::SetThreadName(idbuf.str());
- lock.unlock();
+ boost::mutex::scoped_lock lock(m_Mutex);
- try {
- wi.Callback();
- } catch (const std::exception&) {
- lock.lock();
+ for (;;) {
+ while (m_Items.empty() && !m_Stopped)
+ m_CVEmpty.wait(lock);
- ExceptionCallback callback = m_ExceptionCallback;
+ if (m_Stopped)
+ break;
- lock.unlock();
+ std::deque<WorkItem> items;
+ m_Items.swap(items);
- callback(boost::current_exception());
- }
+ if (items.size() >= m_MaxItems)
+ m_CVFull.notify_all();
- lock.lock();
+ m_Processing = true;
- m_Items.pop_front();
+ lock.unlock();
- if (m_Items.size() + 1 == m_MaxItems)
- m_CVFull.notify_one();
- }
+ BOOST_FOREACH(WorkItem& wi, items) {
+ try {
+ wi.Callback();
+ }
+ catch (const std::exception&) {
+ lock.lock();
- m_CVStarved.notify_all();
-}
+ ExceptionCallback callback = m_ExceptionCallback;
-void WorkQueue::WorkerThreadProc(void)
-{
- boost::mutex::scoped_lock lock(m_Mutex);
+ lock.unlock();
- std::ostringstream idbuf;
- idbuf << "WQ #" << m_ID;
- Utility::SetThreadName(idbuf.str());
+ callback(boost::current_exception());
+ }
+ }
- for (;;) {
- while (m_Items.empty() && !m_Stopped)
- m_CVEmpty.wait(lock);
+ lock.lock();
- if (m_Stopped)
- break;
+ m_Processing = false;
- ProcessItems(lock, false);
+ m_CVStarved.notify_all();
}
}
-
ParallelWorkQueue::ParallelWorkQueue(void)
: m_QueueCount(boost::thread::hardware_concurrency()),
m_Queues(new WorkQueue[m_QueueCount]),
struct WorkItem
{
-
WorkCallback Callback;
bool AllowInterleaved;
};
boost::thread m_Thread;
size_t m_MaxItems;
bool m_Stopped;
+ bool m_Processing;
std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback;
Timer::Ptr m_StatusTimer;
- void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void);
void StatusTimerHandler(void);
void ConfigItem::Link(void)
{
- ObjectLock olock(this);
+ ASSERT(OwnsLock());
if (m_LinkedExpressionList)
return;
ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void)
{
+ ASSERT(OwnsLock());
+
if (!m_LinkedExpressionList)
Link();
Dictionary::Ptr ConfigItem::GetProperties(void)
{
+ ASSERT(!OwnsLock());
+
+ ObjectLock olock(this);
+
if (!m_Properties) {
m_Properties = make_shared<Dictionary>();
GetLinkedExpressionList()->Execute(m_Properties);
return dobj;
}
-DynamicObject::Ptr ConfigItem::GetObject(void) const
-{
- return m_Object;
-}
-
/**
* Registers the configuration item.
*/
void ConfigItem::Register(void)
{
+ std::pair<String, String> key = std::make_pair(m_Type, m_Name);
+ ConfigItem::Ptr self = GetSelf();
+
boost::mutex::scoped_lock lock(m_Mutex);
- m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
+ m_Items[key] = self;
}
/**
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
- boost::mutex::scoped_lock lock(m_Mutex);
-
+ std::pair<String, String> key = std::make_pair(type, name);
ConfigItem::ItemMap::iterator it;
- it = m_Items.find(std::make_pair(type, name));
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ it = m_Items.find(key);
+ }
if (it != m_Items.end())
return it->second;
bool ConfigItem::HasObject(const String& type, const String& name)
{
- boost::mutex::scoped_lock lock(m_Mutex);
-
+ std::pair<String, String> key = std::make_pair(type, name);
ConfigItem::ItemMap::iterator it;
- it = m_Items.find(std::make_pair(type, name));
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ it = m_Items.find(key);
+ }
return (it != m_Items.end());
}
std::vector<DynamicObject::Ptr> objects;
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
- DynamicObject::Ptr object = kv.second->GetObject();
+ DynamicObject::Ptr object = kv.second->m_Object;
if (object)
objects.push_back(object);
void ConfigItem::DiscardItems(void)
{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
m_Items.clear();
}
std::vector<ConfigItem::Ptr> GetParents(void) const;
- void Link(void);
ExpressionList::Ptr GetLinkedExpressionList(void);
Dictionary::Ptr GetProperties(void);
static bool HasObject(const String& type, const String& name);
void ValidateItem(void);
-
- DynamicObject::Ptr GetObject(void) const;
static bool ActivateItems(bool validateOnly);
static void DiscardItems(void);
private:
+ void Link(void);
ExpressionList::Ptr GetExpressionList(void) const;
String m_Type; /**< The object type. */
{
ASSERT(!OwnsLock());
- ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
-
- /* Don't create slave services unless we own this object */
- if (!item)
- return;
-
Dictionary::Ptr service_descriptions = GetServiceDescriptions();
- if (!service_descriptions)
+ if (!service_descriptions ||service_descriptions->GetLength() == 0)
return;
+ ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
+
ObjectLock olock(service_descriptions);
BOOST_FOREACH(const Dictionary::Pair& kv, service_descriptions) {
std::ostringstream namebuf;
void Service::StartDowntimesExpiredTimer(void)
{
- if (!l_DowntimesExpireTimer) {
- l_DowntimesExpireTimer = make_shared<Timer>();
- l_DowntimesExpireTimer->SetInterval(60);
- l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
- l_DowntimesExpireTimer->Start();
- }
+ l_DowntimesExpireTimer = make_shared<Timer>();
+ l_DowntimesExpireTimer->SetInterval(60);
+ l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
+ l_DowntimesExpireTimer->Start();
}
void Service::AddDowntimesToCache(void)
void Service::UpdateSlaveScheduledDowntimes(void)
{
- ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
-
- /* Don't create slave scheduled downtimes unless we own this object */
- if (!item)
- return;
-
/* Service scheduled downtime descs */
Dictionary::Ptr descs = GetScheduledDowntimeDescriptions();
- if (!descs)
+ if (!descs || descs->GetLength() == 0)
return;
+ ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
+
ObjectLock olock(descs);
BOOST_FOREACH(const Dictionary::Pair& kv, descs) {
void Service::UpdateSlaveNotifications(void)
{
- ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
-
- /* Don't create slave notifications unless we own this object */
- if (!item)
- return;
-
/* Service notification descs */
Dictionary::Ptr descs = GetNotificationDescriptions();
- if (!descs)
+ if (!descs || descs->GetLength() == 0)
return;
+ ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
+
ObjectLock olock(descs);
BOOST_FOREACH(const Dictionary::Pair& kv, descs) {
#include "base/objectlock.h"
#include "base/convert.h"
#include "base/utility.h"
+#include "base/initialize.h"
#include <boost/foreach.hpp>
#include <boost/bind/apply.hpp>
boost::signals2::signal<void (const Service::Ptr&, const String&, const String&, AcknowledgementType, double, const String&)> Service::OnAcknowledgementSet;
boost::signals2::signal<void (const Service::Ptr&, const String&)> Service::OnAcknowledgementCleared;
+INITIALIZE_ONCE(&Service::StartDowntimesExpiredTimer);
+
Service::Service(void)
: m_CheckRunning(false)
{ }
void Service::Start(void)
{
- VERIFY(GetHost());
-
- StartDowntimesExpiredTimer();
-
double now = Utility::GetTime();
if (GetNextCheck() < now + 300)
static Service::Ptr GetOwnerByDowntimeID(const String& id);
static Downtime::Ptr GetDowntimeByID(const String& id);
- void StartDowntimesExpiredTimer(void);
+ static void StartDowntimesExpiredTimer(void);
bool IsInDowntime(void) const;
bool IsAcknowledged(void);