using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
- : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
- m_Exception(), m_Blocking(true)
+ : m_InnerStream(innerStream), m_Stopped(false),
+ m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
+ m_Blocking(true), m_Exception()
{
- boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
- readThread.detach();
-
- boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
- writeThread.detach();
+ m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
+ m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
+}
+
+BufferedStream::~BufferedStream(void)
+{
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ m_Stopped = true;
+ }
+
+ m_InnerStream->Close();
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ m_ReadCV.notify_all();
+ m_WriteCV.notify_all();
+ }
+
+ m_ReadThread.join();
+ m_WriteThread.join();
}
void BufferedStream::ReadThreadProc(void)
{
char buffer[512];
-
+
try {
for (;;) {
size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
-
+
if (rc == 0)
break;
-
+
boost::mutex::scoped_lock lock(m_Mutex);
m_RecvQ->Write(buffer, rc);
m_ReadCV.notify_all();
+
+ if (m_Stopped)
+ break;
}
} catch (const std::exception& ex) {
{
{
char buffer[512];
- try {
+ try {
for (;;) {
size_t rc;
-
+
{
boost::mutex::scoped_lock lock(m_Mutex);
-
- while (m_SendQ->GetAvailableBytes() == 0)
+
+ while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped)
m_WriteCV.wait(lock);
-
+
+ if (m_Stopped)
+ break;
+
rc = m_SendQ->Read(buffer, sizeof(buffer));
- }
-
+ }
+
m_InnerStream->Write(buffer, rc);
}
} catch (const std::exception& ex) {
boost::rethrow_exception(m_Exception);
m_SendQ->Write(buffer, count);
- m_WriteCV.notify_all();
+ m_WriteCV.notify_all();
}
void BufferedStream::WaitReadable(size_t count)
m_ReadCV.wait(lock);
}
-void BufferedStream::WaitWritable(size_t count)
+void BufferedStream::WaitWritable(size_t)
{ /* Nothing to do here. */ }
void BufferedStream::MakeNonBlocking(void)
m_Blocking = false;
}
-
typedef weak_ptr<BufferedStream> WeakPtr;
BufferedStream(const Stream::Ptr& innerStream);
+ ~BufferedStream(void);
virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
private:
Stream::Ptr m_InnerStream;
-
+
+ bool m_Stopped;
+
FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ;
bool m_Blocking;
-
+
boost::exception_ptr m_Exception;
-
+
boost::mutex m_Mutex;
boost::condition_variable m_ReadCV;
boost::condition_variable m_WriteCV;
-
+
void ReadThreadProc(void);
void WriteThreadProc(void);
+ boost::thread m_ReadThread;
+ boost::thread m_WriteThread;
+
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
};
BOOST_AUTO_TEST_CASE(tolong)
{
BOOST_CHECK(Utility::Match("*", "hello"));
+ BOOST_CHECK(!Utility::Match("\\**", "hello"));
+ BOOST_CHECK(Utility::Match("\\**", "*ello"));
+ BOOST_CHECK(Utility::Match("?e*l?", "hello"));
+ BOOST_CHECK(Utility::Match("?e*l?", "helo"));
BOOST_CHECK(!Utility::Match("world", "hello"));
BOOST_CHECK(!Utility::Match("hee*", "hello"));
BOOST_CHECK(Utility::Match("he??o", "hello"));
BOOST_CHECK(Utility::Match("he?", "hel"));
+ BOOST_CHECK(Utility::Match("he*", "hello"));
+ BOOST_CHECK(Utility::Match("he*o", "heo"));
+ BOOST_CHECK(Utility::Match("he**o", "heo"));
+ BOOST_CHECK(Utility::Match("he**o", "hello"));
}
BOOST_AUTO_TEST_SUITE_END()