return m_Name;
}
+boost::mutex::scoped_lock WorkQueue::AcquireLock()
+{
+ return boost::mutex::scoped_lock(m_Mutex);
+}
+
/**
* Enqueues a task. Tasks are guaranteed to be executed in the order
- * they were enqueued in except if there is more than one worker thread or when
- * allowInterleaved is true in which case the new task might be run
- * immediately if it's being enqueued from within the WorkQueue thread.
+ * they were enqueued in except if there is more than one worker thread.
*/
-void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority priority,
- bool allowInterleaved)
+void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function<void ()>&& function, WorkQueuePriority priority)
{
- bool wq_thread = IsWorkerThread();
-
- if (wq_thread && allowInterleaved) {
- function();
-
- return;
- }
-
- boost::mutex::scoped_lock lock(m_Mutex);
-
if (!m_Spawned) {
Log(LogNotice, "WorkQueue")
<< "Spawning WorkQueue threads for '" << m_Name << "'";
m_Spawned = true;
}
+ bool wq_thread = IsWorkerThread();
+
if (!wq_thread) {
while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
m_CVFull.wait(lock);
m_CVEmpty.notify_one();
}
+/**
+ * Enqueues a task. Tasks are guaranteed to be executed in the order
+ * they were enqueued in except if there is more than one worker thread or when
+ * allowInterleaved is true in which case the new task might be run
+ * immediately if it's being enqueued from within the WorkQueue thread.
+ */
+void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority priority,
+ bool allowInterleaved)
+{
+ bool wq_thread = IsWorkerThread();
+
+ if (wq_thread && allowInterleaved) {
+ function();
+
+ return;
+ }
+
+ auto lock = AcquireLock();
+ EnqueueUnlocked(lock, std::move(function), priority);
+}
+
/**
* Waits until all currently enqueued tasks have completed. This only works reliably
* when no other thread is enqueuing new tasks when this method is called.
}
}
+void WorkQueue::RunTaskFunction(const TaskFunction& func)
+{
+ try {
+ func();
+ } catch (const std::exception&) {
+ boost::exception_ptr eptr = boost::current_exception();
+
+ {
+ boost::mutex::scoped_lock mutex(m_Mutex);
+
+ if (!m_ExceptionCallback)
+ m_Exceptions.push_back(eptr);
+ }
+
+ if (m_ExceptionCallback)
+ m_ExceptionCallback(eptr);
+ }
+}
+
void WorkQueue::WorkerThreadProc()
{
std::ostringstream idbuf;
lock.unlock();
- try {
- task.Function();
- } catch (const std::exception&) {
- lock.lock();
-
- if (!m_ExceptionCallback)
- m_Exceptions.push_back(boost::current_exception());
-
- lock.unlock();
-
- if (m_ExceptionCallback)
- m_ExceptionCallback(boost::current_exception());
- }
+ RunTaskFunction(task.Function);
/* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
task = Task();
PriorityHigh
};
+using TaskFunction = std::function<void ()>;
+
struct Task
{
Task() = default;
- Task(std::function<void (void)> function, WorkQueuePriority priority, int id)
+ Task(TaskFunction function, WorkQueuePriority priority, int id)
: Function(std::move(function)), Priority(priority), ID(id)
{ }
- std::function<void (void)> Function;
+ TaskFunction Function;
WorkQueuePriority Priority{PriorityNormal};
int ID{-1};
};
void SetName(const String& name);
String GetName() const;
- void Enqueue(std::function<void (void)>&& function, WorkQueuePriority priority = PriorityNormal,
+ boost::mutex::scoped_lock AcquireLock();
+ void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal);
+ void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal,
bool allowInterleaved = false);
void Join(bool stop = false);
+ template<typename VectorType, typename FuncType>
+ void ParallelFor(const VectorType& items, const FuncType& func)
+ {
+ using SizeType = decltype(items.size());
+
+ SizeType totalCount = items.size();
+
+ auto lock = AcquireLock();
+
+ SizeType offset = 0;
+
+ for (int i = 0; i < m_ThreadCount; i++) {
+ SizeType count = totalCount / static_cast<SizeType>(m_ThreadCount);
+ if (static_cast<SizeType>(i) < totalCount % static_cast<SizeType>(m_ThreadCount))
+ count++;
+
+ EnqueueUnlocked(lock, [&items, func, offset, count, this]() {
+ for (SizeType j = offset; j < offset + count; j++) {
+ RunTaskFunction([&func, &items, j]() {
+ func(items[j]);
+ });
+ }
+ });
+
+ offset += count;
+ }
+
+ ASSERT(offset == items.size());
+ }
+
bool IsWorkerThread() const;
size_t GetLength() const;
void WorkerThreadProc();
void StatusTimerHandler();
+
+ void RunTaskFunction(const TaskFunction& func);
};
}
if (items.empty())
return true;
- for (const ItemPair& ip : items) {
+ for (const auto& ip : items)
newItems.push_back(ip.first);
- upq.Enqueue([&]() {
- ip.first->Commit(ip.second);
- });
- }
+
+ upq.ParallelFor(items, [](const ItemPair& ip) {
+ ip.first->Commit(ip.second);
+ });
upq.Join();
if (unresolved_dep)
continue;
- for (const ItemPair& ip : items) {
+ upq.ParallelFor(items, [&type](const ItemPair& ip) {
const ConfigItem::Ptr& item = ip.first;
- if (!item->m_Object)
- continue;
+ if (!item->m_Object || item->m_Type != type)
+ return;
- if (item->m_Type == type) {
- upq.Enqueue([&]() {
- try {
- item->m_Object->OnAllConfigLoaded();
- } catch (const std::exception& ex) {
- if (item->m_IgnoreOnError) {
- Log(LogNotice, "ConfigObject")
- << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
-
- item->Unregister();
+ try {
+ item->m_Object->OnAllConfigLoaded();
+ } catch (const std::exception& ex) {
+ if (!item->m_IgnoreOnError)
+ throw;
- {
- boost::mutex::scoped_lock lock(item->m_Mutex);
- item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
- }
+ Log(LogNotice, "ConfigObject")
+ << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
- return;
- }
+ item->Unregister();
- throw;
- }
- });
+ {
+ boost::mutex::scoped_lock lock(item->m_Mutex);
+ item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
+ }
}
- }
+ });
completed_types.insert(type);
return false;
for (const String& loadDep : type->GetLoadDependencies()) {
- for (const ItemPair& ip : items) {
+ upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) {
const ConfigItem::Ptr& item = ip.first;
- if (!item->m_Object)
- continue;
+ if (!item->m_Object || item->m_Type->GetName() != loadDep)
+ return;
- if (item->m_Type->GetName() == loadDep) {
- upq.Enqueue([&]() {
- ActivationScope ascope(item->m_ActivationContext);
- item->m_Object->CreateChildObjects(type);
- });
- }
- }
+ ActivationScope ascope(item->m_ActivationContext);
+ item->m_Object->CreateChildObjects(type);
+ });
}
upq.Join();
Log(LogDebug, "ConfigItem")
<< "Setting 'active' to true for object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
#endif /* I2_DEBUG */
- upq.Enqueue(std::bind(&ConfigObject::PreActivate, object));
- }
-
- upq.Join();
- if (upq.HasExceptions()) {
- upq.ReportExceptions("ConfigItem");
- return false;
+ object->PreActivate();
}
if (!silent)
Log(LogDebug, "ConfigItem")
<< "Activating object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
#endif /* I2_DEBUG */
- upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated));
+
+ object->Activate(runtimeCreated);
}
upq.Join();