]> granicus.if.org Git - icinga2/commitdiff
Use more threads for Process I/O.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 14 Mar 2014 12:21:11 +0000 (13:21 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 14 Mar 2014 12:21:11 +0000 (13:21 +0100)
Refs #5748

lib/base/process-unix.cpp
lib/base/process.h

index 479ff69b2a6bfeda1b6a60b1bcba0004f741e9a0..13052229fa9d1b43a863b7bc508e5aa78d7a3e0c 100644 (file)
@@ -44,66 +44,74 @@ extern char **environ;
 #define environ (*_NSGetEnviron())
 #endif /* __APPLE__ */
 
-static boost::mutex l_ProcessMutex;
-static std::map<int, Process::Ptr> l_Processes;
-static int l_EventFDs[2];
+#define IOTHREADS 8
+
+static boost::mutex l_ProcessMutex[IOTHREADS];
+static std::map<int, Process::Ptr> l_Processes[IOTHREADS];
+static int l_EventFDs[IOTHREADS][2];
 static boost::once_flag l_OnceFlag = BOOST_ONCE_INIT;
 
 INITIALIZE_ONCE(&Process::StaticInitialize);
 
 void Process::StaticInitialize(void)
 {
-#ifdef HAVE_PIPE2
-       if (pipe2(l_EventFDs, O_CLOEXEC) < 0) {
-               BOOST_THROW_EXCEPTION(posix_error()
-                   << boost::errinfo_api_function("pipe2")
-                   << boost::errinfo_errno(errno));
-       }
+       for (int tid = 0; tid < IOTHREADS; tid++) {
+       #ifdef HAVE_PIPE2
+               if (pipe2(l_EventFDs[tid], O_CLOEXEC) < 0) {
+                       BOOST_THROW_EXCEPTION(posix_error()
+                           << boost::errinfo_api_function("pipe2")
+                           << boost::errinfo_errno(errno));
+               }
 #else /* HAVE_PIPE2 */
-       if (pipe(l_EventFDs) < 0) {
-               BOOST_THROW_EXCEPTION(posix_error()
-                   << boost::errinfo_api_function("pipe")
-                   << boost::errinfo_errno(errno));
-       }
+               if (pipe(l_EventFDs[tid]) < 0) {
+                       BOOST_THROW_EXCEPTION(posix_error()
+                           << boost::errinfo_api_function("pipe")
+                           << boost::errinfo_errno(errno));
+               }
 
-       Utility::SetCloExec(l_EventFDs[0]);
-       Utility::SetCloExec(l_EventFDs[1]);
+               Utility::SetCloExec(l_EventFDs[tid][0]);
+               Utility::SetCloExec(l_EventFDs[tid][1]);
 #endif /* HAVE_PIPE2 */
 
-       Utility::SetNonBlocking(l_EventFDs[0]);
-       Utility::SetNonBlocking(l_EventFDs[1]);
+               Utility::SetNonBlocking(l_EventFDs[tid][0]);
+               Utility::SetNonBlocking(l_EventFDs[tid][1]);
+       }
 }
 
 void Process::ThreadInitialize(void)
 {
        /* Note to self: Make sure this runs _after_ we've daemonized. */
-       boost::thread t(&Process::IOThreadProc);
-       t.detach();
+       for (int tid = 0; tid < IOTHREADS; tid++) {
+               boost::thread t(boost::bind(&Process::IOThreadProc, tid));
+               t.detach();
+       }
 }
 
-void Process::IOThreadProc(void)
+void Process::IOThreadProc(int tid)
 {
        pollfd *pfds = NULL;
        int count = 0;
 
+       Utility::SetThreadName("ProcessIO");
+
        for (;;) {
                double now, timeout = -1;
 
                now = Utility::GetTime();
 
                {
-                       boost::mutex::scoped_lock lock(l_ProcessMutex);
+                       boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
 
-                       count = 1 + l_Processes.size();
+                       count = 1 + l_Processes[tid].size();
                        pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
 
-                       pfds[0].fd = l_EventFDs[0];
+                       pfds[0].fd = l_EventFDs[tid][0];
                        pfds[0].events = POLLIN;
                        pfds[0].revents = 0;
 
                        int i = 1;
                        std::pair<int, Process::Ptr> kv;
-                       BOOST_FOREACH(kv, l_Processes) {
+                       BOOST_FOREACH(kv, l_Processes[tid]) {
                                pfds[i].fd = kv.second->m_FD;
                                pfds[i].events = POLLIN;
                                pfds[i].revents = 0;
@@ -125,24 +133,24 @@ void Process::IOThreadProc(void)
                        continue;
 
                {
-                       boost::mutex::scoped_lock lock(l_ProcessMutex);
+                       boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
 
                        if (pfds[0].revents & (POLLIN|POLLHUP|POLLERR)) {
                                char buffer[512];
-                               (void) read(l_EventFDs[0], buffer, sizeof(buffer));
+                               (void) read(l_EventFDs[tid][0], buffer, sizeof(buffer));
                        }
 
                        for (int i = 1; i < count; i++) {
                                if (pfds[i].revents & (POLLIN|POLLHUP|POLLERR)) {
                                        std::map<int, Process::Ptr>::iterator it;
-                                       it = l_Processes.find(pfds[i].fd);
+                                       it = l_Processes[tid].find(pfds[i].fd);
 
-                                       if (it == l_Processes.end())
+                                       if (it == l_Processes[tid].end())
                                                continue; /* This should never happen. */
 
                                        if (!it->second->DoEvents()) {
                                                (void) close(it->first);
-                                               l_Processes.erase(it);
+                                               l_Processes[tid].erase(it);
                                        }
                                }
                        }
@@ -273,12 +281,14 @@ void Process::Run(const boost::function<void (const ProcessResult&)>& callback)
        m_FD = fds[0];
        m_Callback = callback;
 
+       int tid = GetTID();
+
        {
-               boost::mutex::scoped_lock lock(l_ProcessMutex);
-               l_Processes[m_FD] = GetSelf();
+               boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
+               l_Processes[tid][m_FD] = GetSelf();
        }
 
-       (void) write(l_EventFDs[1], "T", 1);
+       (void) write(l_EventFDs[tid][1], "T", 1);
 }
 
 bool Process::DoEvents(void)
@@ -338,4 +348,9 @@ bool Process::DoEvents(void)
        return false;
 }
 
+int Process::GetTID(void) const
+{
+       return (reinterpret_cast<uintptr_t>(this) / sizeof(void *)) % IOTHREADS;
+}
+
 #endif /* _WIN32 */
index c8e18f0c5673c30c5a422947cc49c060b164e516..a7cae9fedd14580332b2113e66b13e5072b6b824 100644 (file)
@@ -85,8 +85,9 @@ private:
        boost::function<void (const ProcessResult&)> m_Callback;
        ProcessResult m_Result;
 
-       static void IOThreadProc(void);
+       static void IOThreadProc(int tid);
        bool DoEvents(void);
+       int GetTID(void) const;
 #endif /* _WIN32 */
 };