]> granicus.if.org Git - icinga2/commitdiff
Bugfixes for the BufferedStream class.
authorGunnar Beutner <gunnar.beutner@netways.de>
Fri, 19 Apr 2013 12:47:41 +0000 (14:47 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 19 Apr 2013 12:47:41 +0000 (14:47 +0200)
lib/base/bufferedstream.cpp
lib/base/bufferedstream.h
test/base-match.cpp
third-party/mmatch/mmatch.c

index 4f9bc7bcdeb3795b0df69b20badf61b0725210fa..b9346279e39454173776887c6b7c7be11ded0153 100644 (file)
 using namespace icinga;
 
 BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
-       : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
-         m_Exception(), m_Blocking(true)
+       : m_InnerStream(innerStream), m_Stopped(false),
+         m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
+         m_Blocking(true), m_Exception()
 {
-       boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
-       readThread.detach();
-       
-       boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
-       writeThread.detach();
+       m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
+       m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
+}
+
+BufferedStream::~BufferedStream(void)
+{
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               m_Stopped = true;
+       }
+
+       m_InnerStream->Close();
+
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               m_ReadCV.notify_all();
+               m_WriteCV.notify_all();
+       }
+
+       m_ReadThread.join();
+       m_WriteThread.join();
 }
 
 void BufferedStream::ReadThreadProc(void)
 {
        char buffer[512];
-       
+
        try {
                for (;;) {
                        size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
-                       
+
                        if (rc == 0)
                                break;
-                       
+
                        boost::mutex::scoped_lock lock(m_Mutex);
                        m_RecvQ->Write(buffer, rc);
                        m_ReadCV.notify_all();
+
+                       if (m_Stopped)
+                               break;
                }
        } catch (const std::exception& ex) {
                {
@@ -68,19 +90,22 @@ void BufferedStream::WriteThreadProc(void)
 {
        char buffer[512];
 
-       try {   
+       try {
                for (;;) {
                        size_t rc;
-       
+
                        {
                                boost::mutex::scoped_lock lock(m_Mutex);
-                               
-                               while (m_SendQ->GetAvailableBytes() == 0)
+
+                               while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped)
                                        m_WriteCV.wait(lock);
-                                       
+
+                               if (m_Stopped)
+                                       break;
+
                                rc = m_SendQ->Read(buffer, sizeof(buffer));
-                       }               
-                       
+                       }
+
                        m_InnerStream->Write(buffer, rc);
                }
        } catch (const std::exception& ex) {
@@ -136,7 +161,7 @@ void BufferedStream::Write(const void *buffer, size_t count)
                boost::rethrow_exception(m_Exception);
 
        m_SendQ->Write(buffer, count);
-       m_WriteCV.notify_all(); 
+       m_WriteCV.notify_all();
 }
 
 void BufferedStream::WaitReadable(size_t count)
@@ -152,7 +177,7 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc
                m_ReadCV.wait(lock);
 }
 
-void BufferedStream::WaitWritable(size_t count)
+void BufferedStream::WaitWritable(size_t)
 { /* Nothing to do here. */ }
 
 void BufferedStream::MakeNonBlocking(void)
@@ -161,4 +186,3 @@ void BufferedStream::MakeNonBlocking(void)
 
        m_Blocking = false;
 }
-
index 08efb94c3588bf0447835699af06899de1bb4148..4176a7754c68c724d77e0022af1b550925320fea 100644 (file)
@@ -39,6 +39,7 @@ public:
        typedef weak_ptr<BufferedStream> WeakPtr;
 
        BufferedStream(const Stream::Ptr& innerStream);
+       ~BufferedStream(void);
 
        virtual size_t Read(void *buffer, size_t count);
        virtual void Write(const void *buffer, size_t count);
@@ -52,21 +53,26 @@ public:
 
 private:
        Stream::Ptr m_InnerStream;
-       
+
+       bool m_Stopped;
+
        FIFO::Ptr m_RecvQ;
        FIFO::Ptr m_SendQ;
 
        bool m_Blocking;
-       
+
        boost::exception_ptr m_Exception;
-       
+
        boost::mutex m_Mutex;
        boost::condition_variable m_ReadCV;
        boost::condition_variable m_WriteCV;
-       
+
        void ReadThreadProc(void);
        void WriteThreadProc(void);
 
+       boost::thread m_ReadThread;
+       boost::thread m_WriteThread;
+
        void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
 };
 
index 84ae92e974f59d002bde88ebbbc480038f3cebed..b3cf5a11e84864552053c8c0bb4638f4196ab093 100644 (file)
@@ -27,10 +27,18 @@ BOOST_AUTO_TEST_SUITE(base_match)
 BOOST_AUTO_TEST_CASE(tolong)
 {
        BOOST_CHECK(Utility::Match("*", "hello"));
+       BOOST_CHECK(!Utility::Match("\\**", "hello"));
+       BOOST_CHECK(Utility::Match("\\**", "*ello"));
+       BOOST_CHECK(Utility::Match("?e*l?", "hello"));
+       BOOST_CHECK(Utility::Match("?e*l?", "helo"));
        BOOST_CHECK(!Utility::Match("world", "hello"));
        BOOST_CHECK(!Utility::Match("hee*", "hello"));
        BOOST_CHECK(Utility::Match("he??o", "hello"));
        BOOST_CHECK(Utility::Match("he?", "hel"));
+       BOOST_CHECK(Utility::Match("he*", "hello"));
+       BOOST_CHECK(Utility::Match("he*o", "heo"));
+       BOOST_CHECK(Utility::Match("he**o", "heo"));
+       BOOST_CHECK(Utility::Match("he**o", "hello"));
 }
 
 BOOST_AUTO_TEST_SUITE_END()
index e6fb1093f2584aaaa6e3df281af8c22e0bef9112..b68f1bf5a0fb2300b82c6ce5cc405e05f3bade8e 100644 (file)
@@ -45,6 +45,7 @@
  * And last but not least, '\?' and '\*' in `new_mask' now become one character.
  */
 
+#if 0
 int mmatch(const char *old_mask, const char *new_mask)
 {
   const char *m = old_mask;
@@ -145,6 +146,7 @@ int mmatch(const char *old_mask, const char *new_mask)
     }
   }
 }
+#endif
 
 /*
  * Compare if a given string (name) matches the given
@@ -261,6 +263,7 @@ break_while:
  * Note that this new optimized alghoritm can *only* work in place.
  */
 
+#if 0
 char *collapse(char *pattern)
 {
   int star = 0;
@@ -303,4 +306,4 @@ char *collapse(char *pattern)
   };
   return pattern;
 }
-
+#endif