From: Gunnar Beutner Date: Fri, 22 Mar 2013 23:26:56 +0000 (+0100) Subject: Process class: Use the global EventQueue instead of a custom queue. X-Git-Tag: v0.0.2~188 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=27ceabf3c085b22336ff521aeb69c099983f9440;p=icinga2 Process class: Use the global EventQueue instead of a custom queue. --- diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index 3c6a065c7..08349453e 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -24,21 +24,28 @@ #include #include #include +#include using namespace icinga; EventQueue::EventQueue(void) : m_Stopped(false) { - unsigned int threads = boost::thread::hardware_concurrency(); + m_ThreadCount = boost::thread::hardware_concurrency(); - if (threads == 0) - threads = 1; + if (m_ThreadCount == 0) + m_ThreadCount = 1; - threads *= 8; + m_ThreadCount *= 8; - for (unsigned int i = 0; i < threads; i++) - m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this)); + m_ThreadCount = 128; + + m_States = new ThreadState[m_ThreadCount]; + + for (int i = 0; i < m_ThreadCount; i++) { + m_States[i] = ThreadIdle; + m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this, i)); + } boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this)); reportThread.detach(); @@ -68,22 +75,26 @@ void EventQueue::Join(void) /** * Waits for events and processes them. */ -void EventQueue::QueueThreadProc(void) +void EventQueue::QueueThreadProc(int tid) { for (;;) { - Callback event; + EventQueueWorkItem event; { boost::mutex::scoped_lock lock(m_Mutex); + m_States[tid] = ThreadIdle; + while (m_Events.empty() && !m_Stopped) m_CV.wait(lock); if (m_Events.empty() && m_Stopped) break; - event = m_Events.top(); - m_Events.pop(); + event = m_Events.front(); + m_Events.pop_front(); + + m_States[tid] = ThreadBusy; } #ifdef _DEBUG @@ -97,7 +108,7 @@ void EventQueue::QueueThreadProc(void) #endif /* _DEBUG */ try { - event(); + event.Callback(); } catch (const std::exception& ex) { std::ostringstream msgbuf; msgbuf << "Exception thrown in event handler: " << std::endl @@ -146,10 +157,15 @@ void EventQueue::QueueThreadProc(void) * * @param callback The callback function for the event. */ -void EventQueue::Post(const EventQueue::Callback& callback) +void EventQueue::Post(const EventQueueCallback& callback) { boost::mutex::scoped_lock lock(m_Mutex); - m_Events.push(callback); + + EventQueueWorkItem event; + event.Callback = callback; + event.Timestamp = Utility::GetTime(); + + m_Events.push_back(event); m_CV.notify_one(); } @@ -158,13 +174,40 @@ void EventQueue::ReportThreadProc(void) for (;;) { Utility::Sleep(5); - int pending; + double now = Utility::GetTime(); + + int pending, busy; + double max_latency, avg_latency; { boost::mutex::scoped_lock lock(m_Mutex); pending = m_Events.size(); + + busy = 0; + + for (int i = 0; i < m_ThreadCount; i++) { + if (m_States[i] == ThreadBusy) + busy++; + } + + max_latency = 0; + avg_latency = 0; + + BOOST_FOREACH(const EventQueueWorkItem& event, m_Events) { + double latency = now - event.Timestamp; + + avg_latency += latency; + + if (latency > max_latency) + max_latency = latency; + } + + avg_latency /= pending; } - Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending)); + Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending) + "; Busy threads: " + + Convert::ToString(busy) + "; Idle threads: " + Convert::ToString(m_ThreadCount - busy) + + "; Maximum latency: " + Convert::ToString((long)max_latency * 1000) + "ms" + "; Average latency: " + Convert::ToString((long)avg_latency * 1000) + "ms"); } } diff --git a/lib/base/eventqueue.h b/lib/base/eventqueue.h index 944a0870c..2b3187948 100644 --- a/lib/base/eventqueue.h +++ b/lib/base/eventqueue.h @@ -30,6 +30,20 @@ namespace icinga { +enum ThreadState +{ + ThreadIdle, + ThreadBusy +}; + +typedef boost::function EventQueueCallback; + +struct EventQueueWorkItem +{ + EventQueueCallback Callback; + double Timestamp; +}; + /** * An event queue. * @@ -38,26 +52,26 @@ namespace icinga class I2_BASE_API EventQueue { public: - typedef boost::function Callback; - EventQueue(void); ~EventQueue(void); void Stop(void); void Join(void); - void Post(const Callback& callback); + void Post(const EventQueueCallback& callback); private: boost::thread_group m_Threads; + ThreadState *m_States; + int m_ThreadCount; boost::mutex m_Mutex; boost::condition_variable m_CV; bool m_Stopped; - std::stack m_Events; + std::deque m_Events; - void QueueThreadProc(void); + void QueueThreadProc(int tid); void ReportThreadProc(void); }; diff --git a/lib/base/process-unix.cpp b/lib/base/process-unix.cpp index ba0325f04..6ae2d1566 100644 --- a/lib/base/process-unix.cpp +++ b/lib/base/process-unix.cpp @@ -34,10 +34,6 @@ using namespace icinga; -boost::condition_variable Process::m_CV; -int Process::m_TaskFd; -Timer::Ptr Process::m_StatusTimer; - #ifndef __APPLE__ extern char **environ; #else /* __APPLE__ */ @@ -45,202 +41,11 @@ extern char **environ; #define environ (*_NSGetEnviron()) #endif /* __APPLE__ */ -void Process::Initialize(void) +void Process::Run(void) { - int fds[2]; - -#if HAVE_PIPE2 - if (pipe2(fds, O_CLOEXEC) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("pipe2") - << boost::errinfo_errno(errno)); - } -#else /* HAVE_PIPE2 */ - if (pipe(fds) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("pipe") - << boost::errinfo_errno(errno)); - } - - /* Don't bother setting fds[0] to clo-exec as we'll only - * use it in the following dup() call. */ - - Utility::SetCloExec(fds[1]); -#endif /* HAVE_PIPE2 */ - - m_TaskFd = fds[1]; - - unsigned int threads = boost::thread::hardware_concurrency(); - - if (threads == 0) - threads = 2; - - for (unsigned int i = 0; i < threads; i++) { - int childTaskFd = dup(fds[0]); + ProcessResult result; - if (childTaskFd < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("dup") - << boost::errinfo_errno(errno)); - } - - Utility::SetNonBlocking(childTaskFd); - Utility::SetCloExec(childTaskFd); - - boost::thread t(&Process::WorkerThreadProc, childTaskFd); - t.detach(); - } - - (void) close(fds[0]); - - m_StatusTimer = boost::make_shared(); - m_StatusTimer->OnTimerExpired.connect(boost::bind(&Process::StatusTimerHandler)); - m_StatusTimer->SetInterval(5); - m_StatusTimer->Start(); -} - -void Process::WorkerThreadProc(int taskFd) -{ - std::map tasks; - pollfd *pfds = NULL; - - for (;;) { - std::map::iterator it, prev; - - pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd)); - - if (pfds == NULL) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("realloc") - << boost::errinfo_errno(errno)); - } - - int idx = 0; - - int fd; - BOOST_FOREACH(boost::tie(fd, boost::tuples::ignore), tasks) { - pfds[idx].fd = fd; - pfds[idx].events = POLLIN | POLLHUP; - idx++; - } - - if (tasks.size() < MaxTasksPerThread) { - pfds[idx].fd = taskFd; - pfds[idx].events = POLLIN; - idx++; - } - - int rc = poll(pfds, idx, -1); - - if (rc < 0 && errno != EINTR) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("poll") - << boost::errinfo_errno(errno)); - } - - if (rc == 0) - continue; - - for (int i = 0; i < idx; i++) { - if ((pfds[i].revents & (POLLIN|POLLHUP)) == 0) - continue; - - if (pfds[i].fd == taskFd) { - std::vector new_tasks; - - unsigned int want = MaxTasksPerThread - tasks.size(); - - if (want > 0) { - boost::mutex::scoped_lock lock(m_Mutex); - - /* Read one byte for every task we take from the pending tasks list. */ - char buffer[MaxTasksPerThread]; - - ASSERT(want <= sizeof(buffer)); - - int have = read(taskFd, &buffer, want); - - if (have < 0) { - if (errno == EAGAIN) - break; /* Someone else was faster and took our task. */ - - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("read") - << boost::errinfo_errno(errno)); - } - - while (have > 0) { - ASSERT(!m_Tasks.empty()); - - Process::Ptr task = m_Tasks.front(); - m_Tasks.pop_front(); - - new_tasks.push_back(task); - - have--; - } - - m_CV.notify_all(); - } - - BOOST_FOREACH(const Process::Ptr& task, new_tasks) { - try { - task->InitTask(); - - int fd = task->m_FD; - - if (fd >= 0) - tasks[fd] = task; - } catch (...) { - task->FinishException(boost::current_exception()); - } - } - - continue; - } - - it = tasks.find(pfds[i].fd); - - if (it == tasks.end()) - continue; - - Process::Ptr task = it->second; - - if (!task->RunTask()) { - prev = it; - tasks.erase(prev); - - task->FinishResult(task->m_Result); - } - } - } -} - -void Process::QueueTask(void) -{ - { - boost::mutex::scoped_lock lock(m_Mutex); - - while (m_Tasks.size() >= PIPE_BUF) - m_CV.wait(lock); - - m_Tasks.push_back(GetSelf()); - - /** - * This little gem which is commonly known as the "self-pipe trick" - * takes care of waking up the select() call in the worker thread. - */ - if (write(m_TaskFd, "T", 1) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("write") - << boost::errinfo_errno(errno)); - } - } -} - -void Process::InitTask(void) -{ - m_Result.ExecutionStart = Utility::GetTime(); + result.ExecutionStart = Utility::GetTime(); ASSERT(m_FD == -1); @@ -259,10 +64,7 @@ void Process::InitTask(void) << boost::errinfo_errno(errno)); } - Utility::SetNonBlocking(fds[0]); Utility::SetCloExec(fds[0]); - - Utility::SetNonBlocking(fds[1]); Utility::SetCloExec(fds[1]); #endif /* HAVE_PIPE2 */ @@ -350,31 +152,27 @@ void Process::InitTask(void) delete [] envp; - m_FD = fds[0]; + int fd = fds[0]; (void) close(fds[1]); -} -bool Process::RunTask(void) -{ - char buffer[512]; + char buffer[512]; int rc; + std::ostringstream outputStream; + do { - rc = read(m_FD, buffer, sizeof(buffer)); + rc = read(fd, buffer, sizeof(buffer)); if (rc > 0) { - m_OutputStream.write(buffer, rc); + outputStream.write(buffer, rc); } } while (rc > 0); - if (rc < 0 && errno == EAGAIN) - return true; - - String output = m_OutputStream.str(); + String output = outputStream.str(); int status, exitcode; - (void) close(m_FD); + (void) close(fd); if (waitpid(m_Pid, &status, 0) != m_Pid) { BOOST_THROW_EXCEPTION(posix_error() @@ -393,19 +191,11 @@ bool Process::RunTask(void) exitcode = 128; } - m_Result.ExecutionEnd = Utility::GetTime(); - m_Result.ExitStatus = exitcode; - m_Result.Output = output; + result.ExecutionEnd = Utility::GetTime(); + result.ExitStatus = exitcode; + result.Output = output; - return false; -} - -void Process::StatusTimerHandler(void) -{ - boost::mutex::scoped_lock lock(m_Mutex); - if (m_Tasks.size() > 50) - Log(LogCritical, "base", "More than 50 waiting Process tasks: " + - Convert::ToString(m_Tasks.size())); + FinishResult(result); } #endif /* _WIN32 */ diff --git a/lib/base/process-windows.cpp b/lib/base/process-windows.cpp index 3ca4bb073..6d7ef6b9f 100644 --- a/lib/base/process-windows.cpp +++ b/lib/base/process-windows.cpp @@ -22,30 +22,9 @@ #ifdef _WIN32 using namespace icinga; -void Process::Initialize(void) +void Process::Run(void) { // TODO: implement } -void Process::WorkerThreadProc(void) -{ - // TODO: implement -} - -void Process::QueueTask(void) -{ - // TODO: implement -} - -void Process::InitTask(void) -{ - // TODO: implement -} - -bool Process::RunTask(void) -{ - // TODO: implement - return false; -} - #endif /* _WIN32 */ diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 814a9833a..fd0a2c16a 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -25,22 +25,9 @@ using namespace icinga; -boost::once_flag Process::m_ThreadOnce = BOOST_ONCE_INIT; -boost::mutex Process::m_Mutex; -std::deque Process::m_Tasks; - Process::Process(const std::vector& arguments, const Dictionary::Ptr& extraEnvironment) : AsyncTask(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment) -{ - { - boost::mutex::scoped_lock lock(m_Mutex); - boost::call_once(&Process::Initialize, m_ThreadOnce); - } - -#ifndef _WIN32 - m_FD = -1; -#endif /* _WIN32 */ -} +{ } std::vector Process::SplitCommand(const Value& command) { @@ -67,8 +54,3 @@ std::vector Process::SplitCommand(const Value& command) #endif return args; } - -void Process::Run(void) -{ - QueueTask(); -} diff --git a/lib/base/process.h b/lib/base/process.h index 6d3de29ac..ed32b5716 100644 --- a/lib/base/process.h +++ b/lib/base/process.h @@ -69,41 +69,9 @@ private: #ifndef _WIN32 pid_t m_Pid; - int m_FD; #endif /* _WIN32 */ - std::ostringstream m_OutputStream; - - ProcessResult m_Result; - virtual void Run(void); - - static boost::mutex m_Mutex; - static std::deque m_Tasks; -#ifndef _WIN32 - static boost::condition_variable m_CV; - static int m_TaskFd; - - static Timer::Ptr m_StatusTimer; -#endif /* _WIN32 */ - - void QueueTask(void); - - void SpawnTask(void); - -#ifdef _WIN32 - static void WorkerThreadProc(void); -#else /* _WIN32 */ - static void WorkerThreadProc(int taskFd); - - static void StatusTimerHandler(void); -#endif /* _WIN32 */ - - void InitTask(void); - bool RunTask(void); - - static boost::once_flag m_ThreadOnce; - static void Initialize(void); }; }