1 /******************************************************************************
3 * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "base/workqueue.hpp"
21 #include "base/utility.hpp"
22 #include "base/logger.hpp"
23 #include "base/convert.hpp"
24 #include "base/application.hpp"
25 #include "base/exception.hpp"
26 #include <boost/bind.hpp>
27 #include <boost/foreach.hpp>
28 #include <boost/thread/tss.hpp>
30 using namespace icinga;
/* Counter from which each new WorkQueue takes its ID; the constructor
 * post-increments it when initializing m_ID. */
32 int WorkQueue::m_NextID = 1;
/* Per-thread slot holding a pointer to a WorkQueue pointer. WorkerThreadProc()
 * stores its own queue here and IsWorkerThread() reads the slot back. */
33 boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
/**
 * Sets up a work queue and starts its periodic status timer.
 *
 * @param maxItems Stored as m_MaxItems. The size checks in Enqueue() and
 *                 WorkerThreadProc() are skipped when this is 0.
 * @param threadCount Stored as m_ThreadCount; the number of worker threads
 *                    created by the loop in Enqueue().
 */
35 WorkQueue::WorkQueue(size_t maxItems, int threadCount)
/* m_NextID++ takes the next ID from the static counter for this queue. */
36 : m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
/* Status timer: calls StatusTimerHandler() on this object whenever it expires.
 * NOTE(review): the interval of 10 is presumably in seconds -- confirm against Timer. */
39 m_StatusTimer = new Timer();
40 m_StatusTimer->SetInterval(10);
41 m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
42 m_StatusTimer->Start();
/**
 * Tears down the work queue; the status timer started by the constructor is
 * stopped here.
 */
45 WorkQueue::~WorkQueue(void)
/* NOTE(review): the meaning of the 'true' argument is defined by Timer::Stop --
 * presumably it waits for a running handler to finish; confirm. */
47 m_StatusTimer->Stop(true);
53 * Enqueues a task. Tasks are guaranteed to be executed in the order
54 * they were enqueued in except if there is more than one worker thread or when
55 * allowInterleaved is true in which case the new task might be run
56 * immediately if it's being enqueued from within the WorkQueue thread.
/*
 * function:         the task to queue.
 * priority:         passed on to the Task object stored in m_Tasks.
 * allowInterleaved: only has an effect when the caller is one of this queue's
 *                   worker threads (see the combined check below).
 */
58 void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority,
59 bool allowInterleaved)
/* Find out whether the caller is itself a worker thread of this queue. */
61 bool wq_thread = IsWorkerThread();
/* Interleaving is only considered for calls made from a worker thread. */
63 if (wq_thread && allowInterleaved) {
/* The queue state used below is accessed while holding m_Mutex. */
69 boost::mutex::scoped_lock lock(m_Mutex);
/* Create m_ThreadCount worker threads, each running WorkerThreadProc() on this queue. */
72 for (int i = 0; i < m_ThreadCount; i++) {
73 m_Threads.create_thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
/* Capacity check; it never triggers when m_MaxItems is 0. */
80 while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
/* Store the task, tagged with the next task ID (m_NextTaskID is pre-incremented). */
84 m_Tasks.push(Task(function, priority, ++m_NextTaskID));
/* Wake one thread waiting on m_CVEmpty; workers wait on it while m_Tasks is empty. */
86 m_CVEmpty.notify_one();
90 * Waits until all currently enqueued tasks have completed. This only works reliably
91 * when no other thread is enqueuing new tasks when this method is called.
93 * @param stop Whether to stop the worker threads
95 void WorkQueue::Join(bool stop)
97 boost::mutex::scoped_lock lock(m_Mutex);
/* Block until nothing is being processed and m_Tasks is empty.
 * WorkerThreadProc() signals m_CVStarved. */
99 while (m_Processing || !m_Tasks.empty())
100 m_CVStarved.wait(lock);
/* Wake every thread waiting on m_CVEmpty; workers wait on it until there is
 * a task or m_Stopped is set. */
104 m_CVEmpty.notify_all();
/* Wait for all threads in m_Threads (created by Enqueue()) to finish. */
107 m_Threads.join_all();
113 * Checks whether the calling thread is one of the worker threads
114 * for this work queue.
116 * @returns true if called from one of the worker threads, false otherwise
118 bool WorkQueue::IsWorkerThread(void) const
/* Read the calling thread's slot; WorkerThreadProc() stores a pointer to its
 * owning queue there when the worker thread starts. */
120 WorkQueue **pwq = l_ThreadWorkQueue.get();
128 void WorkQueue::SetExceptionCallback(const ExceptionCallback& callback)
130 m_ExceptionCallback = callback;
134 * Checks whether any exceptions have occurred while executing tasks for this
135 * work queue. When a custom exception callback is set this method will always
138 bool WorkQueue::HasExceptions(void) const
140 boost::mutex::scoped_lock lock(m_Mutex);
142 return !m_Exceptions.empty();
146 * Returns all exceptions which have occurred for tasks in this work queue. When a
147 * custom exception callback is set this method will always return an empty list.
/* The result is returned by value, so the caller works on its own vector. */
149 std::vector<boost::exception_ptr> WorkQueue::GetExceptions(void) const
/* Hold the queue mutex while the queue's state is accessed. */
151 boost::mutex::scoped_lock lock(m_Mutex);
156 void WorkQueue::ReportExceptions(const String& facility) const
158 std::vector<boost::exception_ptr> exceptions = GetExceptions();
160 BOOST_FOREACH(const boost::exception_ptr& eptr, exceptions) {
161 Log(LogCritical, facility)
162 << DiagnosticInformation(eptr);
165 Log(LogCritical, facility)
166 << exceptions.size() << " error" << (exceptions.size() != 1 ? "s" : "");
169 size_t WorkQueue::GetLength(void) const
171 boost::mutex::scoped_lock lock(m_Mutex);
173 return m_Tasks.size();
176 void WorkQueue::StatusTimerHandler(void)
178 boost::mutex::scoped_lock lock(m_Mutex);
180 Log(LogNotice, "WorkQueue")
181 << "#" << m_ID << " tasks: " << m_Tasks.size();
/**
 * Function run by each worker thread that Enqueue() creates: names the
 * thread, registers this queue in the thread's slot and then handles tasks
 * taken from m_Tasks.
 */
184 void WorkQueue::WorkerThreadProc(void)
/* Name the thread "WQ #<queue id>". */
186 std::ostringstream idbuf;
187 idbuf << "WQ #" << m_ID;
188 Utility::SetThreadName(idbuf.str());
/* Remember the owning queue for this thread; IsWorkerThread() reads this slot. */
190 l_ThreadWorkQueue.reset(new WorkQueue *(this));
192 boost::mutex::scoped_lock lock(m_Mutex);
/* Sleep on m_CVEmpty until there is a task or the queue has been stopped. */
195 while (m_Tasks.empty() && !m_Stopped)
196 m_CVEmpty.wait(lock);
/* The queue is at or above its non-zero item limit: wake all threads waiting
 * on m_CVFull (presumably producers blocked in Enqueue() -- confirm). */
201 if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
202 m_CVFull.notify_all();
/* Take a copy of the task at the top of the queue. */
204 Task task = m_Tasks.top();
/* Exceptions derived from std::exception are handled here. */
213 } catch (const std::exception&) {
/* No custom callback set: keep the exception in m_Exceptions, which
 * HasExceptions() inspects. */
216 if (!m_ExceptionCallback)
217 m_Exceptions.push_back(boost::current_exception());
/* Custom callback set: hand the exception to it instead. */
221 if (m_ExceptionCallback)
222 m_ExceptionCallback(boost::current_exception());
225 /* clear the task so whatever other resources it holds are released
226 _before_ we re-acquire the mutex */
/* Wake all threads waiting on m_CVStarved; Join() waits on it. */
234 m_CVStarved.notify_all();