#include <sstream>
#include <boost/bind.hpp>
#include <boost/exception/diagnostic_information.hpp>
+#include <boost/foreach.hpp>
using namespace icinga;
EventQueue::EventQueue(void)
: m_Stopped(false)
{
- unsigned int threads = boost::thread::hardware_concurrency();
+ m_ThreadCount = boost::thread::hardware_concurrency();
- if (threads == 0)
- threads = 1;
+ if (m_ThreadCount == 0)
+ m_ThreadCount = 1;
- threads *= 8;
+ m_ThreadCount *= 8;
- for (unsigned int i = 0; i < threads; i++)
- m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
+ m_ThreadCount = 128;
+
+ m_States = new ThreadState[m_ThreadCount];
+
+ for (int i = 0; i < m_ThreadCount; i++) {
+ m_States[i] = ThreadIdle;
+ m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this, i));
+ }
boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this));
reportThread.detach();
/**
* Waits for events and processes them.
*/
-void EventQueue::QueueThreadProc(void)
+void EventQueue::QueueThreadProc(int tid)
{
for (;;) {
- Callback event;
+ EventQueueWorkItem event;
{
boost::mutex::scoped_lock lock(m_Mutex);
+ m_States[tid] = ThreadIdle;
+
while (m_Events.empty() && !m_Stopped)
m_CV.wait(lock);
if (m_Events.empty() && m_Stopped)
break;
- event = m_Events.top();
- m_Events.pop();
+ event = m_Events.front();
+ m_Events.pop_front();
+
+ m_States[tid] = ThreadBusy;
}
#ifdef _DEBUG
#endif /* _DEBUG */
try {
- event();
+ event.Callback();
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Exception thrown in event handler: " << std::endl
*
* @param callback The callback function for the event.
*/
-void EventQueue::Post(const EventQueue::Callback& callback)
+void EventQueue::Post(const EventQueueCallback& callback)
{
boost::mutex::scoped_lock lock(m_Mutex);
- m_Events.push(callback);
+
+ EventQueueWorkItem event;
+ event.Callback = callback;
+ event.Timestamp = Utility::GetTime();
+
+ m_Events.push_back(event);
m_CV.notify_one();
}
for (;;) {
Utility::Sleep(5);
- int pending;
+ double now = Utility::GetTime();
+
+ int pending, busy;
+ double max_latency, avg_latency;
{
boost::mutex::scoped_lock lock(m_Mutex);
pending = m_Events.size();
+
+ busy = 0;
+
+ for (int i = 0; i < m_ThreadCount; i++) {
+ if (m_States[i] == ThreadBusy)
+ busy++;
+ }
+
+ max_latency = 0;
+ avg_latency = 0;
+
+ BOOST_FOREACH(const EventQueueWorkItem& event, m_Events) {
+ double latency = now - event.Timestamp;
+
+ avg_latency += latency;
+
+ if (latency > max_latency)
+ max_latency = latency;
+ }
+
+ avg_latency /= pending;
}
- Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending));
+ Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending) + "; Busy threads: " +
+ Convert::ToString(busy) + "; Idle threads: " + Convert::ToString(m_ThreadCount - busy) +
+ "; Maximum latency: " + Convert::ToString((long)max_latency * 1000) + "ms"
+ "; Average latency: " + Convert::ToString((long)avg_latency * 1000) + "ms");
}
}
using namespace icinga;
-boost::condition_variable Process::m_CV;
-int Process::m_TaskFd;
-Timer::Ptr Process::m_StatusTimer;
-
#ifndef __APPLE__
extern char **environ;
#else /* __APPLE__ */
#define environ (*_NSGetEnviron())
#endif /* __APPLE__ */
-void Process::Initialize(void)
+void Process::Run(void)
{
- int fds[2];
-
-#if HAVE_PIPE2
- if (pipe2(fds, O_CLOEXEC) < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("pipe2")
- << boost::errinfo_errno(errno));
- }
-#else /* HAVE_PIPE2 */
- if (pipe(fds) < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("pipe")
- << boost::errinfo_errno(errno));
- }
-
- /* Don't bother setting fds[0] to clo-exec as we'll only
- * use it in the following dup() call. */
-
- Utility::SetCloExec(fds[1]);
-#endif /* HAVE_PIPE2 */
-
- m_TaskFd = fds[1];
-
- unsigned int threads = boost::thread::hardware_concurrency();
-
- if (threads == 0)
- threads = 2;
-
- for (unsigned int i = 0; i < threads; i++) {
- int childTaskFd = dup(fds[0]);
+ ProcessResult result;
- if (childTaskFd < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("dup")
- << boost::errinfo_errno(errno));
- }
-
- Utility::SetNonBlocking(childTaskFd);
- Utility::SetCloExec(childTaskFd);
-
- boost::thread t(&Process::WorkerThreadProc, childTaskFd);
- t.detach();
- }
-
- (void) close(fds[0]);
-
- m_StatusTimer = boost::make_shared<Timer>();
- m_StatusTimer->OnTimerExpired.connect(boost::bind(&Process::StatusTimerHandler));
- m_StatusTimer->SetInterval(5);
- m_StatusTimer->Start();
-}
-
-void Process::WorkerThreadProc(int taskFd)
-{
- std::map<int, Process::Ptr> tasks;
- pollfd *pfds = NULL;
-
- for (;;) {
- std::map<int, Process::Ptr>::iterator it, prev;
-
- pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd));
-
- if (pfds == NULL) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("realloc")
- << boost::errinfo_errno(errno));
- }
-
- int idx = 0;
-
- int fd;
- BOOST_FOREACH(boost::tie(fd, boost::tuples::ignore), tasks) {
- pfds[idx].fd = fd;
- pfds[idx].events = POLLIN | POLLHUP;
- idx++;
- }
-
- if (tasks.size() < MaxTasksPerThread) {
- pfds[idx].fd = taskFd;
- pfds[idx].events = POLLIN;
- idx++;
- }
-
- int rc = poll(pfds, idx, -1);
-
- if (rc < 0 && errno != EINTR) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("poll")
- << boost::errinfo_errno(errno));
- }
-
- if (rc == 0)
- continue;
-
- for (int i = 0; i < idx; i++) {
- if ((pfds[i].revents & (POLLIN|POLLHUP)) == 0)
- continue;
-
- if (pfds[i].fd == taskFd) {
- std::vector<Process::Ptr> new_tasks;
-
- unsigned int want = MaxTasksPerThread - tasks.size();
-
- if (want > 0) {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- /* Read one byte for every task we take from the pending tasks list. */
- char buffer[MaxTasksPerThread];
-
- ASSERT(want <= sizeof(buffer));
-
- int have = read(taskFd, &buffer, want);
-
- if (have < 0) {
- if (errno == EAGAIN)
- break; /* Someone else was faster and took our task. */
-
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("read")
- << boost::errinfo_errno(errno));
- }
-
- while (have > 0) {
- ASSERT(!m_Tasks.empty());
-
- Process::Ptr task = m_Tasks.front();
- m_Tasks.pop_front();
-
- new_tasks.push_back(task);
-
- have--;
- }
-
- m_CV.notify_all();
- }
-
- BOOST_FOREACH(const Process::Ptr& task, new_tasks) {
- try {
- task->InitTask();
-
- int fd = task->m_FD;
-
- if (fd >= 0)
- tasks[fd] = task;
- } catch (...) {
- task->FinishException(boost::current_exception());
- }
- }
-
- continue;
- }
-
- it = tasks.find(pfds[i].fd);
-
- if (it == tasks.end())
- continue;
-
- Process::Ptr task = it->second;
-
- if (!task->RunTask()) {
- prev = it;
- tasks.erase(prev);
-
- task->FinishResult(task->m_Result);
- }
- }
- }
-}
-
-void Process::QueueTask(void)
-{
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- while (m_Tasks.size() >= PIPE_BUF)
- m_CV.wait(lock);
-
- m_Tasks.push_back(GetSelf());
-
- /**
- * This little gem which is commonly known as the "self-pipe trick"
- * takes care of waking up the select() call in the worker thread.
- */
- if (write(m_TaskFd, "T", 1) < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("write")
- << boost::errinfo_errno(errno));
- }
- }
-}
-
-void Process::InitTask(void)
-{
- m_Result.ExecutionStart = Utility::GetTime();
+ result.ExecutionStart = Utility::GetTime();
ASSERT(m_FD == -1);
<< boost::errinfo_errno(errno));
}
- Utility::SetNonBlocking(fds[0]);
Utility::SetCloExec(fds[0]);
-
- Utility::SetNonBlocking(fds[1]);
Utility::SetCloExec(fds[1]);
#endif /* HAVE_PIPE2 */
delete [] envp;
- m_FD = fds[0];
+ int fd = fds[0];
(void) close(fds[1]);
-}
-bool Process::RunTask(void)
-{
- char buffer[512];
+ char buffer[512];
int rc;
+ std::ostringstream outputStream;
+
do {
- rc = read(m_FD, buffer, sizeof(buffer));
+ rc = read(fd, buffer, sizeof(buffer));
if (rc > 0) {
- m_OutputStream.write(buffer, rc);
+ outputStream.write(buffer, rc);
}
} while (rc > 0);
- if (rc < 0 && errno == EAGAIN)
- return true;
-
- String output = m_OutputStream.str();
+ String output = outputStream.str();
int status, exitcode;
- (void) close(m_FD);
+ (void) close(fd);
if (waitpid(m_Pid, &status, 0) != m_Pid) {
BOOST_THROW_EXCEPTION(posix_error()
exitcode = 128;
}
- m_Result.ExecutionEnd = Utility::GetTime();
- m_Result.ExitStatus = exitcode;
- m_Result.Output = output;
+ result.ExecutionEnd = Utility::GetTime();
+ result.ExitStatus = exitcode;
+ result.Output = output;
- return false;
-}
-
-void Process::StatusTimerHandler(void)
-{
- boost::mutex::scoped_lock lock(m_Mutex);
- if (m_Tasks.size() > 50)
- Log(LogCritical, "base", "More than 50 waiting Process tasks: " +
- Convert::ToString(m_Tasks.size()));
+ FinishResult(result);
}
#endif /* _WIN32 */