void IdoMysqlConnection::TxTimerHandler(void)
{
- m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
+ m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true);
}
void IdoMysqlConnection::NewTransaction(void)
{
ASSERT(query.Category != DbCatInvalid);
- m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
+ m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
}
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{
- m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
+ m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
}
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
void IdoPgsqlConnection::TxTimerHandler(void)
{
- m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this));
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this), true);
}
void IdoPgsqlConnection::NewTransaction(void)
{
ASSERT(query.Category != DbCatInvalid);
- m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query));
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
}
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{
- m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
}
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
* Enqueues a work item. Work items are guaranteed to be executed in the order
* they were enqueued in.
*/
-void WorkQueue::Enqueue(const WorkCallback& item)
+void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_CV.wait(lock);
}
+ WorkItem item;
+ item.Callback = callback;
+ item.AllowInterleaved = allowInterleaved;
+
m_Items.push_back(item);
if (wq_thread)
- ProcessItems(lock);
+ ProcessItems(lock, true);
else
m_CV.notify_all();
}
throw;
}
-void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock)
+void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
{
while (!m_Items.empty()) {
try {
- WorkCallback wi = m_Items.front();
+ WorkItem wi = m_Items.front();
+
+ if (interleaved && !wi.AllowInterleaved)
+ return;
+
m_Items.pop_front();
m_CV.notify_all();
lock.unlock();
- wi();
+ wi.Callback();
} catch (const std::exception& ex) {
lock.lock();
if (m_Joined)
break;
- ProcessItems(lock);
+ ProcessItems(lock, false);
}
m_Stopped = true;
namespace icinga
{
+typedef boost::function<void (void)> WorkCallback;
+
+struct WorkItem
+{
+
+ WorkCallback Callback;
+ bool AllowInterleaved;
+};
+
/**
* A workqueue.
*
class I2_BASE_API WorkQueue
{
public:
- typedef boost::function<void (void)> WorkCallback;
typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
WorkQueue(size_t maxItems = 25000);
~WorkQueue(void);
- void Enqueue(const WorkCallback& item);
+ void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
void Join(void);
boost::thread::id GetThreadId(void) const;
size_t m_MaxItems;
bool m_Joined;
bool m_Stopped;
- std::deque<WorkCallback> m_Items;
+ std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback;
- void ProcessItems(boost::mutex::scoped_lock& lock);
+ void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void);
static void DefaultExceptionCallback(boost::exception_ptr exp);