using namespace icinga;
-BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
+BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize)
: m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
- m_Blocking(true), m_Exception()
+ m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception()
{
m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
break;
rc = m_SendQ->Read(buffer, sizeof(buffer));
+ m_WriteCV.notify_all();
}
m_InnerStream->Write(buffer, rc);
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
- * @returns The number of bytes written
*/
void BufferedStream::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
+ InternalWaitWritable(count, lock);
+
if (m_Exception)
boost::rethrow_exception(m_Exception);
m_ReadCV.wait(lock);
}
-void BufferedStream::WaitWritable(size_t)
-{ /* Nothing to do here. */ }
+void BufferedStream::WaitWritable(size_t count)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ InternalWaitWritable(count, lock);
+}
+
+void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock)
+{
+ while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped)
+ m_WriteCV.wait(lock);
+}
void BufferedStream::MakeNonBlocking(void)
{
public:
DECLARE_PTR_TYPEDEFS(BufferedStream);
- BufferedStream(const Stream::Ptr& innerStream);
+ BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024);
~BufferedStream(void);
virtual size_t Read(void *buffer, size_t count);
FIFO::Ptr m_SendQ;
bool m_Blocking;
+ size_t m_MaxBufferSize;
boost::exception_ptr m_Exception;
boost::thread m_ReadThread;
boost::thread m_WriteThread;
+ void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock);
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
};