From e2815de8a6299f83788d57a5bca9ed9c213583d1 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Fri, 20 Feb 2015 14:43:13 +0100 Subject: [PATCH] Fix stalled reconnection attempts for the cluster refs #8485 --- lib/base/socketevents.cpp | 49 +++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/lib/base/socketevents.cpp b/lib/base/socketevents.cpp index a74d16aef..aa1cc6ee6 100644 --- a/lib/base/socketevents.cpp +++ b/lib/base/socketevents.cpp @@ -41,9 +41,12 @@ struct SocketEventDescriptor { } }; +static boost::thread l_SocketIOThread; static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT; static SOCKET l_SocketIOEventFDs[2]; static boost::mutex l_SocketIOMutex; +static boost::condition_variable l_SocketIOCV; +static bool l_SocketIOFDChanged; static std::map l_SocketIOSockets; void SocketEvents::InitializeThread(void) @@ -63,8 +66,7 @@ void SocketEvents::InitializeThread(void) l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed; - boost::thread thread(&SocketEvents::ThreadProc); - thread.detach(); + l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc); } void SocketEvents::ThreadProc(void) @@ -100,6 +102,16 @@ void SocketEvents::ThreadProc(void) (void) poll(pfds, pfdcount, -1); #endif /* _WIN32 */ + { + boost::mutex::scoped_lock lock(l_SocketIOMutex); + + if (l_SocketIOFDChanged) { + l_SocketIOFDChanged = false; + l_SocketIOCV.notify_all(); + continue; + } + } + for (int i = 0; i < pfdcount; i++) { if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0) continue; @@ -139,7 +151,16 @@ void SocketEvents::ThreadProc(void) void SocketEvents::WakeUpThread(void) { - (void) send(l_SocketIOEventFDs[1], "T", 1, 0); + if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) { + boost::mutex::scoped_lock lock(l_SocketIOMutex); + + l_SocketIOFDChanged = true; + + (void) send(l_SocketIOEventFDs[1], "T", 1, 0); + + while (l_SocketIOFDChanged) + l_SocketIOCV.wait(lock); + } } /** @@ -160,6 +181,8 @@ SocketEvents::~SocketEvents(void) void SocketEvents::Register(Object *lifesupportObject) { + boost::mutex::scoped_lock lock(l_SocketIOMutex); + VERIFY(m_FD != INVALID_SOCKET); SocketEventDescriptor desc; @@ -167,25 +190,21 @@ void SocketEvents::Register(Object *lifesupportObject) desc.EventInterface = this; desc.LifesupportObject = lifesupportObject; - { - boost::mutex::scoped_lock lock(l_SocketIOMutex); - - VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end()); + VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end()); - l_SocketIOSockets[m_FD] = desc; - } + l_SocketIOSockets[m_FD] = desc; /* There's no need to wake up the I/O thread here. */ } void SocketEvents::Unregister(void) { - if (m_FD == INVALID_SOCKET) - return; - { boost::mutex::scoped_lock lock(l_SocketIOMutex); + if (m_FD == INVALID_SOCKET) + return; + l_SocketIOSockets.erase(m_FD); m_FD = INVALID_SOCKET; } @@ -195,12 +214,12 @@ void SocketEvents::Unregister(void) void SocketEvents::ChangeEvents(int events) { - if (m_FD == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); - { boost::mutex::scoped_lock lock(l_SocketIOMutex); + if (m_FD == INVALID_SOCKET) + BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); + std::map::iterator it = l_SocketIOSockets.find(m_FD); if (it == l_SocketIOSockets.end()) -- 2.40.0