]> granicus.if.org Git - icinga2/commitdiff
Fix stalled reconnection attempts for the cluster
authorGunnar Beutner <gunnar@beutner.name>
Fri, 20 Feb 2015 13:43:13 +0000 (14:43 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Fri, 20 Feb 2015 13:43:55 +0000 (14:43 +0100)
refs #8485

lib/base/socketevents.cpp

index a74d16aeff49a74744fc1d561f279757a9faebc2..aa1cc6ee6f2c37ef262f8e9fe615834932a8265a 100644 (file)
@@ -41,9 +41,12 @@ struct SocketEventDescriptor
        { }
 };
 
+static boost::thread l_SocketIOThread;
 static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
 static SOCKET l_SocketIOEventFDs[2];
 static boost::mutex l_SocketIOMutex;
+static boost::condition_variable l_SocketIOCV;
+static bool l_SocketIOFDChanged;
 static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
 
 void SocketEvents::InitializeThread(void)
@@ -63,8 +66,7 @@ void SocketEvents::InitializeThread(void)
 
        l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
 
-       boost::thread thread(&SocketEvents::ThreadProc);
-       thread.detach();
+       l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc);
 }
 
 void SocketEvents::ThreadProc(void)
@@ -100,6 +102,16 @@ void SocketEvents::ThreadProc(void)
                (void) poll(pfds, pfdcount, -1);
 #endif /* _WIN32 */
 
+               {
+                       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+                       if (l_SocketIOFDChanged) {
+                               l_SocketIOFDChanged = false;
+                               l_SocketIOCV.notify_all();
+                               continue;
+                       }
+               }
+
                for (int i = 0; i < pfdcount; i++) {
                        if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
                                continue;
@@ -139,7 +151,16 @@ void SocketEvents::ThreadProc(void)
 
 void SocketEvents::WakeUpThread(void)
 {
-       (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
+       if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) {
+               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+               l_SocketIOFDChanged = true;
+
+               (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
+
+               while (l_SocketIOFDChanged)
+                       l_SocketIOCV.wait(lock);
+       }
 }
 
 /**
@@ -160,6 +181,8 @@ SocketEvents::~SocketEvents(void)
 
 void SocketEvents::Register(Object *lifesupportObject)
 {
+       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
        VERIFY(m_FD != INVALID_SOCKET);
 
        SocketEventDescriptor desc;
@@ -167,25 +190,21 @@ void SocketEvents::Register(Object *lifesupportObject)
        desc.EventInterface = this;
        desc.LifesupportObject = lifesupportObject;
 
-       {
-               boost::mutex::scoped_lock lock(l_SocketIOMutex);
-
-               VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
+       VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
 
-               l_SocketIOSockets[m_FD] = desc;
-       }
+       l_SocketIOSockets[m_FD] = desc;
 
        /* There's no need to wake up the I/O thread here. */
 }
 
 void SocketEvents::Unregister(void)
 {
-       if (m_FD == INVALID_SOCKET)
-               return;
-
        {
                boost::mutex::scoped_lock lock(l_SocketIOMutex);
 
+               if (m_FD == INVALID_SOCKET)
+                       return;
+
                l_SocketIOSockets.erase(m_FD);
                m_FD = INVALID_SOCKET;
        }
@@ -195,12 +214,12 @@ void SocketEvents::Unregister(void)
 
 void SocketEvents::ChangeEvents(int events)
 {
-       if (m_FD == INVALID_SOCKET)
-               BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
-
        {
                boost::mutex::scoped_lock lock(l_SocketIOMutex);
 
+               if (m_FD == INVALID_SOCKET)
+                       BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
+
                std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
 
                if (it == l_SocketIOSockets.end())