From: Gunnar Beutner Date: Sat, 9 Mar 2013 14:56:56 +0000 (+0100) Subject: Fix deadlocks in the Socket/Stream classes. X-Git-Tag: v0.0.2~305 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=375746d710d8772405ed606563d2141a0294bf43;p=icinga2 Fix deadlocks in the Socket/Stream classes. --- diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index 196cb2bed..1e88b233e 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -48,8 +48,6 @@ Socket::~Socket(void) */ void Socket::Start(void) { - ObjectLock olock(this); - ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable()); ASSERT(GetFD() != INVALID_SOCKET); @@ -100,13 +98,15 @@ SOCKET Socket::GetFD(void) const */ void Socket::Close(void) { - ObjectLock olock(this); + { + ObjectLock olock(this); - if (m_FD == INVALID_SOCKET) - return; + if (m_FD == INVALID_SOCKET) + return; - closesocket(m_FD); - m_FD = INVALID_SOCKET; + closesocket(m_FD); + m_FD = INVALID_SOCKET; + } Stream::Close(); } @@ -385,19 +385,17 @@ size_t Socket::GetAvailableBytes(void) const */ size_t Socket::Read(void *buffer, size_t size) { - ObjectLock olock(this); - - if (m_Listening) - throw new logic_error("Socket does not support Read()."); - { - ObjectLock olock(m_RecvQueue); - - if (m_RecvQueue->GetAvailableBytes() == 0) - CheckException(); + ObjectLock olock(this); - return m_RecvQueue->Read(buffer, size); + if (m_Listening) + throw new logic_error("Socket does not support Read()."); } + + if (m_RecvQueue->GetAvailableBytes() == 0) + CheckException(); + + return m_RecvQueue->Read(buffer, size); } /** @@ -416,14 +414,10 @@ size_t Socket::Peek(void *buffer, size_t size) throw new logic_error("Socket does not support Peek()."); } - { - ObjectLock olock(m_RecvQueue); - - if (m_RecvQueue->GetAvailableBytes() == 0) - CheckException(); + if (m_RecvQueue->GetAvailableBytes() == 0) + CheckException(); - return m_RecvQueue->Peek(buffer, size); - } + return m_RecvQueue->Peek(buffer, size); } /** @@ -449,12 +443,13 @@ void Socket::Write(const void *buffer, size_t size) */ void Socket::Listen(void) { - ObjectLock olock(this); - if (listen(GetFD(), SOMAXCONN) < 0) BOOST_THROW_EXCEPTION(SocketException("listen() failed", GetError())); - m_Listening = true; + { + ObjectLock olock(this); + m_Listening = true; + } } void Socket::HandleWritable(void) @@ -553,7 +548,7 @@ void Socket::HandleReadableServer(void) if (fd < 0) BOOST_THROW_EXCEPTION(SocketException("accept() failed", GetError())); - TcpSocket::Ptr client = boost::make_shared(); + Socket::Ptr client = boost::make_shared(); client->SetFD(fd); OnNewClient(GetSelf(), client); } diff --git a/lib/base/socket.h b/lib/base/socket.h index a5d86a49c..d580bd7a9 100644 --- a/lib/base/socket.h +++ b/lib/base/socket.h @@ -33,6 +33,7 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; + Socket(void); ~Socket(void); virtual void Start(void); @@ -54,8 +55,6 @@ public: signals2::signal OnNewClient; protected: - Socket(void); - void SetFD(SOCKET fd); SOCKET GetFD(void) const; diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp index 235bd8bf2..cf5289b00 100644 --- a/lib/base/stream.cpp +++ b/lib/base/stream.cpp @@ -112,3 +112,22 @@ void Stream::Close(void) SetConnected(false); } + +bool Stream::ReadLine(String *line, size_t maxLength) +{ + char buffer[maxLength]; + + size_t rc = Peek(buffer, maxLength); + + for (int i = 0; i < rc; i++) { + if (buffer[i] == '\n') { + *line = String(buffer, &(buffer[i])); + + Read(NULL, rc); + + return true; + } + } + + return false; +} diff --git a/lib/base/stream.h b/lib/base/stream.h index 1e868c901..c02d2adba 100644 --- a/lib/base/stream.h +++ b/lib/base/stream.h @@ -82,6 +82,8 @@ public: bool IsConnected(void) const; + bool ReadLine(String *line, size_t maxLength = 4096); + boost::exception_ptr GetException(void); void CheckException(void);