]> granicus.if.org Git - icinga2/commitdiff
Move most of the socket I/O to a separate thread
authorGunnar Beutner <gunnar@beutner.name>
Fri, 13 Feb 2015 20:02:48 +0000 (21:02 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Sat, 14 Feb 2015 08:42:11 +0000 (09:42 +0100)
fixes #8300
fixes #8243

15 files changed:
lib/base/CMakeLists.txt
lib/base/fifo.cpp
lib/base/fifo.hpp
lib/base/socket.cpp
lib/base/socket.hpp
lib/base/socketevents.cpp [new file with mode: 0644]
lib/base/socketevents.hpp [new file with mode: 0644]
lib/base/tlsstream.cpp
lib/base/tlsstream.hpp
lib/cli/pkiutility.cpp
lib/remote/apiclient.cpp
third-party/CMakeLists.txt
third-party/socketpair/CMakeLists.txt [new file with mode: 0644]
third-party/socketpair/socketpair.c [new file with mode: 0644]
third-party/socketpair/socketpair.h [new file with mode: 0644]

index ed60d2aa77848e31f7d2755d5de201f8fbefcaef..0a880e2c3310ec46352ecc15848b51af427b4332 100644 (file)
@@ -28,7 +28,7 @@ set(base_SOURCES
   exception.cpp fifo.cpp filelogger.cpp filelogger.thpp initialize.cpp json.cpp json-script.cpp logger.cpp logger.thpp math-script.cpp
   netstring.cpp networkstream.cpp number.cpp number-script.cpp object.cpp object-script.cpp primitivetype.cpp process.cpp
   ringbuffer.cpp scriptframe.cpp function.cpp function-script.cpp functionwrapper.cpp scriptglobal.cpp
-  scriptutils.cpp serializer.cpp socket.cpp stacktrace.cpp
+  scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp stacktrace.cpp
   statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp
   sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp thinmutex.cpp threadpool.cpp timer.cpp
   tlsstream.cpp tlsutility.cpp type.cpp unixsocket.cpp utility.cpp value.cpp
@@ -43,7 +43,7 @@ endif()
 
 add_library(base SHARED ${base_SOURCES})
 
-target_link_libraries(base ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${YAJL_LIBRARIES} mmatch)
+target_link_libraries(base ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${YAJL_LIBRARIES} mmatch socketpair)
 
 if(HAVE_LIBEXECINFO)
     target_link_libraries(base execinfo)
@@ -55,6 +55,9 @@ link_directories(${icinga2_BINARY_DIR}/third-party/execvpe)
 include_directories(${icinga2_SOURCE_DIR}/third-party/mmatch)
 link_directories(${icinga2_BINARY_DIR}/third-party/mmatch)
 
+include_directories(${icinga2_SOURCE_DIR}/third-party/socketpair)
+link_directories(${icinga2_BINARY_DIR}/third-party/socketpair)
+
 if(UNIX OR CYGWIN)
   target_link_libraries(base execvpe)
 endif()
index 2a2a7e8d07a81bb047036f2293ed9d36d98857d4..dc8705a87697968c290165234da7158496a53166 100644 (file)
@@ -78,6 +78,17 @@ void FIFO::Optimize(void)
        }
 }
 
+size_t FIFO::Peek(void *buffer, size_t count)
+{
+       if (count > m_DataSize)
+               count = m_DataSize;
+
+       if (buffer != NULL)
+               std::memcpy(buffer, m_Buffer + m_Offset, count);
+
+       return count;
+}
+
 /**
  * Implements IOQueue::Read.
  */
index 58e6b510d4b3e640798afc7ca41259cfe2e4e289..7a629b196f357aa4e74a366817a207e70e6f9e0c 100644 (file)
@@ -41,6 +41,7 @@ public:
        FIFO(void);
        ~FIFO(void);
 
+       size_t Peek(void *buffer, size_t count);
        virtual size_t Read(void *buffer, size_t count);
        virtual void Write(const void *buffer, size_t count);
        virtual void Close(void);
index f8cecc25e36194f9629b0f4e2197f17d5af1ed06..0289d4258e847ad91ee56358dc4c27a3ec3c2d35 100644 (file)
@@ -26,6 +26,7 @@
 #include <iostream>
 #include <boost/exception/errinfo_api_function.hpp>
 #include <boost/exception/errinfo_errno.hpp>
+#include <socketpair.h>
 
 #ifndef _WIN32
 #      include <poll.h>
@@ -401,3 +402,12 @@ void Socket::MakeNonBlocking(void)
        Utility::SetNonBlocking(GetFD());
 #endif /* _WIN32 */
 }
+
+void Socket::SocketPair(SOCKET s[2])
+{
+       if (dumb_socketpair(s, 0) < 0)
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("socketpair")
+                   << boost::errinfo_errno(errno));
+}
+
index 6b7cc09908f5a155aa07f66186bf9c0bd306f6fa..ed4bc9c6c1b2c183a69ef232b36d28a3ff8a2c8b 100644 (file)
@@ -61,6 +61,8 @@ public:
 
        void MakeNonBlocking(void);
 
+       static void SocketPair(SOCKET s[2]);
+
 protected:
        void SetFD(SOCKET fd);
 
diff --git a/lib/base/socketevents.cpp b/lib/base/socketevents.cpp
new file mode 100644 (file)
index 0000000..26c6200
--- /dev/null
@@ -0,0 +1,205 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "base/socketevents.hpp"
+#include "base/exception.hpp"
+#include "base/logger.hpp"
+#include <boost/thread/once.hpp>
+#include <boost/foreach.hpp>
+#include <map>
+
+#ifndef _WIN32
+#      include <poll.h>
+#endif /* _WIN32 */
+
+using namespace icinga;
+
+struct SocketEventDescriptor
+{
+       int Events;
+       SocketEvents *EventInterface;
+
+       SocketEventDescriptor(void)
+               : Events(0)
+       { }
+};
+
+static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
+static SOCKET l_SocketIOEventFDs[2];
+static boost::mutex l_SocketIOMutex;
+static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
+
+void SocketEvents::InitializeThread(void)
+{
+       Socket::SocketPair(l_SocketIOEventFDs);
+
+       Utility::SetNonBlockingSocket(l_SocketIOEventFDs[0]);
+       Utility::SetNonBlockingSocket(l_SocketIOEventFDs[1]);
+
+#ifndef _WIN32
+       Utility::SetCloExec(l_SocketIOEventFDs[0]);
+       Utility::SetCloExec(l_SocketIOEventFDs[1]);
+#endif /* _WIN32 */
+
+       SocketEventDescriptor sed;
+       sed.Events = POLLIN;
+
+       l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
+
+       boost::thread thread(&SocketEvents::ThreadProc);
+       thread.detach();
+}
+
+void SocketEvents::ThreadProc(void)
+{
+       Utility::SetThreadName("SocketIO");
+
+       for (;;) {
+               pollfd *pfds;
+               int pfdcount;
+
+               typedef std::map<SOCKET, SocketEventDescriptor>::value_type SocketDesc;
+
+               {
+                       boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+                       pfdcount = l_SocketIOSockets.size();
+                       pfds  = new pollfd[pfdcount];
+
+                       int i = 0;
+
+                       BOOST_FOREACH(const SocketDesc& desc, l_SocketIOSockets) {
+                               pfds[i].fd = desc.first;
+                               pfds[i].events = desc.second.Events;
+                               pfds[i].revents = 0;
+
+                               i++;
+                       }
+               }
+
+#ifdef _WIN32
+               (void) WSAPoll(pfds, pfdcount, -1);
+#else /* _WIN32 */
+               (void) poll(pfds, pfdcount, -1);
+#endif /* _WIN32 */
+
+               for (int i = 0; i < pfdcount; i++) {
+                       if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
+                               continue;
+
+                       if (pfds[i].fd == l_SocketIOEventFDs[0]) {
+                               char buffer[512];
+                               if (recv(l_SocketIOEventFDs[0], buffer, sizeof(buffer), 0) < 0)
+                                       Log(LogCritical, "SocketEvents", "Read from event FD failed.");
+
+                               continue;
+                       }
+
+                       SocketEventDescriptor desc;
+                       Object::Ptr ltref;
+
+                       {
+                               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+                               std::map<SOCKET, SocketEventDescriptor>::const_iterator it = l_SocketIOSockets.find(pfds[i].fd);
+
+                               if (it == l_SocketIOSockets.end())
+                                       continue;
+
+                               desc = it->second;
+
+                               /* We must hold a ref-counted reference to the event object to keep it alive. */
+                               ltref = dynamic_cast<Object *>(desc.EventInterface);
+                       }
+
+                       desc.EventInterface->OnEvent(pfds[i].revents);
+               }
+
+               delete [] pfds;
+       }
+}
+
+void SocketEvents::WakeUpThread(void)
+{
+       (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
+}
+
+/**
+ * Constructor for the SocketEvents class.
+ */
+SocketEvents::SocketEvents(const Socket::Ptr& socket)
+       : m_FD(socket->GetFD())
+{
+       boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread);
+
+       Register();
+}
+
+SocketEvents::~SocketEvents(void)
+{
+       Unregister();
+}
+
+void SocketEvents::Register(void)
+{
+       SocketEventDescriptor desc;
+       desc.Events = 0;
+       desc.EventInterface = this;
+
+       {
+               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+               l_SocketIOSockets[m_FD] = desc;
+       }
+
+       /* There's no need to wake up the I/O thread here. */
+}
+
+void SocketEvents::Unregister(void)
+{
+       {
+               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+               l_SocketIOSockets.erase(m_FD);
+       }
+
+       /* There's no need to wake up the I/O thread here. */
+}
+
+void SocketEvents::ChangeEvents(int events)
+{
+       {
+               boost::mutex::scoped_lock lock(l_SocketIOMutex);
+
+               std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
+
+               if (it == l_SocketIOSockets.end())
+                       return;
+
+               it->second.Events = events;
+       }
+
+       WakeUpThread();
+}
+
+void SocketEvents::OnEvent(int revents)
+{
+
+}
+
diff --git a/lib/base/socketevents.hpp b/lib/base/socketevents.hpp
new file mode 100644 (file)
index 0000000..0d3a342
--- /dev/null
@@ -0,0 +1,62 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef SOCKETEVENTS_H
+#define SOCKETEVENTS_H
+
+#include "base/i2-base.hpp"
+#include "base/socket.hpp"
+
+namespace icinga
+{
+
+/**
+ * Socket event interface
+ *
+ * @ingroup base
+ */
+class I2_BASE_API SocketEvents
+{
+public:
+       ~SocketEvents(void);
+
+       virtual void OnEvent(int revents);
+
+       void Register(void);
+       void Unregister(void);
+
+       void ChangeEvents(int events);
+
+protected:
+       SocketEvents(const Socket::Ptr& socket);
+
+private:
+       SOCKET m_FD;
+
+       static void InitializeThread(void);
+       static void ThreadProc(void);
+
+       static void WakeUpThread(void);
+
+       int GetPollEvents(void) const;
+};
+
+}
+
+#endif /* SOCKETEVENTS_H */
index f4815aff081f5b42b75d9721e3b87c51cd01e07e..c898b9a03bffcfecca58bce4bd71e88170c0de56 100644 (file)
 #include <boost/bind.hpp>
 #include <iostream>
 
+#ifndef _WIN32
+#      include <poll.h>
+#endif /* _WIN32 */
+
 using namespace icinga;
 
 int I2_EXPORT TlsStream::m_SSLIndex;
@@ -36,7 +40,9 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
  * @param sslContext The SSL context for the client.
  */
 TlsStream::TlsStream(const Socket::Ptr& socket, ConnectionRole role, const boost::shared_ptr<SSL_CTX>& sslContext)
-       : m_Eof(false), m_VerifyOK(true), m_Socket(socket), m_Role(role)
+       : SocketEvents(socket), m_Eof(false), m_HandshakeOK(false), m_VerifyOK(true), m_CloseOK(false), m_ErrorCode(0),
+         m_ErrorOccurred(false),  m_Socket(socket), m_Role(role), m_SendQ(new FIFO()), m_RecvQ(new FIFO()),
+         m_CurrentAction(TlsActionNone), m_Retry(false)
 {
        std::ostringstream msgbuf;
        char errbuf[120];
@@ -92,7 +98,7 @@ bool TlsStream::IsVerifyOK(void) const
  */
 boost::shared_ptr<X509> TlsStream::GetClientCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SSLLock);
+       boost::mutex::scoped_lock lock(m_Mutex);
        return boost::shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
 }
 
@@ -103,220 +109,182 @@ boost::shared_ptr<X509> TlsStream::GetClientCertificate(void) const
  */
 boost::shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SSLLock);
+       boost::mutex::scoped_lock lock(m_Mutex);
        return boost::shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
 }
 
-void TlsStream::Handshake(void)
+void TlsStream::OnEvent(int revents)
 {
-       std::ostringstream msgbuf;
-       char errbuf[120];
-
-       boost::mutex::scoped_lock alock(m_IOActionLock);
+       int rc, err;
+       size_t count;
 
-       for (;;) {
-               int rc, err;
-
-               {
-                       boost::mutex::scoped_lock lock(m_SSLLock);
-                       rc = SSL_do_handshake(m_SSL.get());
-
-                       if (rc > 0)
-                               break;
+       boost::mutex::scoped_lock lock(m_Mutex);
 
-                       err = SSL_get_error(m_SSL.get(), rc);
-               }
+       char buffer[512];
 
-               switch (err) {
-                       case SSL_ERROR_WANT_READ:
-                               try {
-                                       m_Socket->Poll(true, false);
-                               } catch (const std::exception&) {}
-                               continue;
-                       case SSL_ERROR_WANT_WRITE:
-                               try {
-                                       m_Socket->Poll(false, true);
-                               } catch (const std::exception&) {}
-                               continue;
-                       case SSL_ERROR_ZERO_RETURN:
-                               CloseUnlocked();
-                               return;
-                       default:
-                               msgbuf << "SSL_do_handshake() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
-                               Log(LogCritical, "TlsStream", msgbuf.str());
-
-                               BOOST_THROW_EXCEPTION(openssl_error()
-                                   << boost::errinfo_api_function("SSL_do_handshake")
-                                   << errinfo_openssl_error(ERR_peek_error()));
-               }
+       if (m_CurrentAction == TlsActionNone) {
+               if (m_SendQ->GetAvailableBytes() > 0)
+                       m_CurrentAction = TlsActionWrite;
+               else
+                       m_CurrentAction = TlsActionRead;
        }
-}
 
-/**
- * Processes data for the stream.
- */
-size_t TlsStream::Read(void *buffer, size_t count)
-{
-       size_t left = count;
-       std::ostringstream msgbuf;
-       char errbuf[120];
+       switch (m_CurrentAction) {
+               case TlsActionRead:
+                       do {
+                               rc = SSL_read(m_SSL.get(), buffer, sizeof(buffer));
 
-       bool want_read;
+                               if (rc > 0) {
+                                       m_RecvQ->Write(buffer, rc);
+                                       m_CV.notify_all();
+                               }
+                       } while (SSL_pending(m_SSL.get()));
 
-       {
-               boost::mutex::scoped_lock lock(m_SSLLock);
-               want_read = !SSL_pending(m_SSL.get()) || SSL_want_read(m_SSL.get());
-       }
+                       break;
+               case TlsActionWrite:
+                       count = m_SendQ->Peek(buffer, sizeof(buffer));
 
-       if (want_read)
-               m_Socket->Poll(true, false);
+                       rc = SSL_write(m_SSL.get(), buffer, count);
 
-       boost::mutex::scoped_lock alock(m_IOActionLock);
+                       if (rc > 0)
+                               m_SendQ->Read(NULL, rc);
 
-       while (left > 0) {
-               int rc, err;
+                       break;
+               case TlsActionHandshake:
+                       rc = SSL_do_handshake(m_SSL.get());
 
-               {
-                       boost::mutex::scoped_lock lock(m_SSLLock);
-                       rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
+                       if (rc > 0) {
+                               m_HandshakeOK = true;
+                               m_CV.notify_all();
+                       }
 
-                       if (rc <= 0)
-                               err = SSL_get_error(m_SSL.get(), rc);
-               }
+                       break;
+               case TlsActionClose:
+                       rc = SSL_shutdown(m_SSL.get());
 
-               if (rc <= 0) {
-                       switch (err) {
-                               case SSL_ERROR_WANT_READ:
-                                       try {
-                                               m_Socket->Poll(true, false);
-                                       } catch (const std::exception&) {}
-                                       continue;
-                               case SSL_ERROR_WANT_WRITE:
-                                       try {
-                                               m_Socket->Poll(false, true);
-                                       } catch (const std::exception&) {}
-                                       continue;
-                               case SSL_ERROR_ZERO_RETURN:
-                                       CloseUnlocked();
-                                       return count - left;
-                               default:
-                                       if (ERR_peek_error() != 0) {
-                                               msgbuf << "SSL_read() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
-                                               Log(LogCritical, "TlsStream", msgbuf.str());
-                                       }
-
-                                       BOOST_THROW_EXCEPTION(openssl_error()
-                                           << boost::errinfo_api_function("SSL_read")
-                                           << errinfo_openssl_error(ERR_peek_error()));
+                       if (rc > 0) {
+                               m_CloseOK = true;
+                               m_CV.notify_all();
                        }
+
+                       break;
+               default:
+                       VERIFY(!"Invalid TlsAction");
+       }
+
+       if (rc > 0) {
+               if (m_SendQ->GetAvailableBytes() > 0) {
+                       m_CurrentAction = TlsActionWrite;
+                       ChangeEvents(POLLOUT);
+               } else {
+                       m_CurrentAction = TlsActionNone;
+                       ChangeEvents(POLLIN);
                }
 
-               left -= rc;
+               return;
        }
 
-       return count;
-}
+       err = SSL_get_error(m_SSL.get(), rc);
 
-void TlsStream::Write(const void *buffer, size_t count)
-{
-       size_t left = count;
        std::ostringstream msgbuf;
        char errbuf[120];
 
-       m_Socket->Poll(false, true);
+       switch (err) {
+               case SSL_ERROR_WANT_READ:
+                       m_Retry = true;
+                       ChangeEvents(POLLIN);
 
-       boost::mutex::scoped_lock alock(m_IOActionLock);
+                       break;
+               case SSL_ERROR_WANT_WRITE:
+                       m_Retry = true;
+                       ChangeEvents(POLLOUT);
 
-       while (left > 0) {
-               int rc, err;
+                       break;
+               case SSL_ERROR_ZERO_RETURN:
+                       Unregister();
 
-               {
-                       boost::mutex::scoped_lock lock(m_SSLLock);
-                       rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
+                       m_SSL.reset();
+                       m_Socket->Close();
 
-                       if (rc <= 0)
-                               err = SSL_get_error(m_SSL.get(), rc);
-               }
+                       m_Eof = true;
 
-               if (rc <= 0) {
-                       switch (err) {
-                               case SSL_ERROR_WANT_READ:
-                                       try {
-                                               m_Socket->Poll(true, false);
-                                       } catch (const std::exception&) {}
-                                       continue;
-                               case SSL_ERROR_WANT_WRITE:
-                                       try {
-                                               m_Socket->Poll(false, true);
-                                       } catch (const std::exception&) {}
-                                       continue;
-                               case SSL_ERROR_ZERO_RETURN:
-                                       CloseUnlocked();
-                                       return;
-                               default:
-                                       if (ERR_peek_error() != 0) {
-                                               msgbuf << "SSL_write() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
-                                               Log(LogCritical, "TlsStream", msgbuf.str());
-                                       }
-
-                                       BOOST_THROW_EXCEPTION(openssl_error()
-                                           << boost::errinfo_api_function("SSL_write")
-                                           << errinfo_openssl_error(ERR_peek_error()));
-                       }
-               }
+                       m_CV.notify_all();
+
+                       break;
+               default:
+                       Unregister();
 
-               left -= rc;
+                       m_SSL.reset();
+                       m_Socket->Close();
+
+                       m_ErrorCode = ERR_peek_error();
+                       m_ErrorOccurred = true;
+
+                       m_CV.notify_all();
+
+                       break;
        }
 }
 
-/**
- * Closes the stream.
- */
-void TlsStream::Close(void)
+void TlsStream::HandleError(void) const
+{
+       if (m_ErrorOccurred) {
+               BOOST_THROW_EXCEPTION(openssl_error()
+                   << boost::errinfo_api_function("TlsStream::OnEvent")
+                   << errinfo_openssl_error(m_ErrorCode));
+       }
+}
+
+void TlsStream::Handshake(void)
 {
-       boost::mutex::scoped_lock alock(m_IOActionLock);
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       m_CurrentAction = TlsActionHandshake;
+       ChangeEvents(POLLOUT);
 
-       CloseUnlocked();
+       while (!m_HandshakeOK && !m_ErrorOccurred)
+               m_CV.wait(lock);
+
+       HandleError();
 }
 
-void TlsStream::CloseUnlocked(void)
+/**
+ * Processes data for the stream.
+ */
+size_t TlsStream::Read(void *buffer, size_t count)
 {
-       m_Eof = true;
+       boost::mutex::scoped_lock lock(m_Mutex);
 
-       for (int i = 0; i < 5; i++) {
-               int rc, err;
+       while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
+               m_CV.wait(lock);
 
-               {
-                       boost::mutex::scoped_lock lock(m_SSLLock);
-                       rc = SSL_shutdown(m_SSL.get());
+       HandleError();
 
-                       if (rc == 0)
-                               continue;
+       return m_RecvQ->Read(buffer, count);
+}
 
-                       if (rc > 0)
-                               break;
+void TlsStream::Write(const void *buffer, size_t count)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
 
-                       err = SSL_get_error(m_SSL.get(), rc);
-               }
+       m_SendQ->Write(buffer, count);
 
-               switch (err) {
-                       case SSL_ERROR_WANT_READ:
-                               try {
-                                       m_Socket->Poll(true, false);
-                               } catch (const std::exception&) {}
-                               continue;
-                       case SSL_ERROR_WANT_WRITE:
-                               try {
-                                       m_Socket->Poll(false, true);
-                               } catch (const std::exception&) {}
-                               continue;
-                       default:
-                               goto close_socket;
-               }
-       }
+       ChangeEvents(POLLOUT);
+}
+
+/**
+ * Closes the stream.
+ */
+void TlsStream::Close(void)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+       m_CurrentAction = TlsActionClose;
+       ChangeEvents(POLLOUT);
+
+       while (!m_CloseOK && !m_ErrorOccurred)
+               m_CV.wait(lock);
 
-close_socket:
-       m_Socket->Close();
+       HandleError();
 }
 
 bool TlsStream::IsEof(void) const
index e8ad5ce8b4633a957429fc711a1bca86a0297983..7fe2a05b843e3ab316b2949ed7a2c49b66beee5a 100644 (file)
 
 #include "base/i2-base.hpp"
 #include "base/socket.hpp"
+#include "base/socketevents.hpp"
 #include "base/stream.hpp"
 #include "base/tlsutility.hpp"
+#include "base/fifo.hpp"
 
 namespace icinga
 {
 
+enum TlsAction
+{
+       TlsActionNone,
+       TlsActionRead,
+       TlsActionWrite,
+       TlsActionHandshake,
+       TlsActionClose
+};
+
 /**
  * A TLS stream.
  *
  * @ingroup base
  */
-class I2_BASE_API TlsStream : public Stream
+class I2_BASE_API TlsStream : public Stream, private SocketEvents
 {
 public:
        DECLARE_PTR_TYPEDEFS(TlsStream);
@@ -57,17 +68,29 @@ public:
 private:
        boost::shared_ptr<SSL> m_SSL;
        bool m_Eof;
-       mutable boost::mutex m_SSLLock;
-       mutable boost::mutex m_IOActionLock;
+       mutable boost::mutex m_Mutex;
+       mutable boost::condition_variable m_CV;
+       bool m_HandshakeOK;
        bool m_VerifyOK;
+       bool m_CloseOK;
+       int m_ErrorCode;
+       bool m_ErrorOccurred;
 
        Socket::Ptr m_Socket;
        ConnectionRole m_Role;
 
+       FIFO::Ptr m_SendQ;
+       FIFO::Ptr m_RecvQ;
+
+       TlsAction m_CurrentAction;
+       bool m_Retry;
+
        static int m_SSLIndex;
        static bool m_SSLIndexInitialized;
 
-       void CloseUnlocked(void);
+       virtual void OnEvent(int revents);
+
+       void HandleError(void) const;
 
        static int ValidateCertificate(int preverify_ok, X509_STORE_CTX *ctx);
        static void NullCertificateDeleter(X509 *certificate);
index decd1ea9272b899950a29934ff17bd3292723f64..e199bb60c54f015296f1190f85e8cb8dae231e18 100644 (file)
@@ -164,6 +164,11 @@ int PkiUtility::SaveCert(const String& host, const String& port, const String& k
 
        boost::shared_ptr<X509> cert = stream->GetPeerCertificate();
 
+       if (!cert) {
+               Log(LogCritical, "cli", "Peer did not present a valid certificate.");
+               return 1;
+       }
+
        std::ofstream fpcert;
        fpcert.open(trustedfile.CStr());
        fpcert << CertificateToString(cert);
index feb8b95935d664a1b01860ec72552cfef8c551be..2905cb224cfe4a904d60009cc42585305c16208b 100644 (file)
@@ -123,7 +123,11 @@ void ApiClient::DisconnectSync(void)
                listener->RemoveAnonymousClient(this);
        }
 
-       m_Stream->Close();
+       try {
+               m_Stream->Close();
+       } catch (const std::exception&) {
+               /* Ignore the exception. */
+       }
 }
 
 bool ApiClient::ProcessMessage(void)
index 3cad589e881ce1a5ab54a232891cd537ffd4f853..37ffc768429a552a2627ede292c536e985009bd2 100644 (file)
@@ -24,3 +24,5 @@ endif()
 if(UNIX OR CYGWIN)
   add_subdirectory(execvpe)
 endif()
+
+add_subdirectory(socketpair)
diff --git a/third-party/socketpair/CMakeLists.txt b/third-party/socketpair/CMakeLists.txt
new file mode 100644 (file)
index 0000000..8a32822
--- /dev/null
@@ -0,0 +1,34 @@
+# Icinga 2
+# Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation
+# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
+
+add_library(socketpair SHARED socketpair.c socketpair.h)
+
+set_target_properties (
+  socketpair PROPERTIES
+  DEFINE_SYMBOL I2_SOCKETPAIR_BUILD
+)
+
+if(WIN32)
+  target_link_libraries(socketpair ws2_32)
+endif()
+
+install(
+  TARGETS socketpair
+  RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR}
+  LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/icinga2
+)
+
diff --git a/third-party/socketpair/socketpair.c b/third-party/socketpair/socketpair.c
new file mode 100644 (file)
index 0000000..b8b9f28
--- /dev/null
@@ -0,0 +1,154 @@
+/* socketpair.c
+Copyright 2007, 2010 by Nathan C. Myers <ncm@cantrip.org>
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+    Redistributions of source code must retain the above copyright notice, this
+    list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above copyright notice,
+    this list of conditions and the following disclaimer in the documentation
+    and/or other materials provided with the distribution.
+
+    The name of the author must not be used to endorse or promote products
+    derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* Changes:
+ * 2014-02-12: merge David Woodhouse, Ger Hobbelt improvements
+ *     git.infradead.org/users/dwmw2/openconnect.git/commitdiff/bdeefa54
+ *     github.com/GerHobbelt/selectable-socketpair
+ *   always init the socks[] to -1/INVALID_SOCKET on error, both on Win32/64
+ *   and UNIX/other platforms
+ * 2013-07-18: Change to BSD 3-clause license
+ * 2010-03-31:
+ *   set addr to 127.0.0.1 because win32 getsockname does not always set it.
+ * 2010-02-25:
+ *   set SO_REUSEADDR option to avoid leaking some windows resource.
+ *   Windows System Error 10049, "Event ID 4226 TCP/IP has reached
+ *   the security limit imposed on the number of concurrent TCP connect
+ *   attempts."  Bleah.
+ * 2007-04-25:
+ *   preserve value of WSAGetLastError() on all error returns.
+ * 2007-04-22:  (Thanks to Matthew Gregan <kinetik@flim.org>)
+ *   s/EINVAL/WSAEINVAL/ fix trivial compile failure
+ *   s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout
+ *     of a child process.
+ *   add argument make_overlapped
+ */
+
+#include <string.h>
+
+#ifdef WIN32
+# include <ws2tcpip.h>  /* socklen_t, et al (MSVC20xx) */
+# include <windows.h>
+# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/socket.h>
+# include <errno.h>
+#endif
+
+#include "socketpair.h"
+
+#ifdef WIN32
+
+/* dumb_socketpair:
+ *   If make_overlapped is nonzero, both sockets created will be usable for
+ *   "overlapped" operations via WSASend etc.  If make_overlapped is zero,
+ *   socks[0] (only) will be usable with regular ReadFile etc., and thus
+ *   suitable for use as stdin or stdout of a child process.  Note that the
+ *   sockets must be closed with closesocket() regardless.
+ */
+
+int dumb_socketpair(SOCKET socks[2], int make_overlapped)
+{
+    union {
+       struct sockaddr_in inaddr;
+       struct sockaddr addr;
+    } a;
+    SOCKET listener;
+    int e;
+    socklen_t addrlen = sizeof(a.inaddr);
+    DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0);
+    int reuse = 1;
+
+    if (socks == 0) {
+      WSASetLastError(WSAEINVAL);
+      return SOCKET_ERROR;
+    }
+    socks[0] = socks[1] = -1;
+
+    listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (listener == -1)
+        return SOCKET_ERROR;
+
+    memset(&a, 0, sizeof(a));
+    a.inaddr.sin_family = AF_INET;
+    a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+    a.inaddr.sin_port = 0;
+
+    for (;;) {
+        if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
+               (char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
+            break;
+        if  (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+            break;
+
+        memset(&a, 0, sizeof(a));
+        if  (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
+            break;
+        // win32 getsockname may only set the port number, p=0.0005.
+        // ( http://msdn.microsoft.com/library/ms738543.aspx ):
+        a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        a.inaddr.sin_family = AF_INET;
+
+        if (listen(listener, 1) == SOCKET_ERROR)
+            break;
+
+        socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
+        if (socks[0] == -1)
+            break;
+        if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
+            break;
+
+        socks[1] = accept(listener, NULL, NULL);
+        if (socks[1] == -1)
+            break;
+
+        closesocket(listener);
+        return 0;
+    }
+
+    e = WSAGetLastError();
+    closesocket(listener);
+    closesocket(socks[0]);
+    closesocket(socks[1]);
+    WSASetLastError(e);
+    socks[0] = socks[1] = -1;
+    return SOCKET_ERROR;
+}
+#else
+int dumb_socketpair(int socks[2], int dummy)
+{
+    if (socks == 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    dummy = socketpair(AF_LOCAL, SOCK_STREAM, 0, socks);
+    if (dummy)
+        socks[0] = socks[1] = -1;
+    return dummy;
+}
+#endif
diff --git a/third-party/socketpair/socketpair.h b/third-party/socketpair/socketpair.h
new file mode 100644 (file)
index 0000000..ffa2665
--- /dev/null
@@ -0,0 +1,37 @@
+/* socketpair.h
+ * Copyright 2007 by Nathan C. Myers <ncm@cantrip.org>; some rights reserved.
+ * This code is Free Software.  It may be copied freely, in original or
+ * modified form, subject only to the restrictions that (1) the author is
+ * relieved from all responsibilities for any use for any purpose, and (2)
+ * this copyright notice must be retained, unchanged, in its entirety.  If
+ * for any reason the author might be held responsible for any consequences
+ * of copying or use, license is withheld.
+ */
+
+#ifndef SOCKETPAIR_H
+#define SOCKETPAIR_H
+
+#include "base/visibility.hpp"
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#ifdef I2_SOCKETPAIR_BUILD
+#       define I2_SOCKETPAIR_API I2_EXPORT
+#else
+#       define I2_SOCKETPAIR_API I2_IMPORT
+#endif /* I2_SOCKETPAIR_BUILD */
+
+#ifdef _WIN32
+I2_SOCKETPAIR_API int dumb_socketpair(SOCKET socks[2], int make_overlapped);
+#else /* _WIN32 */
+I2_SOCKETPAIR_API int dumb_socketpair(int socks[2], int dummy);
+#endif
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* SOCKETPAIR_H */
+