]> granicus.if.org Git - icinga2/commitdiff
Fix stability issues with the TlsStream/Stream classes
authorGunnar Beutner <gunnar@beutner.name>
Wed, 24 Jun 2015 07:44:59 +0000 (09:44 +0200)
committerMichael Friedrich <michael.friedrich@netways.de>
Mon, 6 Jul 2015 13:09:04 +0000 (15:09 +0200)
fixes #9481

lib/base/netstring.cpp
lib/base/netstring.hpp
lib/base/stdiostream.cpp
lib/base/stdiostream.hpp
lib/base/stream.cpp
lib/base/stream.hpp
lib/remote/apiclient.cpp
lib/remote/apiclient.hpp

index 1d97836afb48e0fc99bd1edd023b3f6bb49eceb7..e692f09d5fcc20bc07ebb03656787275ec91d58b 100644 (file)
@@ -32,13 +32,13 @@ using namespace icinga;
  * @exception invalid_argument The input stream is invalid.
  * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
  */
-StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context)
+StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context, bool may_wait)
 {
        if (context.Eof)
                return StatusEof;
 
        if (context.MustRead) {
-               if (!context.FillFromStream(stream)) {
+               if (!context.FillFromStream(stream, may_wait)) {
                        context.Eof = true;
                        return StatusEof;
                }
index 85fbe7e49ba7e990e21f69fb343150911bddc3de..089a211e2a08aeae11ffc373fd390db427ba5a1f 100644 (file)
@@ -38,7 +38,7 @@ class String;
 class I2_BASE_API NetString
 {
 public:
-       static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context);
+       static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
        static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
 
 private:
index 140117b98be9c166c851c28ea94d93683af0b0f7..356e64361c6ef76b9177a04bc614f1988f50c037 100644 (file)
@@ -61,6 +61,11 @@ void StdioStream::Close(void)
        }
 }
 
+bool StdioStream::IsDataAvailable(void) const
+{
+       return !IsEof();
+}
+
 bool StdioStream::IsEof(void) const
 {
        return !m_InnerStream->good();
index 371d6994ca8fd2dc347fc81bbd8c7d0960b216a8..44e3424195a9b7881ffc7b01afb6c7a507219fdb 100644 (file)
@@ -39,6 +39,7 @@ public:
 
        virtual void Close(void);
 
+       virtual bool IsDataAvailable(void) const;
        virtual bool IsEof(void) const;
 
 private:
index 84ba19887a315be4aeac98ef91f3241c0ce2c4b3..5b465cba388b6963f5f8678817e826904c1824fd 100644 (file)
@@ -61,13 +61,13 @@ void Stream::WaitForData(void)
                m_CV.wait(lock);
 }
 
-StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
+StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
 {
        if (context.Eof)
                return StatusEof;
 
        if (context.MustRead) {
-               if (!context.FillFromStream(this)) {
+               if (!context.FillFromStream(this, may_wait)) {
                        context.Eof = true;
 
                        *line = String(context.Buffer, &(context.Buffer[context.Size]));
@@ -86,6 +86,8 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
 
                        if (count == 1)
                                first_newline = i;
+                       else if (count > 1)
+                               break;
                }
        }
 
@@ -103,9 +105,9 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
        return StatusNeedData;
 }
 
-bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
+bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
 {
-       if (Wait && stream->SupportsWaiting())
+       if (may_wait && stream->SupportsWaiting())
                stream->WaitForData();
 
        size_t count = 0;
@@ -120,7 +122,7 @@ bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
 
                Size += rc;
                count += rc;
-       } while (stream->IsDataAvailable());
+       } while (count < 64 * 1024 && stream->IsDataAvailable());
 
        if (count == 0 && stream->IsEof())
                return false;
index fcd14cc95d8ae18a3a606d27d532d8b9411977b8..9754d3b3676b69ea9197b620f958c588ad08116e 100644 (file)
@@ -38,8 +38,8 @@ enum ConnectionRole
 
 struct StreamReadContext
 {
-       StreamReadContext(bool wait = true)
-               : Buffer(NULL), Size(0), MustRead(true), Eof(false), Wait(wait)
+       StreamReadContext(void)
+               : Buffer(NULL), Size(0), MustRead(true), Eof(false)
        { }
 
        ~StreamReadContext(void)
@@ -47,14 +47,13 @@ struct StreamReadContext
                free(Buffer);
        }
 
-       bool FillFromStream(const intrusive_ptr<Stream>& stream);
+       bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
        void DropData(size_t count);
 
        char *Buffer;
        size_t Size;
        bool MustRead;
        bool Eof;
-       bool Wait;
 };
 
 enum StreamReadStatus
@@ -117,7 +116,7 @@ public:
 
        void RegisterDataHandler(const boost::function<void(void)>& handler);
 
-       StreamReadStatus ReadLine(String *line, StreamReadContext& context);
+       StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
 
 protected:
        void SignalDataAvailable(void);
index b5662433e0df2c15484fd5561d4a6e8adf227f5f..d4fc9f92a99ff13d70ecf374a4364478b1be6104 100644 (file)
@@ -40,7 +40,7 @@ static Timer::Ptr l_ApiClientTimeoutTimer;
 
 ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
        : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
-         m_NextHeartbeat(0), m_HeartbeatTimeout(0), m_Context(false)
+         m_NextHeartbeat(0), m_HeartbeatTimeout(0)
 {
        boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize);
 
@@ -59,6 +59,8 @@ void ApiClient::StaticInitialize(void)
 void ApiClient::Start(void)
 {
        m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
+       if (m_Stream->IsDataAvailable())
+               DataAvailableHandler();
 }
 
 String ApiClient::GetIdentity(void) const
@@ -195,6 +197,8 @@ bool ApiClient::ProcessMessage(void)
 
 void ApiClient::DataAvailableHandler(void)
 {
+       boost::mutex::scoped_lock lock(m_DataHandlerMutex);
+
        try {
                while (ProcessMessage())
                        ; /* empty loop body */
index 80199f1c4fe19981012e2aa196224796f832eb4a..3adf6161a2acd01e67bfb8ea37a572c5e5b5ff41 100644 (file)
@@ -74,6 +74,7 @@ private:
        double m_NextHeartbeat;
        double m_HeartbeatTimeout;
        Timer::Ptr m_TimeoutTimer;
+       boost::mutex m_DataHandlerMutex;
 
        StreamReadContext m_Context;