]> granicus.if.org Git - icinga2/commitdiff
base: Limit buffer size for BufferedStream objects.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 20 Sep 2013 07:45:05 +0000 (09:45 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 20 Sep 2013 07:45:05 +0000 (09:45 +0200)
lib/base/bufferedstream.cpp
lib/base/bufferedstream.h

index 2f35a57f3c75d3a5a9d6c99f2ee2ad0d9261421f..8d7fe4113c7ea773334ee60ce06d0509b30fd9d2 100644 (file)
 
 using namespace icinga;
 
-BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
+BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize)
        : m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
          m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
-         m_Blocking(true), m_Exception()
+         m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception()
 {
        m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
        m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
@@ -111,6 +111,7 @@ void BufferedStream::WriteThreadProc(void)
                                        break;
 
                                rc = m_SendQ->Read(buffer, sizeof(buffer));
+                               m_WriteCV.notify_all();
                        }
 
                        m_InnerStream->Write(buffer, rc);
@@ -158,12 +159,13 @@ size_t BufferedStream::Read(void *buffer, size_t count)
  *
  * @param buffer The data that is to be written.
  * @param count The number of bytes to write.
- * @returns The number of bytes written
  */
 void BufferedStream::Write(const void *buffer, size_t count)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
+       InternalWaitWritable(count, lock);
+
        if (m_Exception)
                boost::rethrow_exception(m_Exception);
 
@@ -184,8 +186,18 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc
                m_ReadCV.wait(lock);
 }
 
-void BufferedStream::WaitWritable(size_t)
-{ /* Nothing to do here. */ }
+void BufferedStream::WaitWritable(size_t count)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       InternalWaitWritable(count, lock);
+}
+
+void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock)
+{
+       while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped)
+               m_WriteCV.wait(lock);
+}
 
 void BufferedStream::MakeNonBlocking(void)
 {
index a6028f3c969a9ce8f86514c24c3e31e522ad3d40..6f1bb78ca676c030178fefa7e8a91839c7972780 100644 (file)
@@ -37,7 +37,7 @@ class I2_BASE_API BufferedStream : public Stream
 public:
        DECLARE_PTR_TYPEDEFS(BufferedStream);
 
-       BufferedStream(const Stream::Ptr& innerStream);
+       BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024);
        ~BufferedStream(void);
 
        virtual size_t Read(void *buffer, size_t count);
@@ -62,6 +62,7 @@ private:
        FIFO::Ptr m_SendQ;
 
        bool m_Blocking;
+       size_t m_MaxBufferSize;
 
        boost::exception_ptr m_Exception;
 
@@ -75,6 +76,7 @@ private:
        boost::thread m_ReadThread;
        boost::thread m_WriteThread;
 
+       void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock);
        void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
 };