]> granicus.if.org Git - icinga2/commitdiff
Process class: Use the global EventQueue instead of a custom queue.
authorGunnar Beutner <gunnar@beutner.name>
Fri, 22 Mar 2013 23:26:56 +0000 (00:26 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Fri, 22 Mar 2013 23:26:56 +0000 (00:26 +0100)
lib/base/eventqueue.cpp
lib/base/eventqueue.h
lib/base/process-unix.cpp
lib/base/process-windows.cpp
lib/base/process.cpp
lib/base/process.h

index 3c6a065c725d278978bac3d183de37de4a20f246..08349453e9de3fdf29210c43d21c4c4bc85f77ac 100644 (file)
 #include <sstream>
 #include <boost/bind.hpp>
 #include <boost/exception/diagnostic_information.hpp>
+#include <boost/foreach.hpp>
 
 using namespace icinga;
 
 EventQueue::EventQueue(void)
        : m_Stopped(false)
 {
-       unsigned int threads = boost::thread::hardware_concurrency();
+       m_ThreadCount = boost::thread::hardware_concurrency();
 
-       if (threads == 0)
-               threads = 1;
+       if (m_ThreadCount == 0)
+               m_ThreadCount = 1;
 
-       threads *= 8;
+       m_ThreadCount *= 8;
 
-       for (unsigned int i = 0; i < threads; i++)
-               m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
+       m_ThreadCount = 128;
+
+       m_States = new ThreadState[m_ThreadCount];
+
+       for (int i = 0; i < m_ThreadCount; i++) {
+               m_States[i] = ThreadIdle;
+               m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this, i));
+       }
 
        boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this));
        reportThread.detach();
@@ -68,22 +75,26 @@ void EventQueue::Join(void)
 /**
  * Waits for events and processes them.
  */
-void EventQueue::QueueThreadProc(void)
+void EventQueue::QueueThreadProc(int tid)
 {
        for (;;) {
-               Callback event;
+               EventQueueWorkItem event;
 
                {
                        boost::mutex::scoped_lock lock(m_Mutex);
 
+                       m_States[tid] = ThreadIdle;
+
                        while (m_Events.empty() && !m_Stopped)
                                m_CV.wait(lock);
 
                        if (m_Events.empty() && m_Stopped)
                                break;
 
-                       event = m_Events.top();
-                       m_Events.pop();
+                       event = m_Events.front();
+                       m_Events.pop_front();
+
+                       m_States[tid] = ThreadBusy;
                }
 
 #ifdef _DEBUG
@@ -97,7 +108,7 @@ void EventQueue::QueueThreadProc(void)
 #endif /* _DEBUG */
 
                try {
-                       event();
+                       event.Callback();
                } catch (const std::exception& ex) {
                        std::ostringstream msgbuf;
                        msgbuf << "Exception thrown in event handler: " << std::endl
@@ -146,10 +157,15 @@ void EventQueue::QueueThreadProc(void)
  *
  * @param callback The callback function for the event.
  */
-void EventQueue::Post(const EventQueue::Callback& callback)
+void EventQueue::Post(const EventQueueCallback& callback)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
-       m_Events.push(callback);
+
+       EventQueueWorkItem event;
+       event.Callback = callback;
+       event.Timestamp = Utility::GetTime();
+
+       m_Events.push_back(event);
        m_CV.notify_one();
 }
 
@@ -158,13 +174,40 @@ void EventQueue::ReportThreadProc(void)
        for (;;) {
                Utility::Sleep(5);
 
-               int pending;
+               double now = Utility::GetTime();
+
+               int pending, busy;
+               double max_latency, avg_latency;
 
                {
                        boost::mutex::scoped_lock lock(m_Mutex);
                        pending = m_Events.size();
+
+                       busy = 0;
+
+                       for (int i = 0; i < m_ThreadCount; i++) {
+                               if (m_States[i] == ThreadBusy)
+                                       busy++;
+                       }
+
+                       max_latency = 0;
+                       avg_latency = 0;
+
+                       BOOST_FOREACH(const EventQueueWorkItem& event, m_Events) {
+                               double latency = now - event.Timestamp;
+
+                               avg_latency += latency;
+
+                               if (latency > max_latency)
+                                       max_latency = latency;
+                       }
+
+                       avg_latency /= pending;
                }
 
-               Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending));
+               Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending) + "; Busy threads: " +
+                   Convert::ToString(busy) + "; Idle threads: " + Convert::ToString(m_ThreadCount - busy) +
+                   "; Maximum latency: " + Convert::ToString((long)max_latency * 1000) + "ms"
+                   "; Average latency: " + Convert::ToString((long)avg_latency * 1000) + "ms");
        }
 }
index 944a0870cd34cc5ed9ad786ba3249f82ea0ca021..2b31879480cd4332d7569932cd9f3be6f025a374 100644 (file)
 namespace icinga
 {
 
+enum ThreadState
+{
+       ThreadIdle,
+       ThreadBusy
+};
+
+typedef boost::function<void ()> EventQueueCallback;
+
+struct EventQueueWorkItem
+{
+       EventQueueCallback Callback;
+       double Timestamp;
+};
+
 /**
  * An event queue.
  *
@@ -38,26 +52,26 @@ namespace icinga
 class I2_BASE_API EventQueue
 {
 public:
-       typedef boost::function<void ()> Callback;
-
        EventQueue(void);
        ~EventQueue(void);
 
        void Stop(void);
        void Join(void);
 
-       void Post(const Callback& callback);
+       void Post(const EventQueueCallback& callback);
 
 private:
        boost::thread_group m_Threads;
+       ThreadState *m_States;
+       int m_ThreadCount;
 
        boost::mutex m_Mutex;
        boost::condition_variable m_CV;
 
        bool m_Stopped;
-       std::stack<Callback> m_Events;
+       std::deque<EventQueueWorkItem> m_Events;
 
-       void QueueThreadProc(void);
+       void QueueThreadProc(int tid);
        void ReportThreadProc(void);
 };
 
index ba0325f04632fc4c81a1cdcaf2e206be6c2504cc..6ae2d15663bd6ecbd401e7cb46ac5dfee96e1b66 100644 (file)
 
 using namespace icinga;
 
-boost::condition_variable Process::m_CV;
-int Process::m_TaskFd;
-Timer::Ptr Process::m_StatusTimer;
-
 #ifndef __APPLE__
 extern char **environ;
 #else /* __APPLE__ */
@@ -45,202 +41,11 @@ extern char **environ;
 #define environ (*_NSGetEnviron())
 #endif /* __APPLE__ */
 
-void Process::Initialize(void)
+void Process::Run(void)
 {
-       int fds[2];
-
-#if HAVE_PIPE2
-       if (pipe2(fds, O_CLOEXEC) < 0) {
-               BOOST_THROW_EXCEPTION(posix_error()
-                   << boost::errinfo_api_function("pipe2")
-                   << boost::errinfo_errno(errno));
-       }
-#else /* HAVE_PIPE2 */
-       if (pipe(fds) < 0) {
-               BOOST_THROW_EXCEPTION(posix_error()
-                   << boost::errinfo_api_function("pipe")
-                   << boost::errinfo_errno(errno));
-       }
-
-       /* Don't bother setting fds[0] to clo-exec as we'll only
-        * use it in the following dup() call. */
-
-       Utility::SetCloExec(fds[1]);
-#endif /* HAVE_PIPE2 */
-
-       m_TaskFd = fds[1];
-
-       unsigned int threads = boost::thread::hardware_concurrency();
-
-       if (threads == 0)
-               threads = 2;
-
-       for (unsigned int i = 0; i < threads; i++) {
-               int childTaskFd = dup(fds[0]);
+       ProcessResult result;
 
-               if (childTaskFd < 0) {
-                       BOOST_THROW_EXCEPTION(posix_error()
-                           << boost::errinfo_api_function("dup")
-                           << boost::errinfo_errno(errno));
-               }
-
-               Utility::SetNonBlocking(childTaskFd);
-               Utility::SetCloExec(childTaskFd);
-
-               boost::thread t(&Process::WorkerThreadProc, childTaskFd);
-               t.detach();
-       }
-
-       (void) close(fds[0]);
-
-       m_StatusTimer = boost::make_shared<Timer>();
-       m_StatusTimer->OnTimerExpired.connect(boost::bind(&Process::StatusTimerHandler));
-       m_StatusTimer->SetInterval(5);
-       m_StatusTimer->Start();
-}
-
-void Process::WorkerThreadProc(int taskFd)
-{
-       std::map<int, Process::Ptr> tasks;
-       pollfd *pfds = NULL;
-
-       for (;;) {
-               std::map<int, Process::Ptr>::iterator it, prev;
-
-               pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd));
-
-               if (pfds == NULL) {
-                       BOOST_THROW_EXCEPTION(posix_error()
-                           << boost::errinfo_api_function("realloc")
-                           << boost::errinfo_errno(errno));
-               }
-
-               int idx = 0;
-
-               int fd;
-               BOOST_FOREACH(boost::tie(fd, boost::tuples::ignore), tasks) {
-                       pfds[idx].fd = fd;
-                       pfds[idx].events = POLLIN | POLLHUP;
-                       idx++;
-               }
-
-               if (tasks.size() < MaxTasksPerThread) {
-                       pfds[idx].fd = taskFd;
-                       pfds[idx].events = POLLIN;
-                       idx++;
-               }
-
-               int rc = poll(pfds, idx, -1);
-
-               if (rc < 0 && errno != EINTR) {
-                       BOOST_THROW_EXCEPTION(posix_error()
-                           << boost::errinfo_api_function("poll")
-                           << boost::errinfo_errno(errno));
-               }
-
-               if (rc == 0)
-                       continue;
-
-               for (int i = 0; i < idx; i++) {
-                       if ((pfds[i].revents & (POLLIN|POLLHUP)) == 0)
-                               continue;
-
-                       if (pfds[i].fd == taskFd) {
-                               std::vector<Process::Ptr> new_tasks;
-
-                               unsigned int want = MaxTasksPerThread - tasks.size();
-
-                               if (want > 0) {
-                                       boost::mutex::scoped_lock lock(m_Mutex);
-
-                                       /* Read one byte for every task we take from the pending tasks list. */
-                                       char buffer[MaxTasksPerThread];
-
-                                       ASSERT(want <= sizeof(buffer));
-
-                                       int have = read(taskFd, &buffer, want);
-
-                                       if (have < 0) {
-                                               if (errno == EAGAIN)
-                                                       break; /* Someone else was faster and took our task. */
-
-                                               BOOST_THROW_EXCEPTION(posix_error()
-                                                   << boost::errinfo_api_function("read")
-                                                   << boost::errinfo_errno(errno));
-                                       }
-
-                                       while (have > 0) {
-                                               ASSERT(!m_Tasks.empty());
-
-                                               Process::Ptr task = m_Tasks.front();
-                                               m_Tasks.pop_front();
-
-                                               new_tasks.push_back(task);
-
-                                               have--;
-                                       }
-
-                                       m_CV.notify_all();
-                               }
-
-                               BOOST_FOREACH(const Process::Ptr& task, new_tasks) {
-                                       try {
-                                               task->InitTask();
-
-                                               int fd = task->m_FD;
-
-                                               if (fd >= 0)
-                                                       tasks[fd] = task;
-                                       } catch (...) {
-                                               task->FinishException(boost::current_exception());
-                                       }
-                               }
-
-                               continue;
-                       }
-
-                       it = tasks.find(pfds[i].fd);
-
-                       if (it == tasks.end())
-                               continue;
-
-                       Process::Ptr task = it->second;
-
-                       if (!task->RunTask()) {
-                               prev = it;
-                               tasks.erase(prev);
-
-                               task->FinishResult(task->m_Result);
-                       }
-               }
-       }
-}
-
-void Process::QueueTask(void)
-{
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-
-               while (m_Tasks.size() >= PIPE_BUF)
-                       m_CV.wait(lock);
-
-               m_Tasks.push_back(GetSelf());
-
-               /**
-                * This little gem which is commonly known as the "self-pipe trick"
-                * takes care of waking up the select() call in the worker thread.
-                */
-               if (write(m_TaskFd, "T", 1) < 0) {
-                       BOOST_THROW_EXCEPTION(posix_error()
-                           << boost::errinfo_api_function("write")
-                           << boost::errinfo_errno(errno));
-               }
-       }
-}
-
-void Process::InitTask(void)
-{
-       m_Result.ExecutionStart = Utility::GetTime();
+       result.ExecutionStart = Utility::GetTime();
 
        ASSERT(m_FD == -1);
 
@@ -259,10 +64,7 @@ void Process::InitTask(void)
                    << boost::errinfo_errno(errno));
        }
 
-       Utility::SetNonBlocking(fds[0]);
        Utility::SetCloExec(fds[0]);
-
-       Utility::SetNonBlocking(fds[1]);
        Utility::SetCloExec(fds[1]);
 #endif /* HAVE_PIPE2 */
 
@@ -350,31 +152,27 @@ void Process::InitTask(void)
 
        delete [] envp;
 
-       m_FD = fds[0];
+       int fd = fds[0];
        (void) close(fds[1]);
-}
 
-bool Process::RunTask(void)
-{
-       char buffer[512];
+               char buffer[512];
        int rc;
 
+       std::ostringstream outputStream;
+
        do {
-               rc = read(m_FD, buffer, sizeof(buffer));
+               rc = read(fd, buffer, sizeof(buffer));
 
                if (rc > 0) {
-                       m_OutputStream.write(buffer, rc);
+                       outputStream.write(buffer, rc);
                }
        } while (rc > 0);
 
-       if (rc < 0 && errno == EAGAIN)
-               return true;
-
-       String output = m_OutputStream.str();
+       String output = outputStream.str();
 
        int status, exitcode;
 
-       (void) close(m_FD);
+       (void) close(fd);
 
        if (waitpid(m_Pid, &status, 0) != m_Pid) {
                BOOST_THROW_EXCEPTION(posix_error()
@@ -393,19 +191,11 @@ bool Process::RunTask(void)
                exitcode = 128;
        }
 
-       m_Result.ExecutionEnd = Utility::GetTime();
-       m_Result.ExitStatus = exitcode;
-       m_Result.Output = output;
+       result.ExecutionEnd = Utility::GetTime();
+       result.ExitStatus = exitcode;
+       result.Output = output;
 
-       return false;
-}
-
-void Process::StatusTimerHandler(void)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-       if (m_Tasks.size() > 50)
-               Log(LogCritical, "base", "More than 50 waiting Process tasks: " +
-                   Convert::ToString(m_Tasks.size()));
+       FinishResult(result);
 }
 
 #endif /* _WIN32 */
index 3ca4bb07367c4d45be298e8b55ff535f48966ffa..6d7ef6b9fbca949a47733edf250a3ba0bd4a9fac 100644 (file)
 #ifdef _WIN32
 using namespace icinga;
 
-void Process::Initialize(void)
+void Process::Run(void)
 {
        // TODO: implement
 }
 
-void Process::WorkerThreadProc(void)
-{
-       // TODO: implement
-}
-
-void Process::QueueTask(void)
-{
-       // TODO: implement
-}
-
-void Process::InitTask(void)
-{
-       // TODO: implement
-}
-
-bool Process::RunTask(void)
-{
-       // TODO: implement
-       return false;
-}
-
 #endif /* _WIN32 */
index 814a9833a122d43ae2378a622a29b4ba82c41d12..fd0a2c16aecf6d780dfeff22c93cb861e00f4fc6 100644 (file)
 
 using namespace icinga;
 
-boost::once_flag Process::m_ThreadOnce = BOOST_ONCE_INIT;
-boost::mutex Process::m_Mutex;
-std::deque<Process::Ptr> Process::m_Tasks;
-
 Process::Process(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
        : AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
-{
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-               boost::call_once(&Process::Initialize, m_ThreadOnce);
-       }
-
-#ifndef _WIN32
-       m_FD = -1;
-#endif /* _WIN32 */
-}
+{ }
 
 std::vector<String> Process::SplitCommand(const Value& command)
 {
@@ -67,8 +54,3 @@ std::vector<String> Process::SplitCommand(const Value& command)
 #endif
        return args;
 }
-
-void Process::Run(void)
-{
-       QueueTask();
-}
index 6d3de29ac151d53a2311ee61af79d42f989b6fac..ed32b571661bdd16f9a2239e25a530f191d50d7b 100644 (file)
@@ -69,41 +69,9 @@ private:
 
 #ifndef _WIN32
        pid_t m_Pid;
-       int m_FD;
 #endif /* _WIN32 */
 
-       std::ostringstream m_OutputStream;
-
-       ProcessResult m_Result;
-
        virtual void Run(void);
-
-       static boost::mutex m_Mutex;
-       static std::deque<Process::Ptr> m_Tasks;
-#ifndef _WIN32
-       static boost::condition_variable m_CV;
-       static int m_TaskFd;
-
-       static Timer::Ptr m_StatusTimer;
-#endif /* _WIN32 */
-
-       void QueueTask(void);
-
-       void SpawnTask(void);
-
-#ifdef _WIN32
-       static void WorkerThreadProc(void);
-#else /* _WIN32 */
-       static void WorkerThreadProc(int taskFd);
-
-       static void StatusTimerHandler(void);
-#endif /* _WIN32 */
-
-       void InitTask(void);
-       bool RunTask(void);
-
-       static boost::once_flag m_ThreadOnce;
-       static void Initialize(void);
 };
 
 }