granicus.if.org Git - icinga2/blob - lib/base/workqueue.cpp
Update copyright headers for 2016
[icinga2] / lib / base / workqueue.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "base/workqueue.hpp"
21 #include "base/utility.hpp"
22 #include "base/logger.hpp"
23 #include "base/convert.hpp"
24 #include "base/application.hpp"
25 #include "base/exception.hpp"
26 #include <boost/bind.hpp>
27 #include <boost/foreach.hpp>
28 #include <boost/thread/tss.hpp>
29
30 using namespace icinga;
31
32 int WorkQueue::m_NextID = 1;
33 boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
34
35 WorkQueue::WorkQueue(size_t maxItems, int threadCount)
36         : m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
37           m_Processing(0)
38 {
39         m_StatusTimer = new Timer();
40         m_StatusTimer->SetInterval(10);
41         m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
42         m_StatusTimer->Start();
43 }
44
45 WorkQueue::~WorkQueue(void)
46 {
47         m_StatusTimer->Stop(true);
48
49         Join(true);
50 }
51
52 /**
53  * Enqueues a task. Tasks are guaranteed to be executed in the order
54  * they were enqueued in except if there is more than one worker thread or when
55  * allowInterleaved is true in which case the new task might be run
56  * immediately if it's being enqueued from within the WorkQueue thread.
57  */
58 void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority,
59     bool allowInterleaved)
60 {
61         bool wq_thread = IsWorkerThread();
62
63         if (wq_thread && allowInterleaved) {
64                 function();
65
66                 return;
67         }
68
69         boost::mutex::scoped_lock lock(m_Mutex);
70
71         if (!m_Spawned) {
72                 for (int i = 0; i < m_ThreadCount; i++) {
73                         m_Threads.create_thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
74                 }
75
76                 m_Spawned = true;
77         }
78
79         if (!wq_thread) {
80                 while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
81                         m_CVFull.wait(lock);
82         }
83
84         m_Tasks.push(Task(function, priority, ++m_NextTaskID));
85
86         m_CVEmpty.notify_one();
87 }
88
89 /**
90  * Waits until all currently enqueued tasks have completed. This only works reliably
91  * when no other thread is enqueuing new tasks when this method is called.
92  *
93  * @param stop Whether to stop the worker threads
94  */
95 void WorkQueue::Join(bool stop)
96 {
97         boost::mutex::scoped_lock lock(m_Mutex);
98
99         while (m_Processing || !m_Tasks.empty())
100                 m_CVStarved.wait(lock);
101
102         if (stop) {
103                 m_Stopped = true;
104                 m_CVEmpty.notify_all();
105                 lock.unlock();
106
107                 m_Threads.join_all();
108                 m_Spawned = false;
109         }
110 }
111
112 /**
113  * Checks whether the calling thread is one of the worker threads
114  * for this work queue.
115  *
116  * @returns true if called from one of the worker threads, false otherwise
117  */
118 bool WorkQueue::IsWorkerThread(void) const
119 {
120         WorkQueue **pwq = l_ThreadWorkQueue.get();
121
122         if (!pwq)
123                 return false;
124
125         return *pwq == this;
126 }
127
128 void WorkQueue::SetExceptionCallback(const ExceptionCallback& callback)
129 {
130         m_ExceptionCallback = callback;
131 }
132
133 /**
134  * Checks whether any exceptions have occurred while executing tasks for this
135  * work queue. When a custom exception callback is set this method will always
136  * return false.
137  */
138 bool WorkQueue::HasExceptions(void) const
139 {
140         boost::mutex::scoped_lock lock(m_Mutex);
141  
142         return !m_Exceptions.empty();
143 }
144
145 /**
146  * Returns all exceptions which have occurred for tasks in this work queue. When a
147  * custom exception callback is set this method will always return an empty list.
148  */
149 std::vector<boost::exception_ptr> WorkQueue::GetExceptions(void) const
150 {
151         boost::mutex::scoped_lock lock(m_Mutex);
152  
153         return m_Exceptions;
154 }
155
156 void WorkQueue::ReportExceptions(const String& facility) const
157 {
158         std::vector<boost::exception_ptr> exceptions = GetExceptions();
159
160         BOOST_FOREACH(const boost::exception_ptr& eptr, exceptions) {
161                 Log(LogCritical, facility)
162                     << DiagnosticInformation(eptr);
163         }
164
165         Log(LogCritical, facility)
166             << exceptions.size() << " error" << (exceptions.size() != 1 ? "s" : "");
167 }
168
169 size_t WorkQueue::GetLength(void) const
170 {
171         boost::mutex::scoped_lock lock(m_Mutex);
172
173         return m_Tasks.size();
174 }
175
176 void WorkQueue::StatusTimerHandler(void)
177 {
178         boost::mutex::scoped_lock lock(m_Mutex);
179
180         Log(LogNotice, "WorkQueue")
181             << "#" << m_ID << " tasks: " << m_Tasks.size();
182 }
183
184 void WorkQueue::WorkerThreadProc(void)
185 {
186         std::ostringstream idbuf;
187         idbuf << "WQ #" << m_ID;
188         Utility::SetThreadName(idbuf.str());
189
190         l_ThreadWorkQueue.reset(new WorkQueue *(this));
191
192         boost::mutex::scoped_lock lock(m_Mutex);
193
194         for (;;) {
195                 while (m_Tasks.empty() && !m_Stopped)
196                         m_CVEmpty.wait(lock);
197
198                 if (m_Stopped)
199                         break;
200
201                 if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
202                         m_CVFull.notify_all();
203
204                 Task task = m_Tasks.top();
205                 m_Tasks.pop();
206
207                 m_Processing++;
208
209                 lock.unlock();
210
211                 try {
212                         task.Function();
213                 } catch (const std::exception&) {
214                         lock.lock();
215
216                         if (!m_ExceptionCallback)
217                                 m_Exceptions.push_back(boost::current_exception());
218
219                         lock.unlock();
220
221                         if (m_ExceptionCallback)
222                                 m_ExceptionCallback(boost::current_exception());
223                 }
224
225                 /* clear the task so whatever other resources it holds are released
226                    _before_ we re-acquire the mutex */
227                 task = Task();
228
229                 lock.lock();
230
231                 m_Processing--;
232
233                 if (m_Tasks.empty())
234                         m_CVStarved.notify_all();
235         }
236 }
237