]> granicus.if.org Git - icinga2/blob - lib/base/workqueue.cpp
Merge pull request #7185 from Icinga/bugfix/gelfwriter-wrong-log-facility
[icinga2] / lib / base / workqueue.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "base/workqueue.hpp"
4 #include "base/utility.hpp"
5 #include "base/logger.hpp"
6 #include "base/convert.hpp"
7 #include "base/application.hpp"
8 #include "base/exception.hpp"
9 #include <boost/thread/tss.hpp>
10 #include <math.h>
11
12 using namespace icinga;
13
14 std::atomic<int> WorkQueue::m_NextID(1);
15 boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
16
17 WorkQueue::WorkQueue(size_t maxItems, int threadCount)
18         : m_ID(m_NextID++), m_ThreadCount(threadCount), m_MaxItems(maxItems),
19         m_TaskStats(15 * 60)
20 {
21         /* Initialize logger. */
22         m_StatusTimerTimeout = Utility::GetTime();
23
24         m_StatusTimer = new Timer();
25         m_StatusTimer->SetInterval(10);
26         m_StatusTimer->OnTimerExpired.connect(std::bind(&WorkQueue::StatusTimerHandler, this));
27         m_StatusTimer->Start();
28 }
29
30 WorkQueue::~WorkQueue()
31 {
32         m_StatusTimer->Stop(true);
33
34         Join(true);
35 }
36
37 void WorkQueue::SetName(const String& name)
38 {
39         m_Name = name;
40 }
41
42 String WorkQueue::GetName() const
43 {
44         return m_Name;
45 }
46
47 boost::mutex::scoped_lock WorkQueue::AcquireLock()
48 {
49         return boost::mutex::scoped_lock(m_Mutex);
50 }
51
52 /**
53  * Enqueues a task. Tasks are guaranteed to be executed in the order
54  * they were enqueued in except if there is more than one worker thread.
55  */
56 void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function<void ()>&& function, WorkQueuePriority priority)
57 {
58         if (!m_Spawned) {
59                 Log(LogNotice, "WorkQueue")
60                         << "Spawning WorkQueue threads for '" << m_Name << "'";
61
62                 for (int i = 0; i < m_ThreadCount; i++) {
63                         m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this));
64                 }
65
66                 m_Spawned = true;
67         }
68
69         bool wq_thread = IsWorkerThread();
70
71         if (!wq_thread) {
72                 while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
73                         m_CVFull.wait(lock);
74         }
75
76         m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID);
77
78         m_CVEmpty.notify_one();
79 }
80
81 /**
82  * Enqueues a task. Tasks are guaranteed to be executed in the order
83  * they were enqueued in except if there is more than one worker thread or when
84  * allowInterleaved is true in which case the new task might be run
85  * immediately if it's being enqueued from within the WorkQueue thread.
86  */
87 void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority priority,
88         bool allowInterleaved)
89 {
90         bool wq_thread = IsWorkerThread();
91
92         if (wq_thread && allowInterleaved) {
93                 function();
94
95                 return;
96         }
97
98         auto lock = AcquireLock();
99         EnqueueUnlocked(lock, std::move(function), priority);
100 }
101
102 /**
103  * Waits until all currently enqueued tasks have completed. This only works reliably
104  * when no other thread is enqueuing new tasks when this method is called.
105  *
106  * @param stop Whether to stop the worker threads
107  */
108 void WorkQueue::Join(bool stop)
109 {
110         boost::mutex::scoped_lock lock(m_Mutex);
111
112         while (m_Processing || !m_Tasks.empty())
113                 m_CVStarved.wait(lock);
114
115         if (stop) {
116                 m_Stopped = true;
117                 m_CVEmpty.notify_all();
118                 lock.unlock();
119
120                 m_Threads.join_all();
121                 m_Spawned = false;
122
123                 Log(LogNotice, "WorkQueue")
124                         << "Stopped WorkQueue threads for '" << m_Name << "'";
125         }
126 }
127
128 /**
129  * Checks whether the calling thread is one of the worker threads
130  * for this work queue.
131  *
132  * @returns true if called from one of the worker threads, false otherwise
133  */
134 bool WorkQueue::IsWorkerThread() const
135 {
136         WorkQueue **pwq = l_ThreadWorkQueue.get();
137
138         if (!pwq)
139                 return false;
140
141         return *pwq == this;
142 }
143
144 void WorkQueue::SetExceptionCallback(const ExceptionCallback& callback)
145 {
146         m_ExceptionCallback = callback;
147 }
148
149 /**
150  * Checks whether any exceptions have occurred while executing tasks for this
151  * work queue. When a custom exception callback is set this method will always
152  * return false.
153  */
154 bool WorkQueue::HasExceptions() const
155 {
156         boost::mutex::scoped_lock lock(m_Mutex);
157
158         return !m_Exceptions.empty();
159 }
160
161 /**
162  * Returns all exceptions which have occurred for tasks in this work queue. When a
163  * custom exception callback is set this method will always return an empty list.
164  */
165 std::vector<boost::exception_ptr> WorkQueue::GetExceptions() const
166 {
167         boost::mutex::scoped_lock lock(m_Mutex);
168
169         return m_Exceptions;
170 }
171
172 void WorkQueue::ReportExceptions(const String& facility) const
173 {
174         std::vector<boost::exception_ptr> exceptions = GetExceptions();
175
176         for (const auto& eptr : exceptions) {
177                 Log(LogCritical, facility)
178                         << DiagnosticInformation(eptr);
179         }
180
181         Log(LogCritical, facility)
182                 << exceptions.size() << " error" << (exceptions.size() != 1 ? "s" : "");
183 }
184
185 size_t WorkQueue::GetLength() const
186 {
187         boost::mutex::scoped_lock lock(m_Mutex);
188
189         return m_Tasks.size();
190 }
191
192 void WorkQueue::StatusTimerHandler()
193 {
194         boost::mutex::scoped_lock lock(m_Mutex);
195
196         ASSERT(!m_Name.IsEmpty());
197
198         size_t pending = m_Tasks.size();
199
200         double now = Utility::GetTime();
201         double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
202         double timeToZero = pending / gradient;
203
204         String timeInfo;
205
206         if (pending > GetTaskCount(5)) {
207                 timeInfo = " empty in ";
208                 if (timeToZero < 0 || std::isinf(timeToZero))
209                         timeInfo += "infinite time, your task handler isn't able to keep up";
210                 else
211                         timeInfo += Utility::FormatDuration(timeToZero);
212         }
213
214         m_PendingTasks = pending;
215         m_PendingTasksTimestamp = now;
216
217         /* Log if there are pending items, or 5 minute timeout is reached. */
218         if (pending > 0 || m_StatusTimerTimeout < now) {
219                 Log(LogInformation, "WorkQueue")
220                         << "#" << m_ID << " (" << m_Name << ") "
221                         << "items: " << pending << ", "
222                         << "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
223                         << "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
224                         << timeInfo;
225         }
226
227         /* Reschedule next log entry in 5 minutes. */
228         if (m_StatusTimerTimeout < now) {
229                 m_StatusTimerTimeout = now + 60 * 5;
230         }
231 }
232
233 void WorkQueue::RunTaskFunction(const TaskFunction& func)
234 {
235         try {
236                 func();
237         } catch (const std::exception&) {
238                 boost::exception_ptr eptr = boost::current_exception();
239
240                 {
241                         boost::mutex::scoped_lock mutex(m_Mutex);
242
243                         if (!m_ExceptionCallback)
244                                 m_Exceptions.push_back(eptr);
245                 }
246
247                 if (m_ExceptionCallback)
248                         m_ExceptionCallback(eptr);
249         }
250 }
251
252 void WorkQueue::WorkerThreadProc()
253 {
254         std::ostringstream idbuf;
255         idbuf << "WQ #" << m_ID;
256         Utility::SetThreadName(idbuf.str());
257
258         l_ThreadWorkQueue.reset(new WorkQueue *(this));
259
260         boost::mutex::scoped_lock lock(m_Mutex);
261
262         for (;;) {
263                 while (m_Tasks.empty() && !m_Stopped)
264                         m_CVEmpty.wait(lock);
265
266                 if (m_Stopped)
267                         break;
268
269                 if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
270                         m_CVFull.notify_all();
271
272                 Task task = m_Tasks.top();
273                 m_Tasks.pop();
274
275                 m_Processing++;
276
277                 lock.unlock();
278
279                 RunTaskFunction(task.Function);
280
281                 /* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
282                 task = Task();
283
284                 IncreaseTaskCount();
285
286                 lock.lock();
287
288                 m_Processing--;
289
290                 if (m_Tasks.empty())
291                         m_CVStarved.notify_all();
292         }
293 }
294
295 void WorkQueue::IncreaseTaskCount()
296 {
297         m_TaskStats.InsertValue(Utility::GetTime(), 1);
298 }
299
300 size_t WorkQueue::GetTaskCount(RingBuffer::SizeType span)
301 {
302         return m_TaskStats.UpdateAndGetValues(Utility::GetTime(), span);
303 }
304
305 bool icinga::operator<(const Task& a, const Task& b)
306 {
307         if (a.Priority < b.Priority)
308                 return true;
309
310         if (a.Priority == b.Priority) {
311                 if (a.ID > b.ID)
312                         return true;
313                 else
314                         return false;
315         }
316
317         return false;
318 }