]> granicus.if.org Git - icinga2/commitdiff
Use multiple threads for socket IO
authorGunnar Beutner <gunnar@beutner.name>
Tue, 2 Feb 2016 10:17:33 +0000 (11:17 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Tue, 23 Feb 2016 08:33:45 +0000 (09:33 +0100)
refs #11014

lib/base/process.cpp
lib/base/socketevents.cpp
lib/base/socketevents.hpp

index 1b5a83fbb8909be6391e453a472eeccd30fcef34..831bf36ae1919fb826226e14b45933d63f165b91 100644 (file)
@@ -45,7 +45,7 @@ extern char **environ;
 
 using namespace icinga;
 
-#define IOTHREADS 2
+#define IOTHREADS 4
 
 static boost::mutex l_ProcessMutex[IOTHREADS];
 static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];
index e9ea150101d7cacde57f0bbcf2dfa97a717c8da7..bfcebbb9ed4f328df58f62f2feeb5b9d749b49cc 100644 (file)
@@ -44,36 +44,44 @@ struct EventDescription
        Object::Ptr LifesupportReference;
 };
 
-static boost::thread l_SocketIOThread;
+#define IOTHREADS 8
+
 static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
-static SOCKET l_SocketIOEventFDs[2];
-static boost::mutex l_SocketIOMutex;
-static boost::condition_variable l_SocketIOCV;
-static bool l_SocketIOFDChanged;
-static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
-static std::map<SOCKET, int> l_SocketIOEventChanges;
+static boost::thread l_SocketIOThreads[IOTHREADS];
+static SOCKET l_SocketIOEventFDs[IOTHREADS][2];
+static boost::mutex l_SocketIOMutex[IOTHREADS];
+static boost::condition_variable l_SocketIOCV[IOTHREADS];
+static bool l_SocketIOFDChanged[IOTHREADS];
+static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets[IOTHREADS];
+static std::map<SOCKET, int> l_SocketIOEventChanges[IOTHREADS];
+
+int SocketEvents::m_NextID = 0;
 
 void SocketEvents::InitializeThread(void)
 {
-       Socket::SocketPair(l_SocketIOEventFDs);
 
-       Utility::SetNonBlockingSocket(l_SocketIOEventFDs[0]);
-       Utility::SetNonBlockingSocket(l_SocketIOEventFDs[1]);
+       for (int i = 0; i < IOTHREADS; i++) {
+               Socket::SocketPair(l_SocketIOEventFDs[i]);
+
+               Utility::SetNonBlockingSocket(l_SocketIOEventFDs[i][0]);
+               Utility::SetNonBlockingSocket(l_SocketIOEventFDs[i][1]);
 
 #ifndef _WIN32
-       Utility::SetCloExec(l_SocketIOEventFDs[0]);
-       Utility::SetCloExec(l_SocketIOEventFDs[1]);
+               Utility::SetCloExec(l_SocketIOEventFDs[i][0]);
+               Utility::SetCloExec(l_SocketIOEventFDs[i][1]);
 #endif /* _WIN32 */
 
-       SocketEventDescriptor sed;
-       sed.Events = POLLIN;
+               SocketEventDescriptor sed;
+               sed.Events = POLLIN;
 
-       l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
+               l_SocketIOSockets[i][l_SocketIOEventFDs[i][0]] = sed;
+               l_SocketIOFDChanged[i] = true;
 
-       l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc);
+               l_SocketIOThreads[i] = boost::thread(&SocketEvents::ThreadProc, i);
+       }
 }
 
-void SocketEvents::ThreadProc(void)
+void SocketEvents::ThreadProc(int tid)
 {
        Utility::SetThreadName("SocketIO");
 
@@ -82,17 +90,17 @@ void SocketEvents::ThreadProc(void)
 
        for (;;) {
                {
-                       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+                       boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
-                       if (l_SocketIOFDChanged) {
-                               pfds.resize(l_SocketIOSockets.size());
-                               descriptors.resize(l_SocketIOSockets.size());
+                       if (l_SocketIOFDChanged[tid]) {
+                               pfds.resize(l_SocketIOSockets[tid].size());
+                               descriptors.resize(l_SocketIOSockets[tid].size());
 
                                int i = 0;
 
                                typedef std::map<SOCKET, SocketEventDescriptor>::value_type kv_pair;
 
-                               BOOST_FOREACH(const kv_pair& desc, l_SocketIOSockets) {
+                               BOOST_FOREACH(const kv_pair& desc, l_SocketIOSockets[tid]) {
                                        if (desc.second.EventInterface)
                                                desc.second.EventInterface->m_PFD = &pfds[i];
 
@@ -103,8 +111,8 @@ void SocketEvents::ThreadProc(void)
                                        i++;
                                }
 
-                               l_SocketIOFDChanged = false;
-                               l_SocketIOCV.notify_all();
+                               l_SocketIOFDChanged[tid] = false;
+                               l_SocketIOCV[tid].notify_all();
                        }
                }
 
@@ -119,18 +127,18 @@ void SocketEvents::ThreadProc(void)
                std::vector<EventDescription> events;
 
                {
-                       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+                       boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
-                       if (l_SocketIOFDChanged)
+                       if (l_SocketIOFDChanged[tid])
                                continue;
 
                        for (int i = 0; i < pfds.size(); i++) {
                                if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
                                        continue;
 
-                               if (pfds[i].fd == l_SocketIOEventFDs[0]) {
+                               if (pfds[i].fd == l_SocketIOEventFDs[tid][0]) {
                                        char buffer[512];
-                                       if (recv(l_SocketIOEventFDs[0], buffer, sizeof(buffer), 0) < 0)
+                                       if (recv(l_SocketIOEventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
                                                Log(LogCritical, "SocketEvents", "Read from event FD failed.");
 
                                        continue;
@@ -162,21 +170,24 @@ void SocketEvents::ThreadProc(void)
 
 void SocketEvents::WakeUpThread(bool wait)
 {
+       int tid = m_ID % IOTHREADS;
+
+       if (boost::this_thread::get_id() == l_SocketIOThreads[tid].get_id())
+               return;
+
        if (wait) {
-               if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) {
-                       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+               boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
-                       l_SocketIOFDChanged = true;
+               l_SocketIOFDChanged[tid] = true;
 
-                       while (l_SocketIOFDChanged) {
-                               (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
+               while (l_SocketIOFDChanged[tid]) {
+                       (void) send(l_SocketIOEventFDs[tid][1], "T", 1, 0);
 
-                               boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50);
-                               l_SocketIOCV.timed_wait(lock, timeout);
-                       }
+                       boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50);
+                       l_SocketIOCV[tid].timed_wait(lock, timeout);
                }
        } else {
-               (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
+               (void) send(l_SocketIOEventFDs[tid][1], "T", 1, 0);
        }
 }
 
@@ -184,7 +195,7 @@ void SocketEvents::WakeUpThread(bool wait)
  * Constructor for the SocketEvents class.
  */
 SocketEvents::SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject)
-       : m_FD(socket->GetFD()), m_PFD(NULL)
+       : m_ID(m_NextID++), m_FD(socket->GetFD()), m_PFD(NULL)
 {
        boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread);
 
@@ -198,8 +209,10 @@ SocketEvents::~SocketEvents(void)
 
 void SocketEvents::Register(Object *lifesupportObject)
 {
+       int tid = m_ID % IOTHREADS;
+
        {
-               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+               boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
                VERIFY(m_FD != INVALID_SOCKET);
 
@@ -208,9 +221,10 @@ void SocketEvents::Register(Object *lifesupportObject)
                desc.EventInterface = this;
                desc.LifesupportObject = lifesupportObject;
 
-               VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
+               VERIFY(l_SocketIOSockets[tid].find(m_FD) == l_SocketIOSockets[tid].end());
 
-               l_SocketIOSockets[m_FD] = desc;
+               l_SocketIOSockets[tid][m_FD] = desc;
+               l_SocketIOFDChanged[tid] = true;
 
                m_Events = true;
        }
@@ -220,13 +234,17 @@ void SocketEvents::Register(Object *lifesupportObject)
 
 void SocketEvents::Unregister(void)
 {
+       int tid = m_ID % IOTHREADS;
+
        {
-               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+               boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
                if (m_FD == INVALID_SOCKET)
                        return;
 
-               l_SocketIOSockets.erase(m_FD);
+               l_SocketIOSockets[tid].erase(m_FD);
+               l_SocketIOFDChanged[tid] = true;
+
                m_FD = INVALID_SOCKET;
 
                m_Events = false;
@@ -237,23 +255,25 @@ void SocketEvents::Unregister(void)
 
 void SocketEvents::ChangeEvents(int events)
 {
+       int tid = m_ID % IOTHREADS;
+
        {
-               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+               boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
 
                if (m_FD == INVALID_SOCKET)
                        BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
 
-               std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
+               std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets[tid].find(m_FD);
 
-               if (it == l_SocketIOSockets.end())
+               if (it == l_SocketIOSockets[tid].end())
                        return;
 
                it->second.Events = events;
 
-               if (m_PFD && boost::this_thread::get_id() == l_SocketIOThread.get_id())
+               if (m_PFD && boost::this_thread::get_id() == l_SocketIOThreads[tid].get_id())
                        m_PFD->events = events;
                else
-                       l_SocketIOFDChanged = true;
+                       l_SocketIOFDChanged[tid] = true;
        }
 
        WakeUpThread();
@@ -261,7 +281,8 @@ void SocketEvents::ChangeEvents(int events)
 
 bool SocketEvents::IsHandlingEvents(void) const
 {
-       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+       int tid = m_ID % IOTHREADS;
+       boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]);
        return m_Events;
 }
 
index 01e1feb8b7de6ecb506352fd36e1172f2ddfb9a5..b4b324bff715f47933faf72b2ba54f16b5ae83ad 100644 (file)
@@ -52,14 +52,17 @@ protected:
        SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject);
 
 private:
+       int m_ID;
        SOCKET m_FD;
        bool m_Events;
        pollfd *m_PFD;
 
+       static int m_NextID;
+
        static void InitializeThread(void);
-       static void ThreadProc(void);
+       static void ThreadProc(int tid);
 
-       static void WakeUpThread(bool wait = false);
+       void WakeUpThread(bool wait = false);
 
        int GetPollEvents(void) const;