int ThreadPool::m_NextID = 1;
-ThreadPool::ThreadPool(void)
- : m_ID(m_NextID++), m_WaitTime(0), m_ServiceTime(0),
- m_TaskCount(0), m_Stopped(false)
+ThreadPool::ThreadPool(int max_threads)
+ : m_ID(m_NextID++), m_Stopped(false), m_MaxThreads(max_threads)
{
- for (int i = 0; i < 2; i++)
- SpawnWorker();
+ if (m_MaxThreads != -1 && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0]))
+ m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]);
- m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
- m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
+ Start();
}
ThreadPool::~ThreadPool(void)
{
Stop();
- Join();
+ Join(true);
+}
+
+void ThreadPool::Start(void)
+{
+ for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++)
+ m_Queues[i].SpawnWorker(m_ThreadGroup);
+
+ m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
+ m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
}
void ThreadPool::Stop(void)
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
+ boost::mutex::scoped_lock lock(m_Queues[i].Mutex);
+ m_Queues[i].Stopped = true;
+ m_Queues[i].CV.notify_all();
+ }
+
+ boost::mutex::scoped_lock lock(m_MgmtMutex);
m_Stopped = true;
- m_WorkCV.notify_all();
m_MgmtCV.notify_all();
}
/**
* Waits for all worker threads to finish.
*/
-void ThreadPool::Join(void)
+void ThreadPool::Join(bool wait_for_stop)
{
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- while (!m_Stopped || !m_WorkItems.empty()) {
- lock.unlock();
- Utility::Sleep(0.5);
- lock.lock();
- }
+ if (wait_for_stop) {
+ m_ThreadGroup.join_all();
+ return;
}
- m_ThreadGroup.join_all();
+ for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
+ boost::mutex::scoped_lock lock(m_Queues[i].Mutex);
+
+ while (!m_Queues[i].Items.empty())
+ m_Queues[i].CVStarved.wait(lock);
+ }
}
/**
* Waits for work items and processes them.
*/
-void ThreadPool::QueueThreadProc(int tid)
+void ThreadPool::WorkerThread::ThreadProc(Queue& queue)
{
std::ostringstream idbuf;
- idbuf << "TP #" << m_ID << " W #" << tid;
+ idbuf << "Q #" << &queue << " W #" << this;
Utility::SetThreadName(idbuf.str());
for (;;) {
WorkItem wi;
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ boost::mutex::scoped_lock lock(queue.Mutex);
+
+ UpdateUtilization(ThreadIdle);
- UpdateThreadUtilization(tid, ThreadIdle);
+ while (queue.Items.empty() && !queue.Stopped && !Zombie) {
+ if (queue.Items.empty())
+ queue.CVStarved.notify_all();
- while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie)
- m_WorkCV.wait(lock);
+ queue.CV.wait(lock);
+ }
- if (m_Threads[tid].Zombie)
+ if (Zombie)
break;
- if (m_WorkItems.empty() && m_Stopped)
+ if (queue.Items.empty() && queue.Stopped)
break;
- wi = m_WorkItems.front();
- m_WorkItems.pop_front();
+ wi = queue.Items.front();
+ queue.Items.pop_front();
- UpdateThreadUtilization(tid, ThreadBusy);
+ UpdateUtilization(ThreadBusy);
}
double st = Utility::GetTime();;
double latency = st - wi.Timestamp;
{
- boost::mutex::scoped_lock lock(m_Mutex);
-
- m_WaitTime += latency;
- m_ServiceTime += et - st;
- m_TaskCount++;
+ boost::mutex::scoped_lock lock(queue.Mutex);
- if (latency > m_MaxLatency)
- m_MaxLatency = latency;
+ queue.WaitTime += latency;
+ queue.ServiceTime += et - st;
+ queue.TaskCount++;
}
#ifdef _DEBUG
#endif /* _DEBUG */
}
- boost::mutex::scoped_lock lock(m_Mutex);
- UpdateThreadUtilization(tid, ThreadDead);
- m_Threads[tid].Zombie = false;
+ boost::mutex::scoped_lock lock(queue.Mutex);
+ UpdateUtilization(ThreadDead);
+ Zombie = false;
}
/**
wi.Callback = callback;
wi.Timestamp = Utility::GetTime();
+ Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))];
+
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ boost::mutex::scoped_lock lock(queue.Mutex);
- if (m_Stopped)
+ if (queue.Stopped)
return false;
- m_WorkItems.push_back(wi);
- m_WorkCV.notify_one();
+ queue.Items.push_back(wi);
+ queue.CV.notify_one();
}
return true;
Utility::SetThreadName(idbuf.str());
for (;;) {
- size_t pending, alive;
- double avg_latency, max_latency;
- double utilization = 0;
+ size_t total_pending = 0, total_alive = 0;
+ double total_avg_latency = 0;
+ double total_utilization = 0;
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ boost::mutex::scoped_lock lock(m_MgmtMutex);
if (!m_Stopped)
m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5));
if (m_Stopped)
break;
+ }
- pending = m_WorkItems.size();
+ for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
+ size_t pending, alive = 0;
+ double avg_latency;
+ double utilization = 0;
- alive = 0;
+ Queue& queue = m_Queues[i];
- for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
- if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) {
+ boost::mutex::scoped_lock lock(queue.Mutex);
+
+ pending = queue.Items.size();
+
+ for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) {
+ if (queue.Threads[i].State != ThreadDead && !queue.Threads[i].Zombie) {
alive++;
- utilization += m_Threads[i].Utilization * 100;
+ utilization += queue.Threads[i].Utilization * 100;
}
}
utilization /= alive;
- if (m_TaskCount > 0)
- avg_latency = m_WaitTime / (m_TaskCount * 1.0);
+ if (queue.TaskCount > 0)
+ avg_latency = queue.WaitTime / (queue.TaskCount * 1.0);
else
avg_latency = 0;
int tthreads = wthreads - alive;
- /* Don't ever kill the last 8 threads. */
- if (alive + tthreads < 8)
- tthreads = 8 - alive;
+ /* Don't ever kill the last threads. */
+ if (alive + tthreads < 2)
+ tthreads = 2 - alive;
/* Don't kill more than 8 threads at once. */
if (tthreads < -8)
/* Spawn more workers if there are outstanding work items. */
if (tthreads > 0 && pending > 0)
- tthreads = (Utility::GetTime() - Application::GetStartTime() < 300) ? 128 : 8;
+ tthreads = 8;
+
+ if (m_MaxThreads != -1 && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads)
+ tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive;
std::ostringstream msgbuf;
msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads;
Log(LogDebug, "base", msgbuf.str());
for (int i = 0; i < -tthreads; i++)
- KillWorker();
+ queue.KillWorker(m_ThreadGroup);
for (int i = 0; i < tthreads; i++)
- SpawnWorker();
+ queue.SpawnWorker(m_ThreadGroup);
}
- m_WaitTime = 0;
- m_ServiceTime = 0;
- m_TaskCount = 0;
+ queue.WaitTime = 0;
+ queue.ServiceTime = 0;
+ queue.TaskCount = 0;
- max_latency = m_MaxLatency;
- m_MaxLatency = 0;
+ total_pending += pending;
+ total_alive += alive;
+ total_avg_latency += avg_latency;
+ total_utilization += utilization;
}
std::ostringstream msgbuf;
- msgbuf << "Pool #" << m_ID << ": Pending tasks: " << pending << "; Average latency: "
- << (long)(avg_latency * 1000) << "ms"
- << "; Max latency: " << (long)(max_latency * 1000) << "ms"
- << "; Threads: " << alive
- << "; Pool utilization: " << utilization << "%";
+ msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: "
+ << (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms"
+ << "; Threads: " << total_alive
+ << "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%";
Log(LogInformation, "base", msgbuf.str());
}
}
/**
* Note: Caller must hold m_Mutex
*/
-void ThreadPool::SpawnWorker(void)
+void ThreadPool::Queue::SpawnWorker(boost::thread_group& group)
{
- for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
- if (m_Threads[i].State == ThreadDead) {
+ for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) {
+ if (Threads[i].State == ThreadDead) {
Log(LogDebug, "debug", "Spawning worker thread.");
- m_Threads[i] = WorkerThread(ThreadIdle);
- m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i));
+ Threads[i] = WorkerThread(ThreadIdle);
+ Threads[i].Thread = group.create_thread(boost::bind(&ThreadPool::WorkerThread::ThreadProc, boost::ref(Threads[i]), boost::ref(*this)));
break;
}
}
/**
- * Note: Caller must hold m_Mutex.
+ * Note: Caller must hold Mutex.
*/
-void ThreadPool::KillWorker(void)
+void ThreadPool::Queue::KillWorker(boost::thread_group& group)
{
- for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
- if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) {
+ for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) {
+ if (Threads[i].State == ThreadIdle && !Threads[i].Zombie) {
Log(LogDebug, "base", "Killing worker thread.");
- m_ThreadGroup.remove_thread(m_Threads[i].Thread);
- m_Threads[i].Thread->detach();
- delete m_Threads[i].Thread;
+ group.remove_thread(Threads[i].Thread);
+ Threads[i].Thread->detach();
+ delete Threads[i].Thread;
- m_Threads[i].Zombie = true;
- m_WorkCV.notify_all();
+ Threads[i].Zombie = true;
+ CV.notify_all();
break;
}
Utility::SetThreadName(idbuf.str());
for (;;) {
- boost::mutex::scoped_lock lock(m_Mutex);
+ {
+ boost::mutex::scoped_lock lock(m_MgmtMutex);
- if (!m_Stopped)
- m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250));
+ if (!m_Stopped)
+ m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250));
- if (m_Stopped)
- break;
+ if (m_Stopped)
+ break;
+ }
+
+ for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
+ Queue& queue = m_Queues[i];
- for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++)
- UpdateThreadUtilization(i);
+ boost::mutex::scoped_lock lock(queue.Mutex);
+
+ for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++)
+ queue.Threads[i].UpdateUtilization();
+ }
}
}
/**
- * Note: Caller must hold m_Mutex.
+ * Note: Caller must hold queue Mutex.
*/
-void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
+void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state)
{
double utilization;
- switch (m_Threads[tid].State) {
+ switch (State) {
case ThreadDead:
return;
case ThreadIdle:
}
double now = Utility::GetTime();
- double time = now - m_Threads[tid].LastUpdate;
+ double time = now - LastUpdate;
const double avg_time = 5.0;
if (time > avg_time)
time = avg_time;
- m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time;
- m_Threads[tid].LastUpdate = now;
+ Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time;
+ LastUpdate = now;
if (state != ThreadUnspecified)
- m_Threads[tid].State = state;
+ State = state;
}
DynamicObject::Ptr dobj = dtype->CreateObject(properties);
dobj->Register();
+ m_Object = dobj;
+
return dobj;
}
+DynamicObject::Ptr ConfigItem::GetObject(void) const
+{
+ return m_Object;
+}
+
/**
* Registers the configuration item.
*/
void ConfigItem::Register(void)
{
- ASSERT(!OwnsLock());
-
- {
- ObjectLock olock(this);
+ boost::mutex::scoped_lock lock(m_Mutex);
- m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
- }
+ m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
}
/**
Log(LogInformation, "config", "Validating config items (step 1)...");
+ ThreadPool tp(32);
+
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
- kv.second->ValidateItem();
+ tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second));
}
+
+ tp.Join();
if (ConfigCompilerContext::GetInstance()->HasErrors())
return false;
- Log(LogInformation, "config", "Activating config items");
+ Log(LogInformation, "config", "Comitting config items");
+ BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
+ tp.Post(boost::bind(&ConfigItem::Commit, kv.second));
+ }
+
+ tp.Join();
+
std::vector<DynamicObject::Ptr> objects;
-
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
- DynamicObject::Ptr object = kv.second->Commit();
+ DynamicObject::Ptr object = kv.second->GetObject();
if (object)
objects.push_back(object);
}
-
+
+ Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items");
+
BOOST_FOREACH(const DynamicObject::Ptr& object, objects) {
- object->OnConfigLoaded();
+ tp.Post(boost::bind(&DynamicObject::OnConfigLoaded, object));
}
+
+ tp.Join();
Log(LogInformation, "config", "Validating config items (step 2)...");
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
- kv.second->ValidateItem();
+ tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second));
}
+ tp.Join();
+
if (ConfigCompilerContext::GetInstance()->HasErrors())
return false;
/* restore the previous program state */
DynamicObject::RestoreObjects(Application::GetStatePath());
+ Log(LogInformation, "config", "Triggering Start signal for config items");
+
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
if (object->IsActive())
#ifdef _DEBUG
Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'");
#endif /* _DEBUG */
- object->Start();
-
+ tp.Post(boost::bind(&DynamicObject::Start, object));
+ }
+ }
+
+ tp.Join();
+
+#ifdef _DEBUG
+ BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
+ BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
ASSERT(object->IsActive());
}
}
+#endif /* _DEBUG */
+
+ Log(LogInformation, "config", "Activated all objects.");
return true;
}