]> granicus.if.org Git - icinga2/commitdiff
Fix file descriptors not getting closed properly
authorGunnar Beutner <gunnar@beutner.name>
Sat, 12 Apr 2014 06:16:57 +0000 (08:16 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Sat, 12 Apr 2014 06:16:57 +0000 (08:16 +0200)
Refs #4865

components/perfdata/graphitewriter.cpp
lib/base/CMakeLists.txt
lib/base/bufferedstream.cpp [deleted file]
lib/base/bufferedstream.h [deleted file]
lib/base/tlsstream.cpp
lib/base/tlsstream.h

index f81d4c76cdb168b491e75abec78631388efc3055..b481f414794aef228ebc648c55c1728a6102bf33 100644 (file)
@@ -32,7 +32,6 @@
 #include "base/application.h"
 #include "base/stream.h"
 #include "base/networkstream.h"
-#include "base/bufferedstream.h"
 #include "base/exception.h"
 #include "base/statsfunction.h"
 #include <boost/algorithm/string.hpp>
@@ -90,8 +89,7 @@ void GraphiteWriter::ReconnectTimerHandler(void)
        Log(LogDebug, "perfdata", "GraphiteWriter: Reconnect to tcp socket on host '" + GetHost() + "' port '" + GetPort() + "'.");
        socket->Connect(GetHost(), GetPort());
 
-       NetworkStream::Ptr net_stream = make_shared<NetworkStream>(socket);
-       m_Stream = make_shared<BufferedStream>(net_stream);
+       m_Stream = make_shared<NetworkStream>(socket);
 }
 
 void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
index 86c765e7b629ebb5c5ba4ca4e0a0528956421b74..9c0ec21b9ffa771572395c967699238ce1174230 100644 (file)
@@ -23,7 +23,7 @@ mkclass_target(streamlogger.ti streamlogger.th)
 mkclass_target(sysloglogger.ti sysloglogger.th)
 
 add_library(base SHARED
-  application.cpp application.th array.cpp bufferedstream.cpp context.cpp
+  application.cpp application.th array.cpp context.cpp
   convert.cpp dictionary.cpp dynamicobject.cpp dynamicobject.th dynamictype.cpp
   exception.cpp fifo.cpp filelogger.cpp filelogger.th logger.cpp logger.th
   netstring.cpp networkstream.cpp object.cpp objectlock.cpp process.cpp
diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp
deleted file mode 100644 (file)
index 1b879af..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "base/bufferedstream.h"
-#include "base/objectlock.h"
-#include "base/utility.h"
-#include "base/logger_fwd.h"
-#include <sstream>
-
-using namespace icinga;
-
-BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize)
-       : m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
-         m_RecvQ(make_shared<FIFO>()), m_SendQ(make_shared<FIFO>()),
-         m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception()
-{
-       m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
-       m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
-}
-
-BufferedStream::~BufferedStream(void)
-{
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-
-               m_Stopped = true;
-       }
-
-       m_InnerStream->Close();
-
-       {
-               boost::mutex::scoped_lock lock(m_Mutex);
-
-               m_ReadCV.notify_all();
-               m_WriteCV.notify_all();
-       }
-
-       m_ReadThread.join();
-       m_WriteThread.join();
-}
-
-void BufferedStream::ReadThreadProc(void)
-{
-       char buffer[512];
-
-       Utility::SetThreadName("BufS Read");
-
-       try {
-               for (;;) {
-                       size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
-
-                       if (rc == 0) {
-                               boost::mutex::scoped_lock lock(m_Mutex);
-                               m_Eof = true;
-                               m_Stopped = true;
-                               m_ReadCV.notify_all();
-                               m_WriteCV.notify_all();
-
-                               break;
-                       }
-
-                       boost::mutex::scoped_lock lock(m_Mutex);
-                       m_RecvQ->Write(buffer, rc);
-                       m_ReadCV.notify_all();
-
-                       if (m_Stopped)
-                               break;
-               }
-       } catch (const std::exception& ex) {
-               {
-                       boost::mutex::scoped_lock lock(m_Mutex);
-
-                       if (!m_Exception)
-                               m_Exception = boost::current_exception();
-
-                       m_ReadCV.notify_all();
-               }
-       }
-}
-
-void BufferedStream::WriteThreadProc(void)
-{
-       char buffer[512];
-
-       Utility::SetThreadName("BufS Write");
-
-       try {
-               for (;;) {
-                       size_t rc;
-
-                       {
-                               boost::mutex::scoped_lock lock(m_Mutex);
-
-                               while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped)
-                                       m_WriteCV.wait(lock);
-
-                               if (m_Stopped)
-                                       break;
-
-                               rc = m_SendQ->Read(buffer, sizeof(buffer));
-                               m_WriteCV.notify_all();
-                       }
-
-                       m_InnerStream->Write(buffer, rc);
-               }
-       } catch (const std::exception& ex) {
-               {
-                       boost::mutex::scoped_lock lock(m_Mutex);
-
-                       if (!m_Exception)
-                               m_Exception = boost::current_exception();
-
-                       m_WriteCV.notify_all();
-               }
-       }
-}
-
-void BufferedStream::Close(void)
-{
-       m_InnerStream->Close();
-}
-
-/**
- * Reads data from the stream.
- *
- * @param buffer The buffer where data should be stored. May be NULL if you're
- *              not actually interested in the data.
- * @param count The number of bytes to read from the queue.
- * @returns The number of bytes actually read.
- */
-size_t BufferedStream::Read(void *buffer, size_t count)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       if (m_Blocking)
-               InternalWaitReadable(count, lock);
-
-       if (m_Exception)
-               boost::rethrow_exception(m_Exception);
-
-       if (m_Eof)
-               BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to read from closed socket."));
-
-       return m_RecvQ->Read(buffer, count);
-}
-
-/**
- * Writes data to the stream.
- *
- * @param buffer The data that is to be written.
- * @param count The number of bytes to write.
- */
-void BufferedStream::Write(const void *buffer, size_t count)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       InternalWaitWritable(count, lock);
-
-       if (m_Exception)
-               boost::rethrow_exception(m_Exception);
-
-       if (m_Eof)
-               BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to write to closed socket."));
-
-       m_SendQ->Write(buffer, count);
-       m_WriteCV.notify_all();
-}
-
-void BufferedStream::WaitReadable(size_t count)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       InternalWaitReadable(count, lock);
-}
-
-void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock)
-{
-       while (m_RecvQ->GetAvailableBytes() < count && !m_Exception && !m_Stopped)
-               m_ReadCV.wait(lock);
-}
-
-void BufferedStream::WaitWritable(size_t count)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       InternalWaitWritable(count, lock);
-}
-
-void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock)
-{
-       while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped)
-               m_WriteCV.wait(lock);
-}
-
-void BufferedStream::MakeNonBlocking(void)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       m_Blocking = false;
-}
-
-bool BufferedStream::IsEof(void) const
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
-
-       return m_InnerStream->IsEof() && m_RecvQ->GetAvailableBytes() == 0;
-}
diff --git a/lib/base/bufferedstream.h b/lib/base/bufferedstream.h
deleted file mode 100644 (file)
index 5f34a4d..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef BUFFEREDSTREAM_H
-#define BUFFEREDSTREAM_H
-
-#include "base/i2-base.h"
-#include "base/stream.h"
-#include "base/fifo.h"
-
-namespace icinga
-{
-
-/**
- * A buffered stream.
- *
- * @ingroup base
- */
-class I2_BASE_API BufferedStream : public Stream
-{
-public:
-       DECLARE_PTR_TYPEDEFS(BufferedStream);
-
-       BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024);
-       ~BufferedStream(void);
-
-       virtual size_t Read(void *buffer, size_t count);
-       virtual void Write(const void *buffer, size_t count);
-
-       virtual void Close(void);
-
-       virtual bool IsEof(void) const;
-
-       void WaitReadable(size_t count);
-       void WaitWritable(size_t count);
-
-       void MakeNonBlocking(void);
-
-private:
-       Stream::Ptr m_InnerStream;
-
-       bool m_Stopped;
-       bool m_Eof;
-
-       FIFO::Ptr m_RecvQ;
-       FIFO::Ptr m_SendQ;
-
-       bool m_Blocking;
-       size_t m_MaxBufferSize;
-
-       boost::exception_ptr m_Exception;
-
-       mutable boost::mutex m_Mutex;
-       boost::condition_variable m_ReadCV;
-       boost::condition_variable m_WriteCV;
-
-       void ReadThreadProc(void);
-       void WriteThreadProc(void);
-
-       boost::thread m_ReadThread;
-       boost::thread m_WriteThread;
-
-       void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock);
-       void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
-};
-
-}
-
-#endif /* BUFFEREDSTREAM_H */
index f9c4c3d4851c37e962de7dfc93d8f821669916cc..06bc979ea89077cf241e7e479dd5d84484d11e6c 100644 (file)
@@ -38,15 +38,8 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
  * @param sslContext The SSL context for the client.
  */
 TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
-       : m_SSLContext(sslContext), m_Role(role)
+       : m_InnerStream(innerStream), m_SSLContext(sslContext), m_Role(role)
 {
-       m_InnerStream = dynamic_pointer_cast<BufferedStream>(innerStream);
-       
-       if (!m_InnerStream)
-               m_InnerStream = make_shared<BufferedStream>(innerStream);
-
-       m_InnerStream->MakeNonBlocking();
-       
        m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
 
        m_SSLContext.reset();
@@ -106,14 +99,8 @@ void TlsStream::Handshake(void)
        while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
                switch (SSL_get_error(m_SSL.get(), rc)) {
                        case SSL_ERROR_WANT_READ:
-                               olock.Unlock();
-                               m_InnerStream->WaitReadable(1);
-                               olock.Lock();
                                continue;
                        case SSL_ERROR_WANT_WRITE:
-                               olock.Unlock();
-                               m_InnerStream->WaitWritable(1);
-                               olock.Lock();
                                continue;
                        case SSL_ERROR_ZERO_RETURN:
                                Close();
@@ -144,14 +131,8 @@ size_t TlsStream::Read(void *buffer, size_t count)
                if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
                                case SSL_ERROR_WANT_READ:
-                                       olock.Unlock();
-                                       m_InnerStream->WaitReadable(1);
-                                       olock.Lock();
                                        continue;
                                case SSL_ERROR_WANT_WRITE:
-                                       olock.Unlock();
-                                       m_InnerStream->WaitWritable(1);
-                                       olock.Lock();
                                        continue;
                                case SSL_ERROR_ZERO_RETURN:
                                        Close();
@@ -184,14 +165,8 @@ void TlsStream::Write(const void *buffer, size_t count)
                if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
                                case SSL_ERROR_WANT_READ:
-                                       olock.Unlock();
-                                       m_InnerStream->WaitReadable(1);
-                                       olock.Lock();
                                        continue;
                                case SSL_ERROR_WANT_WRITE:
-                                       olock.Unlock();
-                                       m_InnerStream->WaitWritable(1);
-                                       olock.Lock();
                                        continue;
                                case SSL_ERROR_ZERO_RETURN:
                                        Close();
index 89b4d6c4cdfe20fddcb781d8f88e9d88a22c56f5..1c1b2894f9b95e4ca62aabc139e1af4e1fb2ff6f 100644 (file)
@@ -21,7 +21,6 @@
 #define TLSSTREAM_H
 
 #include "base/i2-base.h"
-#include "base/bufferedstream.h"
 #include "base/stream.h"
 #include "base/fifo.h"
 #include "base/tlsutility.h"
@@ -64,7 +63,7 @@ private:
        shared_ptr<SSL> m_SSL;
        BIO *m_BIO;
 
-       BufferedStream::Ptr m_InnerStream;
+       Stream::Ptr m_InnerStream;
        TlsRole m_Role;
 
        static int m_SSLIndex;