From: Gunnar Beutner Date: Sat, 12 Apr 2014 06:16:57 +0000 (+0200) Subject: Fix file descriptors not getting closed properly X-Git-Tag: v0.0.10~132 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=06fdaeb2b2c2511c830ed94c626b6030c69d3f19;p=icinga2 Fix file descriptors not getting closed properly Refs #4865 --- diff --git a/components/perfdata/graphitewriter.cpp b/components/perfdata/graphitewriter.cpp index f81d4c76c..b481f4147 100644 --- a/components/perfdata/graphitewriter.cpp +++ b/components/perfdata/graphitewriter.cpp @@ -32,7 +32,6 @@ #include "base/application.h" #include "base/stream.h" #include "base/networkstream.h" -#include "base/bufferedstream.h" #include "base/exception.h" #include "base/statsfunction.h" #include @@ -90,8 +89,7 @@ void GraphiteWriter::ReconnectTimerHandler(void) Log(LogDebug, "perfdata", "GraphiteWriter: Reconnect to tcp socket on host '" + GetHost() + "' port '" + GetPort() + "'."); socket->Connect(GetHost(), GetPort()); - NetworkStream::Ptr net_stream = make_shared(socket); - m_Stream = make_shared(net_stream); + m_Stream = make_shared(socket); } void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 86c765e7b..9c0ec21b9 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -23,7 +23,7 @@ mkclass_target(streamlogger.ti streamlogger.th) mkclass_target(sysloglogger.ti sysloglogger.th) add_library(base SHARED - application.cpp application.th array.cpp bufferedstream.cpp context.cpp + application.cpp application.th array.cpp context.cpp convert.cpp dictionary.cpp dynamicobject.cpp dynamicobject.th dynamictype.cpp exception.cpp fifo.cpp filelogger.cpp filelogger.th logger.cpp logger.th netstring.cpp networkstream.cpp object.cpp objectlock.cpp process.cpp diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp deleted file mode 100644 index 1b879afb0..000000000 --- a/lib/base/bufferedstream.cpp +++ /dev/null @@ -1,223 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "base/bufferedstream.h" -#include "base/objectlock.h" -#include "base/utility.h" -#include "base/logger_fwd.h" -#include - -using namespace icinga; - -BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize) - : m_InnerStream(innerStream), m_Stopped(false), m_Eof(false), - m_RecvQ(make_shared()), m_SendQ(make_shared()), - m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception() -{ - m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this)); - m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this)); -} - -BufferedStream::~BufferedStream(void) -{ - { - boost::mutex::scoped_lock lock(m_Mutex); - - m_Stopped = true; - } - - m_InnerStream->Close(); - - { - boost::mutex::scoped_lock lock(m_Mutex); - - m_ReadCV.notify_all(); - m_WriteCV.notify_all(); - } - - m_ReadThread.join(); - m_WriteThread.join(); -} - -void BufferedStream::ReadThreadProc(void) -{ - char buffer[512]; - - Utility::SetThreadName("BufS Read"); - - try { - for (;;) { - size_t rc = m_InnerStream->Read(buffer, sizeof(buffer)); - - if (rc == 0) { - boost::mutex::scoped_lock lock(m_Mutex); - m_Eof = true; - m_Stopped = true; - m_ReadCV.notify_all(); - m_WriteCV.notify_all(); - - break; - } - - boost::mutex::scoped_lock lock(m_Mutex); - m_RecvQ->Write(buffer, rc); - m_ReadCV.notify_all(); - - if (m_Stopped) - break; - } - } catch (const std::exception& ex) { - { - boost::mutex::scoped_lock lock(m_Mutex); - - if (!m_Exception) - m_Exception = boost::current_exception(); - - m_ReadCV.notify_all(); - } - } -} - -void BufferedStream::WriteThreadProc(void) -{ - char buffer[512]; - - Utility::SetThreadName("BufS Write"); - - try { - for (;;) { - size_t rc; - - { - boost::mutex::scoped_lock lock(m_Mutex); - - while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped) - m_WriteCV.wait(lock); - - if (m_Stopped) - break; - - rc = m_SendQ->Read(buffer, sizeof(buffer)); - m_WriteCV.notify_all(); - } - - m_InnerStream->Write(buffer, rc); - } - } catch (const std::exception& ex) { - { - boost::mutex::scoped_lock lock(m_Mutex); - - if (!m_Exception) - m_Exception = boost::current_exception(); - - m_WriteCV.notify_all(); - } - } -} - -void BufferedStream::Close(void) -{ - m_InnerStream->Close(); -} - -/** - * Reads data from the stream. - * - * @param buffer The buffer where data should be stored. May be NULL if you're - * not actually interested in the data. - * @param count The number of bytes to read from the queue. - * @returns The number of bytes actually read. - */ -size_t BufferedStream::Read(void *buffer, size_t count) -{ - boost::mutex::scoped_lock lock(m_Mutex); - - if (m_Blocking) - InternalWaitReadable(count, lock); - - if (m_Exception) - boost::rethrow_exception(m_Exception); - - if (m_Eof) - BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to read from closed socket.")); - - return m_RecvQ->Read(buffer, count); -} - -/** - * Writes data to the stream. - * - * @param buffer The data that is to be written. - * @param count The number of bytes to write. - */ -void BufferedStream::Write(const void *buffer, size_t count) -{ - boost::mutex::scoped_lock lock(m_Mutex); - - InternalWaitWritable(count, lock); - - if (m_Exception) - boost::rethrow_exception(m_Exception); - - if (m_Eof) - BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to write to closed socket.")); - - m_SendQ->Write(buffer, count); - m_WriteCV.notify_all(); -} - -void BufferedStream::WaitReadable(size_t count) -{ - boost::mutex::scoped_lock lock(m_Mutex); - - InternalWaitReadable(count, lock); -} - -void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock) -{ - while (m_RecvQ->GetAvailableBytes() < count && !m_Exception && !m_Stopped) - m_ReadCV.wait(lock); -} - -void BufferedStream::WaitWritable(size_t count) -{ - boost::mutex::scoped_lock lock(m_Mutex); - - InternalWaitWritable(count, lock); -} - -void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock) -{ - while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped) - m_WriteCV.wait(lock); -} - -void BufferedStream::MakeNonBlocking(void) -{ - boost::mutex::scoped_lock lock(m_Mutex); - - m_Blocking = false; -} - -bool BufferedStream::IsEof(void) const -{ - boost::mutex::scoped_lock lock(m_Mutex); - - return m_InnerStream->IsEof() && m_RecvQ->GetAvailableBytes() == 0; -} diff --git a/lib/base/bufferedstream.h b/lib/base/bufferedstream.h deleted file mode 100644 index 5f34a4d4c..000000000 --- a/lib/base/bufferedstream.h +++ /dev/null @@ -1,85 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef BUFFEREDSTREAM_H -#define BUFFEREDSTREAM_H - -#include "base/i2-base.h" -#include "base/stream.h" -#include "base/fifo.h" - -namespace icinga -{ - -/** - * A buffered stream. - * - * @ingroup base - */ -class I2_BASE_API BufferedStream : public Stream -{ -public: - DECLARE_PTR_TYPEDEFS(BufferedStream); - - BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024); - ~BufferedStream(void); - - virtual size_t Read(void *buffer, size_t count); - virtual void Write(const void *buffer, size_t count); - - virtual void Close(void); - - virtual bool IsEof(void) const; - - void WaitReadable(size_t count); - void WaitWritable(size_t count); - - void MakeNonBlocking(void); - -private: - Stream::Ptr m_InnerStream; - - bool m_Stopped; - bool m_Eof; - - FIFO::Ptr m_RecvQ; - FIFO::Ptr m_SendQ; - - bool m_Blocking; - size_t m_MaxBufferSize; - - boost::exception_ptr m_Exception; - - mutable boost::mutex m_Mutex; - boost::condition_variable m_ReadCV; - boost::condition_variable m_WriteCV; - - void ReadThreadProc(void); - void WriteThreadProc(void); - - boost::thread m_ReadThread; - boost::thread m_WriteThread; - - void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock); - void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock); -}; - -} - -#endif /* BUFFEREDSTREAM_H */ diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index f9c4c3d48..06bc979ea 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -38,15 +38,8 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false; * @param sslContext The SSL context for the client. */ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr sslContext) - : m_SSLContext(sslContext), m_Role(role) + : m_InnerStream(innerStream), m_SSLContext(sslContext), m_Role(role) { - m_InnerStream = dynamic_pointer_cast(innerStream); - - if (!m_InnerStream) - m_InnerStream = make_shared(innerStream); - - m_InnerStream->MakeNonBlocking(); - m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); m_SSLContext.reset(); @@ -106,14 +99,8 @@ void TlsStream::Handshake(void) while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_READ: - olock.Unlock(); - m_InnerStream->WaitReadable(1); - olock.Lock(); continue; case SSL_ERROR_WANT_WRITE: - olock.Unlock(); - m_InnerStream->WaitWritable(1); - olock.Lock(); continue; case SSL_ERROR_ZERO_RETURN: Close(); @@ -144,14 +131,8 @@ size_t TlsStream::Read(void *buffer, size_t count) if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_READ: - olock.Unlock(); - m_InnerStream->WaitReadable(1); - olock.Lock(); continue; case SSL_ERROR_WANT_WRITE: - olock.Unlock(); - m_InnerStream->WaitWritable(1); - olock.Lock(); continue; case SSL_ERROR_ZERO_RETURN: Close(); @@ -184,14 +165,8 @@ void TlsStream::Write(const void *buffer, size_t count) if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_READ: - olock.Unlock(); - m_InnerStream->WaitReadable(1); - olock.Lock(); continue; case SSL_ERROR_WANT_WRITE: - olock.Unlock(); - m_InnerStream->WaitWritable(1); - olock.Lock(); continue; case SSL_ERROR_ZERO_RETURN: Close(); diff --git a/lib/base/tlsstream.h b/lib/base/tlsstream.h index 89b4d6c4c..1c1b2894f 100644 --- a/lib/base/tlsstream.h +++ b/lib/base/tlsstream.h @@ -21,7 +21,6 @@ #define TLSSTREAM_H #include "base/i2-base.h" -#include "base/bufferedstream.h" #include "base/stream.h" #include "base/fifo.h" #include "base/tlsutility.h" @@ -64,7 +63,7 @@ private: shared_ptr m_SSL; BIO *m_BIO; - BufferedStream::Ptr m_InnerStream; + Stream::Ptr m_InnerStream; TlsRole m_Role; static int m_SSLIndex;