From 09f395a7de415054279e43a6ad028bc62de200a3 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 4 Apr 2013 16:08:02 +0200 Subject: [PATCH] Refactor the socket subsystem. --- components/livestatus/Makefile.am | 2 - components/livestatus/component.cpp | 46 +- components/livestatus/component.h | 7 +- components/livestatus/connection.cpp | 57 -- lib/base/Makefile.am | 6 +- lib/base/bufferedstream.cpp | 133 +++++ .../bufferedstream.h} | 49 +- lib/base/dynamicobject.cpp | 2 - lib/base/fifo.cpp | 51 +- lib/base/fifo.h | 7 +- lib/base/netstring.cpp | 108 ++-- .../{connection.cpp => networkstream.cpp} | 44 +- lib/base/{connection.h => networkstream.h} | 35 +- lib/base/socket.cpp | 513 ++---------------- lib/base/socket.h | 56 +- lib/base/stdiostream.cpp | 61 +-- lib/base/stdiostream.h | 6 - lib/base/stream.cpp | 108 +--- lib/base/stream.h | 49 +- lib/base/stream_bio.cpp | 1 - lib/base/tcpsocket.cpp | 35 +- lib/base/tcpsocket.h | 4 +- lib/base/tlsstream.cpp | 259 +++------ lib/base/tlsstream.h | 17 +- lib/base/value.cpp | 2 +- lib/remoting/Makefile.am | 4 +- lib/remoting/endpoint.cpp | 109 ++-- lib/remoting/endpoint.h | 11 +- lib/remoting/endpointmanager.cpp | 76 ++- lib/remoting/endpointmanager.h | 5 +- .../{jsonrpcconnection.cpp => jsonrpc.cpp} | 51 +- .../connection.h => lib/remoting/jsonrpc.h | 33 +- lib/remoting/messagepart.h | 2 +- 33 files changed, 615 insertions(+), 1334 deletions(-) delete mode 100644 components/livestatus/connection.cpp create mode 100644 lib/base/bufferedstream.cpp rename lib/{remoting/jsonrpcconnection.h => base/bufferedstream.h} (63%) rename lib/base/{connection.cpp => networkstream.cpp} (59%) rename lib/base/{connection.h => networkstream.h} (75%) rename lib/remoting/{jsonrpcconnection.cpp => jsonrpc.cpp} (63%) rename components/livestatus/connection.h => lib/remoting/jsonrpc.h (76%) diff --git a/components/livestatus/Makefile.am b/components/livestatus/Makefile.am index 286aa1d64..d5eb8b177 100644 --- a/components/livestatus/Makefile.am +++ b/components/livestatus/Makefile.am @@ -22,8 +22,6 @@ liblivestatus_la_SOURCES = \ commentstable.h \ component.cpp \ component.h \ - connection.cpp \ - connection.h \ contactgroupstable.cpp \ contactgroupstable.h \ contactstable.cpp \ diff --git a/components/livestatus/component.cpp b/components/livestatus/component.cpp index c00dc50a1..fe30e165d 100644 --- a/components/livestatus/component.cpp +++ b/components/livestatus/component.cpp @@ -21,6 +21,7 @@ #include "base/dynamictype.h" #include "base/logger_fwd.h" #include "base/tcpsocket.h" +#include "base/networkstream.h" #include "base/application.h" #include @@ -48,10 +49,10 @@ void LivestatusComponent::Start(void) socket->Bind("6558", AF_INET); //#endif /* _WIN32 */ - socket->OnNewClient.connect(boost::bind(&LivestatusComponent::NewClientHandler, this, _2)); - socket->Listen(); - socket->Start(); m_Listener = socket; + + boost::thread thread(boost::bind(&LivestatusComponent::ServerThreadProc, this, socket)); + thread.detach(); } String LivestatusComponent::GetSocketPath(void) const @@ -63,21 +64,40 @@ String LivestatusComponent::GetSocketPath(void) const return socketPath; } -void LivestatusComponent::NewClientHandler(const Socket::Ptr& client) +void LivestatusComponent::ServerThreadProc(const Socket::Ptr& server) { - Log(LogInformation, "livestatus", "Client connected"); + server->Listen(); + + for (;;) { + Socket::Ptr client = server->Accept(); - LivestatusConnection::Ptr lconnection = boost::make_shared(client); - lconnection->OnClosed.connect(boost::bind(&LivestatusComponent::ClientClosedHandler, this, _1)); + Log(LogInformation, "livestatus", "Client connected"); - m_Connections.insert(lconnection); - client->Start(); + boost::thread thread(boost::bind(&LivestatusComponent::ClientThreadProc, this, client)); + thread.detach(); + } } -void LivestatusComponent::ClientClosedHandler(const Connection::Ptr& connection) +void LivestatusComponent::ClientThreadProc(const Socket::Ptr& client) { - LivestatusConnection::Ptr lconnection = static_pointer_cast(connection); + Stream::Ptr stream = boost::make_shared(client); + + for (;;) { + String line; + bool read_line = false; + + std::vector lines; + + while (stream->ReadLine(&line)) { + read_line = true; + + if (line.GetLength() > 0) + lines.push_back(line); + else + break; + } - Log(LogInformation, "livestatus", "Client disconnected"); - m_Connections.erase(lconnection); + Query::Ptr query = boost::make_shared(lines); + query->Execute(stream); + } } diff --git a/components/livestatus/component.h b/components/livestatus/component.h index 9ec3381dd..e197f656a 100644 --- a/components/livestatus/component.h +++ b/components/livestatus/component.h @@ -20,7 +20,7 @@ #ifndef LIVESTATUSCOMPONENT_H #define LIVESTATUSCOMPONENT_H -#include "livestatus/connection.h" +#include "livestatus/query.h" #include "base/dynamicobject.h" #include "base/socket.h" @@ -45,10 +45,9 @@ private: Attribute m_SocketPath; Socket::Ptr m_Listener; - std::set m_Connections; - void NewClientHandler(const Socket::Ptr& client); - void ClientClosedHandler(const Connection::Ptr& connection); + void ServerThreadProc(const Socket::Ptr& server); + void ClientThreadProc(const Socket::Ptr& client); }; } diff --git a/components/livestatus/connection.cpp b/components/livestatus/connection.cpp deleted file mode 100644 index e22c22111..000000000 --- a/components/livestatus/connection.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "livestatus/connection.h" -#include "livestatus/query.h" -#include - -using namespace icinga; -using namespace livestatus; - -LivestatusConnection::LivestatusConnection(const Stream::Ptr& stream) - : Connection(stream) -{ } - -void LivestatusConnection::ProcessData(void) -{ - String line; - bool read_line = false; - - while (GetStream()->ReadLine(&line)) { - read_line = true; - - if (line.GetLength() > 0) - m_Lines.push_back(line); - else - break; - } - - /* Return if we didn't at least read one line. */ - if (!read_line) - return; - - /* Return if we haven't found the end of the query. */ - if (line.GetLength() > 0 && !GetStream()->IsReadEOF()) - return; - - Query::Ptr query = boost::make_shared(m_Lines); - m_Lines.clear(); - - query->Execute(GetStream()); -} diff --git a/lib/base/Makefile.am b/lib/base/Makefile.am index 991b177b8..86a8d8468 100644 --- a/lib/base/Makefile.am +++ b/lib/base/Makefile.am @@ -14,8 +14,8 @@ libbase_la_SOURCES = \ array.h \ attribute.cpp \ attribute.h \ - connection.cpp \ - connection.h \ + bufferedstream.cpp \ + bufferedstream.h \ convert.cpp \ convert.h \ dictionary.cpp \ @@ -34,6 +34,8 @@ libbase_la_SOURCES = \ logger_fwd.h \ netstring.cpp \ netstring.h \ + networkstream.cpp \ + networkstream.h \ object.cpp \ object.h \ objectlock.cpp \ diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp new file mode 100644 index 000000000..6f3db1374 --- /dev/null +++ b/lib/base/bufferedstream.cpp @@ -0,0 +1,133 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/bufferedstream.h" +#include "base/objectlock.h" +#include "base/utility.h" +#include "base/logger_fwd.h" +#include +#include + +using namespace icinga; + +BufferedStream::BufferedStream(const Stream::Ptr& innerStream) + : m_InnerStream(innerStream), m_RecvQ(boost::make_shared()), m_SendQ(boost::make_shared()) +{ + boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this)); + readThread.detach(); + + boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this)); + writeThread.detach(); +} + +void BufferedStream::ReadThreadProc(void) +{ + char buffer[512]; + + try { + for (;;) { + size_t rc = m_InnerStream->Read(buffer, sizeof(buffer)); + + if (rc == 0) + break; + + boost::mutex::scoped_lock lock(m_Mutex); + m_RecvQ->Write(buffer, rc); + m_ReadCV.notify_all(); + } + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex); + Log(LogWarning, "base", msgbuf.str()); + + Close(); + } +} + +void BufferedStream::WriteThreadProc(void) +{ + char buffer[512]; + + try { + for (;;) { + size_t rc; + + { + boost::mutex::scoped_lock lock(m_Mutex); + + while (m_SendQ->GetAvailableBytes() == 0) + m_WriteCV.wait(lock); + + rc = m_SendQ->Read(buffer, sizeof(buffer)); + } + + m_InnerStream->Write(buffer, rc); + } + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex); + Log(LogWarning, "base", msgbuf.str()); + + Close(); + } +} + +void BufferedStream::Close(void) +{ + m_InnerStream->Close(); +} + +/** + * Reads data from the stream. + * + * @param buffer The buffer where data should be stored. May be NULL if you're + * not actually interested in the data. + * @param count The number of bytes to read from the queue. + * @returns The number of bytes actually read. + */ +size_t BufferedStream::Read(void *buffer, size_t count) +{ + boost::mutex::scoped_lock lock(m_Mutex); + return m_RecvQ->Read(buffer, count); +} + +/** + * Writes data to the stream. + * + * @param buffer The data that is to be written. + * @param count The number of bytes to write. + * @returns The number of bytes written + */ +void BufferedStream::Write(const void *buffer, size_t count) +{ + boost::mutex::scoped_lock lock(m_Mutex); + m_SendQ->Write(buffer, count); + m_WriteCV.notify_all(); +} + +void BufferedStream::WaitReadable(size_t count) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + while (m_RecvQ->GetAvailableBytes() < count) + m_ReadCV.wait(lock); +} + +void BufferedStream::WaitWritable(size_t count) +{ /* Nothing to do here. */ } diff --git a/lib/remoting/jsonrpcconnection.h b/lib/base/bufferedstream.h similarity index 63% rename from lib/remoting/jsonrpcconnection.h rename to lib/base/bufferedstream.h index 3aaaa247e..d747b93cd 100644 --- a/lib/remoting/jsonrpcconnection.h +++ b/lib/base/bufferedstream.h @@ -17,38 +17,53 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef JSONRPCCONNECTION_H -#define JSONRPCCONNECTION_H +#ifndef BUFFEREDSTREAM_H +#define BUFFEREDSTREAM_H -#include "remoting/i2-remoting.h" -#include "remoting/messagepart.h" -#include "base/connection.h" -#include +#include "base/i2-base.h" +#include "base/stream.h" +#include "base/fifo.h" namespace icinga { /** - * A JSON-RPC connection. + * A buffered stream. * - * @ingroup remoting + * @ingroup base */ -class I2_REMOTING_API JsonRpcConnection : public Connection +class I2_BASE_API BufferedStream : public Stream { public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - explicit JsonRpcConnection(const Stream::Ptr& stream); + BufferedStream(const Stream::Ptr& innerStream); - void SendMessage(const MessagePart& message); + virtual size_t Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); - boost::signals2::signal OnNewMessage; + virtual void Close(void); -protected: - virtual void ProcessData(void); + void WaitReadable(size_t count); + void WaitWritable(size_t count); + +private: + Stream::Ptr m_InnerStream; + + FIFO::Ptr m_RecvQ; + FIFO::Ptr m_SendQ; + + boost::exception_ptr m_Exception; + + boost::mutex m_Mutex; + boost::condition_variable m_ReadCV; + boost::condition_variable m_WriteCV; + + void ReadThreadProc(void); + void WriteThreadProc(void); }; } -#endif /* JSONRPCCONNECTION_H */ +#endif /* BUFFEREDSTREAM_H */ diff --git a/lib/base/dynamicobject.cpp b/lib/base/dynamicobject.cpp index b32d0a429..fe1873d76 100644 --- a/lib/base/dynamicobject.cpp +++ b/lib/base/dynamicobject.cpp @@ -446,7 +446,6 @@ void DynamicObject::DumpObjects(const String& filename) BOOST_THROW_EXCEPTION(std::runtime_error("Could not open '" + filename + "' file")); StdioStream::Ptr sfp = boost::make_shared(&fp, false); - sfp->Start(); BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { @@ -503,7 +502,6 @@ void DynamicObject::RestoreObjects(const String& filename) fp.open(filename.CStr(), std::ios_base::in); StdioStream::Ptr sfp = boost::make_shared(&fp, false); - sfp->Start(); unsigned long restored = 0; diff --git a/lib/base/fifo.cpp b/lib/base/fifo.cpp index f352e5c5f..d0f6bbc8b 100644 --- a/lib/base/fifo.cpp +++ b/lib/base/fifo.cpp @@ -37,13 +37,6 @@ FIFO::~FIFO(void) free(m_Buffer); } -void FIFO::Start(void) -{ - SetConnected(true); - - Stream::Start(); -} - /** * Resizes the FIFO's buffer so that it is at least newSize bytes long. * @@ -95,46 +88,16 @@ void FIFO::Optimize(void) } /** - * Implements IOQueue::GetAvailableBytes(). - */ -size_t FIFO::GetAvailableBytes(void) const -{ - return m_DataSize; -} - -/** - * Returns a pointer to the start of the read buffer. - * - * @returns Pointer to the read buffer. - */ -/*const void *FIFO::GetReadBuffer(void) const -{ - return m_Buffer + m_Offset; -}*/ - -/** - * Implements IOQueue::Peek. + * Implements IOQueue::Read. */ -size_t FIFO::Peek(void *buffer, size_t count) +size_t FIFO::Read(void *buffer, size_t count) { - ASSERT(IsConnected()); - if (count > m_DataSize) count = m_DataSize; if (buffer != NULL) memcpy(buffer, m_Buffer + m_Offset, count); - return count; -} - -/** - * Implements IOQueue::Read. - */ -size_t FIFO::Read(void *buffer, size_t count) -{ - count = Peek(buffer, count); - m_DataSize -= count; m_Offset += count; @@ -148,9 +111,15 @@ size_t FIFO::Read(void *buffer, size_t count) */ void FIFO::Write(const void *buffer, size_t count) { - ASSERT(IsConnected()); - ResizeBuffer(m_Offset + m_DataSize + count); memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count); m_DataSize += count; } + +void FIFO::Close(void) +{ } + +size_t FIFO::GetAvailableBytes(void) const +{ + return m_DataSize; +} diff --git a/lib/base/fifo.h b/lib/base/fifo.h index 6a701e24d..4c41a9c89 100644 --- a/lib/base/fifo.h +++ b/lib/base/fifo.h @@ -42,12 +42,11 @@ public: FIFO(void); ~FIFO(void); - void Start(void); + virtual size_t Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); + virtual void Close(void); size_t GetAvailableBytes(void) const; - size_t Peek(void *buffer, size_t count); - size_t Read(void *buffer, size_t count); - void Write(const void *buffer, size_t count); private: char *m_Buffer; diff --git a/lib/base/netstring.cpp b/lib/base/netstring.cpp index f1332cf2f..ba6eab5f9 100644 --- a/lib/base/netstring.cpp +++ b/lib/base/netstring.cpp @@ -18,107 +18,117 @@ ******************************************************************************/ #include "base/netstring.h" +#include "base/utility.h" #include using namespace icinga; /** - * Reads data from a stream in netString format. + * Reads data from a stream in netstring format. * * @param stream The stream to read from. * @param[out] str The String that has been read from the IOQueue. * @returns true if a complete String was read from the IOQueue, false otherwise. * @exception invalid_argument The input stream is invalid. - * @see https://github.com/PeterScott/netString-c/blob/master/netString.c + * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c */ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) { /* 16 bytes are enough for the header */ - size_t peek_length, buffer_length = 16; - char *buffer = static_cast(malloc(buffer_length)); + const size_t header_length = 16; + size_t read_length; + char *header = static_cast(malloc(header_length)); - if (buffer == NULL) + if (header == NULL) BOOST_THROW_EXCEPTION(std::bad_alloc()); - peek_length = stream->Peek(buffer, buffer_length); + read_length = 0; - /* minimum netString length is 3 */ - if (peek_length < 3) { - free(buffer); - return false; + while (read_length < header_length) { + /* Read one byte. */ + int rc = stream->Read(header + read_length, 1); + + if (rc == 0) { + if (read_length == 0) + return false; + + BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed.")); + } + + ASSERT(rc == 1); + + read_length++; + + if (header[read_length - 1] == ':') { + break; + } else if (header_length == read_length) { + free(header); + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)")); + } + } + + /* minimum netstring length is 3 */ + if (read_length < 3) { + free(header); + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (short header)")); } /* no leading zeros allowed */ - if (buffer[0] == '0' && isdigit(buffer[1])) { - free(buffer); - BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid netString (leading zero)")); + if (header[0] == '0' && isdigit(header[1])) { + free(header); + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)")); } size_t len, i; len = 0; - for (i = 0; i < peek_length && isdigit(buffer[i]); i++) { + for (i = 0; i < read_length && isdigit(header[i]); i++) { /* length specifier must have at most 9 characters */ if (i >= 9) { - free(buffer); + free(header); BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters")); } - len = len * 10 + (buffer[i] - '0'); + len = len * 10 + (header[i] - '0'); } + free(header); + /* read the whole message */ - buffer_length = i + 1 + len + 1; + size_t data_length = len + 1; - char *new_buffer = static_cast(realloc(buffer, buffer_length)); + char *data = static_cast(malloc(data_length)); - if (new_buffer == NULL) { - free(buffer); + if (data == NULL) { BOOST_THROW_EXCEPTION(std::bad_alloc()); } - buffer = new_buffer; - - peek_length = stream->Peek(buffer, buffer_length); - - if (peek_length < buffer_length) - return false; - - /* check for the colon delimiter */ - if (buffer[i] != ':') { - free(buffer); - BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)")); - } + size_t rc = stream->Read(data, data_length); + + if (rc != data_length) + BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed.")); - /* check for the comma delimiter after the String */ - if (buffer[i + 1 + len] != ',') { - free(buffer); + if (data[len] != ',') BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)")); - } - - *str = String(&buffer[i + 1], &buffer[i + 1 + len]); - - free(buffer); + + *str = String(&data[0], &data[len]); - /* remove the data from the stream */ - stream->Read(NULL, peek_length); + free(data); return true; } /** - * Writes data into a stream using the netString format. + * Writes data into a stream using the netstring format. * * @param stream The stream. * @param str The String that is to be written. */ void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str) { - std::ostringstream prefixbuf; - prefixbuf << str.GetLength() << ":"; + std::ostringstream msgbuf; + msgbuf << str.GetLength() << ":" << str << ","; - String prefix = prefixbuf.str(); - stream->Write(prefix.CStr(), prefix.GetLength()); - stream->Write(str.CStr(), str.GetLength()); - stream->Write(",", 1); + String msg = msgbuf.str(); + stream->Write(msg.CStr(), msg.GetLength()); } diff --git a/lib/base/connection.cpp b/lib/base/networkstream.cpp similarity index 59% rename from lib/base/connection.cpp rename to lib/base/networkstream.cpp index 91cca228e..9bc83a5ec 100644 --- a/lib/base/connection.cpp +++ b/lib/base/networkstream.cpp @@ -17,29 +17,45 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#include "base/connection.h" -#include +#include "base/networkstream.h" +#include "base/objectlock.h" +#include "base/utility.h" +#include using namespace icinga; -Connection::Connection(const Stream::Ptr& stream) - : m_Stream(stream) -{ - m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this)); - m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this)); -} +NetworkStream::NetworkStream(const Socket::Ptr& socket) + : m_Socket(socket) +{ } -Stream::Ptr Connection::GetStream(void) const +void NetworkStream::Close(void) { - return m_Stream; + m_Socket->Close(); } -void Connection::ClosedHandler(void) +/** + * Reads data from the stream. + * + * @param buffer The buffer where data should be stored. May be NULL if you're + * not actually interested in the data. + * @param count The number of bytes to read from the queue. + * @returns The number of bytes actually read. + */ +size_t NetworkStream::Read(void *buffer, size_t count) { - OnClosed(GetSelf()); + return m_Socket->Read(buffer, count); } -void Connection::Close(void) +/** + * Writes data to the stream. + * + * @param buffer The data that is to be written. + * @param count The number of bytes to write. + * @returns The number of bytes written + */ +void NetworkStream::Write(const void *buffer, size_t count) { - m_Stream->Close(); + size_t rc = m_Socket->Write(buffer, count); + if (rc < count) + BOOST_THROW_EXCEPTION(std::runtime_error("Short write for socket.")); } diff --git a/lib/base/connection.h b/lib/base/networkstream.h similarity index 75% rename from lib/base/connection.h rename to lib/base/networkstream.h index 4b81d41f6..cff765b48 100644 --- a/lib/base/connection.h +++ b/lib/base/networkstream.h @@ -17,39 +17,38 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef CONNECTION_H -#define CONNECTION_H +#ifndef NETWORKSTREAM_H +#define NETWORKSTREAM_H #include "base/i2-base.h" #include "base/stream.h" -#include +#include "base/socket.h" namespace icinga { -class I2_BASE_API Connection : public Object +/** + * A network stream. + * + * @ingroup base + */ +class I2_BASE_API NetworkStream : public Stream { public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - explicit Connection(const Stream::Ptr& stream); + NetworkStream(const Socket::Ptr& socket); - Stream::Ptr GetStream(void) const; + virtual size_t Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); - void Close(void); - - boost::signals2::signal OnClosed; - -protected: - virtual void ProcessData(void) = 0; + virtual void Close(void); private: - Stream::Ptr m_Stream; - - void ClosedHandler(void); + Socket::Ptr m_Socket; }; } -#endif /* CONNECTION_H */ +#endif /* NETWORKSTREAM_H */ diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index 249bcdcc2..2672af43d 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -34,40 +34,24 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false), - m_SendQueue(boost::make_shared()), m_RecvQueue(boost::make_shared()) -{ - m_SendQueue->Start(); - m_RecvQueue->Start(); -} + : m_FD(INVALID_SOCKET) +{ } /** - * Destructor for the Socket class. + * Constructor for the Socket class. */ -Socket::~Socket(void) +Socket::Socket(SOCKET fd) + : m_FD(INVALID_SOCKET) { - m_SendQueue->Close(); - m_RecvQueue->Close(); - - CloseInternal(); + SetFD(fd); } /** - * Starts I/O processing for this socket. + * Destructor for the Socket class. */ -void Socket::Start(void) +Socket::~Socket(void) { - ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable()); - ASSERT(GetFD() != INVALID_SOCKET); - - // TODO: figure out why we're not using "this" here (hint: to keep the object alive until the threads are done) - m_ReadThread = boost::thread(boost::bind(&Socket::ReadThreadProc, static_cast(GetSelf()))); - m_ReadThread.detach(); - - m_WriteThread = boost::thread(boost::bind(&Socket::WriteThreadProc, static_cast(GetSelf()))); - m_WriteThread.detach(); - - Stream::Start(); + Close(); } /** @@ -77,16 +61,14 @@ void Socket::Start(void) */ void Socket::SetFD(SOCKET fd) { - ObjectLock olock(this); - - /* mark the socket as non-blocking and close-on-exec */ if (fd != INVALID_SOCKET) { - Utility::SetNonBlockingSocket(fd); + /* mark the socket as close-on-exec */ #ifndef _WIN32 Utility::SetCloExec(fd); #endif /* _WIN32 */ } + ObjectLock olock(this); m_FD = fd; } @@ -105,29 +87,16 @@ SOCKET Socket::GetFD(void) const /** * Closes the socket. */ -void Socket::CloseInternal(void) +void Socket::Close(void) { - { - ObjectLock olock(this); + ObjectLock olock(this); - if (m_FD != INVALID_SOCKET) { - closesocket(m_FD); - m_FD = INVALID_SOCKET; - } + if (m_FD != INVALID_SOCKET) { + closesocket(m_FD); + m_FD = INVALID_SOCKET; } - - Stream::Close(); -} - -/** - * Shuts down the socket. - */ -void Socket::Close(void) -{ - SetWriteEOF(true); } - /** * Retrieves the last error that occured for the socket. * @@ -146,24 +115,6 @@ int Socket::GetError(void) const return 0; } -/** - * Processes errors that have occured for the socket. - */ -void Socket::HandleException(void) -{ - ObjectLock olock(this); - -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << boost::errinfo_errno(GetError())); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << errinfo_win32_error(GetError())); -#endif /* _WIN32 */ -} - /** * Formats a sockaddr in a human-readable way. * @@ -246,235 +197,6 @@ String Socket::GetPeerAddress(void) return GetAddressFromSockaddr((sockaddr *)&sin, len); } -/** - * Read thread procedure for sockets. This function waits until the - * socket is readable and processes inbound data. - */ -void Socket::ReadThreadProc(void) -{ - boost::mutex::scoped_lock lock(m_SocketMutex); - - for (;;) { - fd_set readfds, exceptfds; - - FD_ZERO(&readfds); - FD_ZERO(&exceptfds); - - int fd = GetFD(); - - if (fd == INVALID_SOCKET) - return; - - if (WantsToRead()) - FD_SET(fd, &readfds); - - FD_SET(fd, &exceptfds); - - lock.unlock(); - - timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv); - - lock.lock(); - - if (GetFD() == INVALID_SOCKET) - return; - - try { - if (rc < 0) { -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << boost::errinfo_errno(errno)); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << errinfo_win32_error(WSAGetLastError())); -#endif /* _WIN32 */ - } - - if (FD_ISSET(fd, &readfds)) - HandleReadable(); - - if (FD_ISSET(fd, &exceptfds)) - HandleException(); - } catch (...) { - SetException(boost::current_exception()); - - CloseInternal(); - - break; - } - - if (WantsToWrite()) - m_WriteCV.notify_all(); /* notify Write thread */ - } -} - -/** - * Write thread procedure for sockets. This function waits until the socket - * is writable and processes outbound data. - */ -void Socket::WriteThreadProc(void) -{ - boost::mutex::scoped_lock lock(m_SocketMutex); - - for (;;) { - fd_set writefds; - - FD_ZERO(&writefds); - - while (!WantsToWrite()) { - m_WriteCV.timed_wait(lock, boost::posix_time::seconds(1)); - - if (GetFD() == INVALID_SOCKET) - return; - } - - int fd = GetFD(); - - if (fd == INVALID_SOCKET) - return; - - FD_SET(fd, &writefds); - - lock.unlock(); - - int rc = select(fd + 1, NULL, &writefds, NULL, NULL); - - lock.lock(); - - if (GetFD() == INVALID_SOCKET) - return; - - try { - if (rc < 0) { -#ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << boost::errinfo_errno(errno)); -#else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("select") - << errinfo_win32_error(WSAGetLastError())); -#endif /* _WIN32 */ - } - - if (FD_ISSET(fd, &writefds)) - HandleWritable(); - } catch (...) { - SetException(boost::current_exception()); - - CloseInternal(); - - break; - } - } -} - -/** - * Sets whether the socket is fully connected. - * - * @param connected Whether the socket is connected - */ -void Socket::SetConnected(bool connected) -{ - ObjectLock olock(this); - - m_Connected = connected; -} - -/** - * Checks whether the socket is fully connected. - * - * @returns true if the socket is connected, false otherwise - */ -bool Socket::IsConnected(void) const -{ - ObjectLock olock(this); - - return m_Connected; -} - -/** - * Returns how much data is available for reading. - * - * @returns The number of bytes available. - */ -size_t Socket::GetAvailableBytes(void) const -{ - ObjectLock olock(this); - - if (m_Listening) - throw new std::logic_error("Socket does not support GetAvailableBytes()."); - - return m_RecvQueue->GetAvailableBytes(); -} - -/** - * Reads data from the socket. - * - * @param buffer The buffer where the data should be stored. - * @param size The size of the buffer. - * @returns The number of bytes read. - */ -size_t Socket::Read(void *buffer, size_t size) -{ - { - ObjectLock olock(this); - - if (m_Listening) - throw new std::logic_error("Socket does not support Read()."); - } - - if (m_RecvQueue->GetAvailableBytes() == 0) - CheckException(); - - return m_RecvQueue->Read(buffer, size); -} - -/** - * Peeks at data for the socket. - * - * @param buffer The buffer where the data should be stored. - * @param size The size of the buffer. - * @returns The number of bytes read. - */ -size_t Socket::Peek(void *buffer, size_t size) -{ - { - ObjectLock olock(this); - - if (m_Listening) - throw new std::logic_error("Socket does not support Peek()."); - } - - if (m_RecvQueue->GetAvailableBytes() == 0) - CheckException(); - - return m_RecvQueue->Peek(buffer, size); -} - -/** - * Writes data to the socket. - * - * @param buffer The buffer that should be sent. - * @param size The size of the buffer. - */ -void Socket::Write(const void *buffer, size_t size) -{ - { - ObjectLock olock(this); - - if (m_Listening) - throw new std::logic_error("Socket does not support Write()."); - } - - m_SendQueue->Write(buffer, size); -} - /** * Starts listening for incoming client connections. */ @@ -491,134 +213,56 @@ void Socket::Listen(void) << errinfo_win32_error(WSAGetLastError())); #endif /* _WIN32 */ } - - { - ObjectLock olock(this); - m_Listening = true; - } -} - -void Socket::HandleWritable(void) -{ - if (m_Listening) - HandleWritableServer(); - else - HandleWritableClient(); -} - -void Socket::HandleReadable(void) -{ - if (m_Listening) - HandleReadableServer(); - else - HandleReadableClient(); } /** * Processes data that is available for this socket. */ -void Socket::HandleWritableClient(void) +size_t Socket::Write(const void *buffer, size_t count) { - int rc; - char data[1024]; - size_t count; - - if (!IsConnected()) - SetConnected(true); - - for (;;) { - count = m_SendQueue->Peek(data, sizeof(data)); - - if (count == 0) { - if (IsWriteEOF()) - CloseInternal(); + int rc = send(GetFD(), buffer, count, 0); - break; - } - - rc = send(GetFD(), data, count, 0); - -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) -#else /* _WIN32 */ - if (rc < 0 && errno == EAGAIN) -#endif /* _WIN32 */ - break; - - if (rc <= 0) { + if (rc < 0) { #ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("send") - << boost::errinfo_errno(errno)); + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("send") + << boost::errinfo_errno(errno)); #else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("send") - << errinfo_win32_error(WSAGetLastError())); + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("send") + << errinfo_win32_error(WSAGetLastError())); #endif /* _WIN32 */ - } - - m_SendQueue->Read(NULL, rc); } + + return rc; } /** * Processes data that can be written for this socket. */ -void Socket::HandleReadableClient(void) +size_t Socket::Read(void *buffer, size_t count) { - if (!IsConnected()) - SetConnected(true); - - bool new_data = false; - - for (;;) { - char data[1024]; - int rc = recv(GetFD(), data, sizeof(data), 0); + int rc = recv(GetFD(), buffer, count, 0); -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) -#else /* _WIN32 */ - if (rc < 0 && errno == EAGAIN) -#endif /* _WIN32 */ - break; - - if (rc < 0) { + if (rc < 0) { #ifndef _WIN32 - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("recv") - << boost::errinfo_errno(errno)); + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("recv") + << boost::errinfo_errno(errno)); #else /* _WIN32 */ - BOOST_THROW_EXCEPTION(socket_error() - << boost::errinfo_api_function("recv") - << errinfo_win32_error(WSAGetLastError())); + BOOST_THROW_EXCEPTION(socket_error() + << boost::errinfo_api_function("recv") + << errinfo_win32_error(WSAGetLastError())); #endif /* _WIN32 */ - } - - new_data = true; - - if (rc == 0) { - SetReadEOF(true); - - break; - } - - m_RecvQueue->Write(data, rc); } - if (new_data) - OnDataAvailable(GetSelf()); -} - -void Socket::HandleWritableServer(void) -{ - throw std::logic_error("This should never happen."); + return rc; } /** - * Accepts a new client and creates a new client object for it - * using the client factory function. + * Accepts a new client and creates a new client object for it. */ -void Socket::HandleReadableServer(void) +Socket::Ptr Socket::Accept(void) { int fd; sockaddr_storage addr; @@ -627,87 +271,16 @@ void Socket::HandleReadableServer(void) fd = accept(GetFD(), (sockaddr *)&addr, &addrlen); if (fd < 0) { - #ifndef _WIN32 +#ifndef _WIN32 BOOST_THROW_EXCEPTION(socket_error() << boost::errinfo_api_function("accept") << boost::errinfo_errno(errno)); - #else /* _WIN32 */ +#else /* _WIN32 */ BOOST_THROW_EXCEPTION(socket_error() << boost::errinfo_api_function("accept") << errinfo_win32_error(WSAGetLastError())); - #endif /* _WIN32 */ +#endif /* _WIN32 */ } - Socket::Ptr client = boost::make_shared(); - client->SetFD(fd); - OnNewClient(GetSelf(), client); -} - -/** - * Checks whether data should be written for this socket object. - * - * @returns true if the socket should be registered for writing, false otherwise. - */ -bool Socket::WantsToWrite(void) const -{ - if (m_Listening) - return WantsToWriteServer(); - else - return WantsToWriteClient(); -} - -/** - * Checks whether data should be read for this socket object. - * - * @returns true if the socket should be registered for reading, false otherwise. - */ -bool Socket::WantsToRead(void) const -{ - if (m_Listening) - return WantsToReadServer(); - else - return WantsToReadClient(); -} - -/** - * Checks whether data should be read for this socket. - * - * @returns true - */ -bool Socket::WantsToReadClient(void) const -{ - return !IsReadEOF(); -} - -/** - * Checks whether data should be written for this socket. - * - * @returns true if data should be written, false otherwise. - */ -bool Socket::WantsToWriteClient(void) const -{ - if (m_SendQueue->GetAvailableBytes() > 0) - return true; - - return (!IsConnected()); -} - -/** - * Checks whether the TCP server wants to write. - * - * @returns false - */ -bool Socket::WantsToWriteServer(void) const -{ - return false; -} - -/** - * Checks whether the TCP server wants to read (i.e. accept new clients). - * - * @returns true - */ -bool Socket::WantsToReadServer(void) const -{ - return true; + return boost::make_shared(fd); } diff --git a/lib/base/socket.h b/lib/base/socket.h index 1a8368242..94d6e9b26 100644 --- a/lib/base/socket.h +++ b/lib/base/socket.h @@ -21,7 +21,7 @@ #define SOCKET_H #include "base/i2-base.h" -#include "base/fifo.h" +#include "base/stream.h" #include #include #include @@ -34,32 +34,26 @@ namespace icinga { * * @ingroup base */ -class I2_BASE_API Socket : public Stream +class I2_BASE_API Socket : public Object { public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; Socket(void); + Socket(SOCKET fd); ~Socket(void); - virtual void Start(void); - - virtual void Close(void); + void Close(void); String GetClientAddress(void); String GetPeerAddress(void); - bool IsConnected(void) const; - - virtual size_t GetAvailableBytes(void) const; - virtual size_t Read(void *buffer, size_t size); - virtual size_t Peek(void *buffer, size_t size); - virtual void Write(const void *buffer, size_t size); + size_t Read(void *buffer, size_t size); + size_t Write(const void *buffer, size_t size); void Listen(void); - - boost::signals2::signal OnNewClient; + Socket::Ptr Accept(void); protected: void SetFD(SOCKET fd); @@ -73,44 +67,8 @@ protected: private: SOCKET m_FD; /**< The socket descriptor. */ - bool m_Connected; - bool m_Listening; - - boost::thread m_ReadThread; - boost::thread m_WriteThread; - - boost::condition_variable m_WriteCV; - - void ReadThreadProc(void); - void WriteThreadProc(void); - - void ExceptionEventHandler(void); static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); - - void CloseInternal(void); - - FIFO::Ptr m_SendQueue; - FIFO::Ptr m_RecvQueue; - - void HandleWritableClient(void); - void HandleReadableClient(void); - - void HandleWritableServer(void); - void HandleReadableServer(void); - - void HandleReadable(void); - void HandleWritable(void); - void HandleException(void); - - bool WantsToWriteClient(void) const; - bool WantsToReadClient(void) const; - - bool WantsToWriteServer(void) const; - bool WantsToReadServer(void) const; - - bool WantsToWrite(void) const; - bool WantsToRead(void) const; }; class socket_error : virtual public std::exception, virtual public boost::exception { }; diff --git a/lib/base/stdiostream.cpp b/lib/base/stdiostream.cpp index 477208a65..959e377dc 100644 --- a/lib/base/stdiostream.cpp +++ b/lib/base/stdiostream.cpp @@ -31,66 +31,15 @@ using namespace icinga; * the stream's destructor deletes the inner stream. */ StdioStream::StdioStream(std::iostream *innerStream, bool ownsStream) - : m_InnerStream(innerStream), m_OwnsStream(ownsStream), - m_ReadAheadBuffer(boost::make_shared()) -{ - m_ReadAheadBuffer->Start(); -} - -/** - * Destructor for the StdioStream class. - */ -StdioStream::~StdioStream(void) -{ - m_ReadAheadBuffer->Close(); -} - -void StdioStream::Start(void) -{ - SetConnected(true); - - Stream::Start(); -} - -size_t StdioStream::GetAvailableBytes(void) const -{ - ObjectLock olock(this); - - if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0) - return 0; - else - return 1024; /* doesn't have to be accurate */ -} + : m_InnerStream(innerStream), m_OwnsStream(ownsStream) +{ } size_t StdioStream::Read(void *buffer, size_t size) { ObjectLock olock(this); - size_t peek_len, read_len; - - peek_len = m_ReadAheadBuffer->GetAvailableBytes(); - peek_len = m_ReadAheadBuffer->Read(buffer, peek_len); - - m_InnerStream->read(static_cast(buffer) + peek_len, size - peek_len); - read_len = m_InnerStream->gcount(); - - return peek_len + read_len; -} - -size_t StdioStream::Peek(void *buffer, size_t size) -{ - ObjectLock olock(this); - - size_t peek_len, read_len; - - peek_len = m_ReadAheadBuffer->GetAvailableBytes(); - peek_len = m_ReadAheadBuffer->Peek(buffer, peek_len); - - m_InnerStream->read(static_cast(buffer) + peek_len, size - peek_len); - read_len = m_InnerStream->gcount(); - - m_ReadAheadBuffer->Write(static_cast(buffer) + peek_len, read_len); - return peek_len + read_len; + m_InnerStream->read(static_cast(buffer), size); + return m_InnerStream->gcount(); } void StdioStream::Write(const void *buffer, size_t size) @@ -104,6 +53,4 @@ void StdioStream::Close(void) { if (m_OwnsStream) delete m_InnerStream; - - Stream::Close(); } diff --git a/lib/base/stdiostream.h b/lib/base/stdiostream.h index e6c09ead6..fdd544f73 100644 --- a/lib/base/stdiostream.h +++ b/lib/base/stdiostream.h @@ -33,13 +33,8 @@ public: typedef weak_ptr WeakPtr; StdioStream(std::iostream *innerStream, bool ownsStream); - ~StdioStream(void); - virtual void Start(void); - - virtual size_t GetAvailableBytes(void) const; virtual size_t Read(void *buffer, size_t size); - virtual size_t Peek(void *buffer, size_t size); virtual void Write(const void *buffer, size_t size); virtual void Close(void); @@ -47,7 +42,6 @@ public: private: std::iostream *m_InnerStream; bool m_OwnsStream; - FIFO::Ptr m_ReadAheadBuffer; }; } diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp index c086fcef8..1405194c4 100644 --- a/lib/base/stream.cpp +++ b/lib/base/stream.cpp @@ -24,112 +24,10 @@ using namespace icinga; -Stream::Stream(void) - : m_Connected(false), m_ReadEOF(false), m_WriteEOF(false) -{ } - -Stream::~Stream(void) -{ - ASSERT(!m_Running); -} - -bool Stream::IsConnected(void) const -{ - ObjectLock olock(this); - - return m_Connected; -} - -bool Stream::IsReadEOF(void) const -{ - ObjectLock olock(this); - - return m_ReadEOF; -} - -bool Stream::IsWriteEOF(void) const -{ - ObjectLock olock(this); - - return m_WriteEOF; -} - -void Stream::SetConnected(bool connected) -{ - bool changed; - - { - ObjectLock olock(this); - changed = (m_Connected != connected); - m_Connected = connected; - } - - if (changed) { - if (connected) - OnConnected(GetSelf()); - else - OnClosed(GetSelf()); - } -} - -void Stream::SetReadEOF(bool eof) -{ - ObjectLock olock(this); - - m_ReadEOF = eof; -} - -void Stream::SetWriteEOF(bool eof) -{ - ObjectLock olock(this); - - m_WriteEOF = eof; -} - -/** - * Checks whether an exception is available for this stream and re-throws - * the exception if there is one. - */ -void Stream::CheckException(void) -{ - ObjectLock olock(this); - - if (m_Exception) - rethrow_exception(m_Exception); -} - -void Stream::SetException(boost::exception_ptr exception) -{ - ObjectLock olock(this); - - m_Exception = exception; -} - -boost::exception_ptr Stream::GetException(void) -{ - return m_Exception; -} - -void Stream::Start(void) -{ - ObjectLock olock(this); - - m_Running = true; -} - -void Stream::Close(void) -{ - { - ObjectLock olock(this); - - m_Running = false; - } - - SetConnected(false); -} - bool Stream::ReadLine(String *line, size_t maxLength) { + BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented.")); + /* char *buffer = new char[maxLength]; size_t rc = Peek(buffer, maxLength); @@ -161,7 +59,7 @@ bool Stream::ReadLine(String *line, size_t maxLength) return true; } - delete buffer; + delete buffer;*/ return false; } diff --git a/lib/base/stream.h b/lib/base/stream.h index a3e1674c2..510cbb0cd 100644 --- a/lib/base/stream.h +++ b/lib/base/stream.h @@ -40,28 +40,6 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - Stream(void); - ~Stream(void); - - virtual void Start(void); - - /** - * Retrieves the number of bytes available for reading. - * - * @returns The number of available bytes. - */ - virtual size_t GetAvailableBytes(void) const = 0; - - /** - * Reads data from the stream without advancing the read pointer. - * - * @param buffer The buffer where data should be stored. May be NULL if - * you're not actually interested in the data. - * @param count The number of bytes to read from the queue. - * @returns The number of bytes actually read. - */ - virtual size_t Peek(void *buffer, size_t count) = 0; - /** * Reads data from the stream. * @@ -84,34 +62,9 @@ public: /** * Closes the stream and releases resources. */ - virtual void Close(void); - - bool IsConnected(void) const; - bool IsReadEOF(void) const; - bool IsWriteEOF(void) const; + virtual void Close(void) = 0; bool ReadLine(String *line, size_t maxLength = 4096); - - boost::exception_ptr GetException(void); - void CheckException(void); - - boost::signals2::signal OnConnected; - boost::signals2::signal OnDataAvailable; - boost::signals2::signal OnClosed; - -protected: - void SetConnected(bool connected); - void SetReadEOF(bool eof); - void SetWriteEOF(bool eof); - - void SetException(boost::exception_ptr exception); - -private: - bool m_Running; - bool m_Connected; - bool m_ReadEOF; - bool m_WriteEOF; - boost::exception_ptr m_Exception; }; } diff --git a/lib/base/stream_bio.cpp b/lib/base/stream_bio.cpp index 814838b24..324d0735e 100644 --- a/lib/base/stream_bio.cpp +++ b/lib/base/stream_bio.cpp @@ -123,7 +123,6 @@ static int I2Stream_write(BIO *bi, const char *in, int inl) { I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; bp->StreamObj->Write(in, inl); - return inl; } diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index b656d268f..63e2cefb7 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -31,7 +31,7 @@ using namespace icinga; * @param service The service. * @param family The address family for the socket. */ -void TcpSocket::Bind(String service, int family) +void TcpSocket::Bind(const String& service, int family) { Bind(String(), service, family); } @@ -43,7 +43,7 @@ void TcpSocket::Bind(String service, int family) * @param service The service. * @param family The address family for the socket. */ -void TcpSocket::Bind(String node, String service, int family) +void TcpSocket::Bind(const String& node, const String& service, int family) { addrinfo hints; addrinfo *result; @@ -75,8 +75,6 @@ void TcpSocket::Bind(String node, String service, int family) if (fd == INVALID_SOCKET) continue; - SetFD(fd); - const int optFalse = 0; setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast(&optFalse), sizeof(optFalse)); @@ -87,23 +85,20 @@ void TcpSocket::Bind(String node, String service, int family) int rc = bind(fd, info->ai_addr, info->ai_addrlen); -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) { -#else /* _WIN32 */ - if (rc < 0 && errno != EINPROGRESS) { -#endif /* _WIN32 */ + if (rc < 0) { closesocket(fd); - SetFD(INVALID_SOCKET); continue; } + SetFD(fd); + break; } freeaddrinfo(result); - if (fd == INVALID_SOCKET) + if (GetFD() == INVALID_SOCKET) BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket.")); } @@ -145,31 +140,21 @@ void TcpSocket::Connect(const String& node, const String& service) if (fd == INVALID_SOCKET) continue; - SetFD(fd); - rc = connect(fd, info->ai_addr, info->ai_addrlen); -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) { -#else /* _WIN32 */ - if (rc < 0 && errno != EINPROGRESS) { -#endif /* _WIN32 */ + if (rc < 0) { closesocket(fd); - SetFD(INVALID_SOCKET); continue; } - if (rc >= 0) { - SetConnected(true); - OnConnected(GetSelf()); - } + SetFD(fd); break; } freeaddrinfo(result); - if (fd == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket.")); + if (GetFD() == INVALID_SOCKET) + BOOST_THROW_EXCEPTION(std::runtime_error("Could not connect to remote host.")); } diff --git a/lib/base/tcpsocket.h b/lib/base/tcpsocket.h index 48a4c0d0e..eee873016 100644 --- a/lib/base/tcpsocket.h +++ b/lib/base/tcpsocket.h @@ -37,8 +37,8 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void Bind(String service, int family); - void Bind(String node, String service, int family); + void Bind(const String& service, int family); + void Bind(const String& node, const String& service, int family); void Connect(const String& node, const String& service); }; diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index fa2577c18..05a9866f0 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -37,51 +37,39 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false; * @param sslContext The SSL context for the client. */ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr sslContext) - : m_SSLContext(sslContext), m_SendQueue(boost::make_shared()), m_RecvQueue(boost::make_shared()), - m_InnerStream(innerStream), m_Role(role) + : m_SSLContext(sslContext), m_Role(role) { - m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this)); - m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this)); - m_SendQueue->Start(); - m_RecvQueue->Start(); -} - -void TlsStream::Start(void) -{ - { - boost::mutex::scoped_lock lock(m_SSLMutex); - - m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); - - m_SSLContext.reset(); + m_InnerStream = dynamic_pointer_cast(innerStream); + + if (!m_InnerStream) + m_InnerStream = boost::make_shared(innerStream); + + m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); - if (!m_SSL) { - BOOST_THROW_EXCEPTION(openssl_error() - << boost::errinfo_api_function("SSL_new") - << errinfo_openssl_error(ERR_get_error())); - } + m_SSLContext.reset(); - if (!m_SSLIndexInitialized) { - m_SSLIndex = SSL_get_ex_new_index(0, const_cast("TlsStream"), NULL, NULL, NULL); - m_SSLIndexInitialized = true; - } + if (!m_SSL) { + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("SSL_new") + << errinfo_openssl_error(ERR_get_error())); + } - SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this); + if (!m_SSLIndexInitialized) { + m_SSLIndex = SSL_get_ex_new_index(0, const_cast("TlsStream"), NULL, NULL, NULL); + m_SSLIndexInitialized = true; + } - SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); + SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this); - m_BIO = BIO_new_I2Stream(m_InnerStream); - SSL_set_bio(m_SSL.get(), m_BIO, m_BIO); + SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); - if (m_Role == TlsRoleServer) - SSL_set_accept_state(m_SSL.get()); - else - SSL_set_connect_state(m_SSL.get()); - } + m_BIO = BIO_new_I2Stream(m_InnerStream); + SSL_set_bio(m_SSL.get(), m_BIO, m_BIO); - Stream::Start(); - - HandleIO(); + if (m_Role == TlsRoleServer) + SSL_set_accept_state(m_SSL.get()); + else + SSL_set_connect_state(m_SSL.get()); } /** @@ -91,8 +79,6 @@ void TlsStream::Start(void) */ shared_ptr TlsStream::GetClientCertificate(void) const { - boost::mutex::scoped_lock lock(m_SSLMutex); - return shared_ptr(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter); } @@ -103,90 +89,70 @@ shared_ptr TlsStream::GetClientCertificate(void) const */ shared_ptr TlsStream::GetPeerCertificate(void) const { - boost::mutex::scoped_lock lock(m_SSLMutex); - return shared_ptr(SSL_get_peer_certificate(m_SSL.get()), X509_free); } -void TlsStream::DataAvailableHandler(void) +void TlsStream::Handshake(void) { - try { - HandleIO(); - } catch (...) { - SetException(boost::current_exception()); + ASSERT(!OwnsLock()); - Close(); - } -} + int rc; -void TlsStream::ClosedHandler(void) -{ ObjectLock olock(this); - SetException(m_InnerStream->GetException()); - Close(); + while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) { + switch (SSL_get_error(m_SSL.get(), rc)) { + case SSL_ERROR_WANT_READ: + olock.Unlock(); + m_InnerStream->WaitReadable(1); + olock.Lock(); + continue; + case SSL_ERROR_WANT_WRITE: + olock.Unlock(); + m_InnerStream->WaitWritable(1); + olock.Lock(); + continue; + case SSL_ERROR_ZERO_RETURN: + Close(); + return; + default: + I2Stream_check_exception(m_BIO); + BOOST_THROW_EXCEPTION(openssl_error() + << boost::errinfo_api_function("SSL_read") + << errinfo_openssl_error(ERR_get_error())); + } + } } /** * Processes data for the stream. */ -void TlsStream::HandleIO(void) +size_t TlsStream::Read(void *buffer, size_t count) { ASSERT(!OwnsLock()); - char data[16 * 1024]; - int rc; - - if (!IsConnected()) { - boost::mutex::scoped_lock lock(m_SSLMutex); + size_t left = count; - rc = SSL_do_handshake(m_SSL.get()); + ObjectLock olock(this); - if (rc == 1) { - lock.unlock(); + while (left > 0) { + int rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left); - SetConnected(true); - } else { + if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { - case SSL_ERROR_WANT_WRITE: - /* fall through */ case SSL_ERROR_WANT_READ: - return; - case SSL_ERROR_ZERO_RETURN: - Close(); - return; - default: - I2Stream_check_exception(m_BIO); - BOOST_THROW_EXCEPTION(openssl_error() - << boost::errinfo_api_function("SSL_do_handshake") - << errinfo_openssl_error(ERR_get_error())); - } - } - } - - bool new_data = false, read_ok = true; - - while (read_ok) { - boost::mutex::scoped_lock lock(m_SSLMutex); - - rc = SSL_read(m_SSL.get(), data, sizeof(data)); - - if (rc > 0) { - lock.unlock(); - - ObjectLock olock(this); - m_RecvQueue->Write(data, rc); - new_data = true; - } else { - switch (SSL_get_error(m_SSL.get(), rc)) { + olock.Unlock(); + m_InnerStream->WaitReadable(1); + olock.Lock(); + continue; case SSL_ERROR_WANT_WRITE: - /* fall through */ - case SSL_ERROR_WANT_READ: - read_ok = false; - break; + olock.Unlock(); + m_InnerStream->WaitWritable(1); + olock.Lock(); + continue; case SSL_ERROR_ZERO_RETURN: Close(); - return; + return count - left; default: I2Stream_check_exception(m_BIO); BOOST_THROW_EXCEPTION(openssl_error() @@ -194,41 +160,36 @@ void TlsStream::HandleIO(void) << errinfo_openssl_error(ERR_get_error())); } } - } - - if (new_data) - OnDataAvailable(GetSelf()); - - ObjectLock olock(this); - - while (m_SendQueue->GetAvailableBytes() > 0) { - size_t count = m_SendQueue->GetAvailableBytes(); - if (count == 0) - break; - - if (count > sizeof(data)) - count = sizeof(data); + left -= rc; + } - m_SendQueue->Peek(data, count); + return count; +} - olock.Unlock(); +void TlsStream::Write(const void *buffer, size_t count) +{ + ASSERT(!OwnsLock()); - boost::mutex::scoped_lock lock(m_SSLMutex); + size_t left = count; - rc = SSL_write(m_SSL.get(), (const char *)data, count); + ObjectLock olock(this); - if (rc > 0) { - lock.unlock(); + while (left > 0) { + int rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left); - olock.Lock(); - m_SendQueue->Read(NULL, rc); - } else { + if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_READ: - /* fall through */ + olock.Unlock(); + m_InnerStream->WaitReadable(1); + olock.Lock(); + continue; case SSL_ERROR_WANT_WRITE: - return; + olock.Unlock(); + m_InnerStream->WaitWritable(1); + olock.Lock(); + continue; case SSL_ERROR_ZERO_RETURN: Close(); return; @@ -239,6 +200,8 @@ void TlsStream::HandleIO(void) << errinfo_openssl_error(ERR_get_error())); } } + + left -= rc; } } @@ -247,51 +210,5 @@ void TlsStream::HandleIO(void) */ void TlsStream::Close(void) { - { - boost::mutex::scoped_lock lock(m_SSLMutex); - - if (m_SSL) - SSL_shutdown(m_SSL.get()); - } - - { - ObjectLock olock(this); - - m_SendQueue->Close(); - m_RecvQueue->Close(); - } - - Stream::Close(); -} - -size_t TlsStream::GetAvailableBytes(void) const -{ - ObjectLock olock(this); - - return m_RecvQueue->GetAvailableBytes(); -} - -size_t TlsStream::Peek(void *buffer, size_t count) -{ - ObjectLock olock(this); - - return m_RecvQueue->Peek(buffer, count); -} - -size_t TlsStream::Read(void *buffer, size_t count) -{ - ObjectLock olock(this); - - return m_RecvQueue->Read(buffer, count); -} - -void TlsStream::Write(const void *buffer, size_t count) -{ - { - ObjectLock olock(this); - - m_SendQueue->Write(buffer, count); - } - - Utility::QueueAsyncCallback(boost::bind(&TlsStream::HandleIO, this)); + m_InnerStream->Close(); } diff --git a/lib/base/tlsstream.h b/lib/base/tlsstream.h index 0d4d714e8..5741bc985 100644 --- a/lib/base/tlsstream.h +++ b/lib/base/tlsstream.h @@ -21,6 +21,7 @@ #define TLSSTREAM_H #include "base/i2-base.h" +#include "base/bufferedstream.h" #include "base/stream.h" #include "base/fifo.h" #include "base/tlsutility.h" @@ -50,34 +51,24 @@ public: shared_ptr GetClientCertificate(void) const; shared_ptr GetPeerCertificate(void) const; - virtual void Start(void); + void Handshake(void); + virtual void Close(void); - virtual size_t GetAvailableBytes(void) const; - virtual size_t Peek(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count); virtual void Write(const void *buffer, size_t count); private: shared_ptr m_SSLContext; shared_ptr m_SSL; - mutable boost::mutex m_SSLMutex; BIO *m_BIO; - FIFO::Ptr m_SendQueue; - FIFO::Ptr m_RecvQueue; - - Stream::Ptr m_InnerStream; + BufferedStream::Ptr m_InnerStream; TlsRole m_Role; static int m_SSLIndex; static bool m_SSLIndexInitialized; - void DataAvailableHandler(void); - void ClosedHandler(void); - - void HandleIO(void); - static void NullCertificateDeleter(X509 *certificate); }; diff --git a/lib/base/value.cpp b/lib/base/value.cpp index 556f98482..8e3abe90d 100644 --- a/lib/base/value.cpp +++ b/lib/base/value.cpp @@ -212,7 +212,7 @@ Value Value::Deserialize(const String& jsonString) cJSON *json = cJSON_Parse(jsonString.CStr()); if (!json) - BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String")); + BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String: " + jsonString)); Value value = FromJson(json); cJSON_Delete(json); diff --git a/lib/remoting/Makefile.am b/lib/remoting/Makefile.am index 3c4093a04..57c7a7a2f 100644 --- a/lib/remoting/Makefile.am +++ b/lib/remoting/Makefile.am @@ -16,8 +16,8 @@ libremoting_la_SOURCES = \ endpointmanager.cpp \ endpointmanager.h \ i2-remoting.h \ - jsonrpcconnection.cpp \ - jsonrpcconnection.h \ + jsonrpc.cpp \ + jsonrpc.h \ messagepart.cpp \ messagepart.h \ remoting-type.cpp \ diff --git a/lib/remoting/endpoint.cpp b/lib/remoting/endpoint.cpp index d2e888113..f77d7949c 100644 --- a/lib/remoting/endpoint.cpp +++ b/lib/remoting/endpoint.cpp @@ -19,6 +19,7 @@ #include "remoting/endpoint.h" #include "remoting/endpointmanager.h" +#include "remoting/jsonrpc.h" #include "base/application.h" #include "base/dynamictype.h" #include "base/objectlock.h" @@ -106,30 +107,28 @@ bool Endpoint::IsConnected(void) const if (IsLocalEndpoint()) { return true; } else { - JsonRpcConnection::Ptr client = GetClient(); - - return (client && client->GetStream()->IsConnected()); + return GetClient(); } } -JsonRpcConnection::Ptr Endpoint::GetClient(void) const +Stream::Ptr Endpoint::GetClient(void) const { ObjectLock olock(this); return m_Client; } -void Endpoint::SetClient(const JsonRpcConnection::Ptr& client) +void Endpoint::SetClient(const Stream::Ptr& client) { - client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2)); - client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this)); - { ObjectLock olock(this); m_Client = client; } + boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client)); + thread.detach(); + OnConnected(GetSelf()); } @@ -261,7 +260,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request)); } else { - GetClient()->SendMessage(request); + JsonRpc::SendMessage(GetClient(), request); } } @@ -272,61 +271,53 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag if (IsLocalEndpoint()) EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); - else - GetClient()->SendMessage(response); -} - -void Endpoint::NewMessageHandler(const MessagePart& message) -{ - Endpoint::Ptr sender = GetSelf(); - - if (ResponseMessage::IsResponseMessage(message)) { - /* rather than routing the message to the right virtual - * endpoint we just process it here right away. */ - EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); - return; + else { + JsonRpc::SendMessage(GetClient(), response); } - - RequestMessage request = message; - - String method; - if (!request.GetMethod(&method)) - return; - - String id; - if (request.GetID(&id)) - EndpointManager::GetInstance()->SendAnycastMessage(sender, request); - else - EndpointManager::GetInstance()->SendMulticastMessage(sender, request); } -void Endpoint::ClientClosedHandler(void) +void Endpoint::MessageThreadProc(const Stream::Ptr& stream) { - ASSERT(!OwnsLock()); - - /*try { - GetClient()->CheckException(); - } catch (const exception& ex) { - stringstream message; - message << "Error occured for JSON-RPC socket: Message=" << diagnostic_information(ex); - - Log(LogWarning, "jsonrpc", message.str()); - }*/ - - Log(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName()); - - { - ObjectLock olock(this); - - // TODO: _only_ clear non-persistent subscriptions - // unregister ourselves if no persistent subscriptions are left (use a - // timer for that, once we have a TTL property for the topics) - ClearSubscriptions(); - - m_Client.reset(); + try { + for (;;) { + MessagePart message = JsonRpc::ReadMessage(stream); + Endpoint::Ptr sender = GetSelf(); + + if (ResponseMessage::IsResponseMessage(message)) { + /* rather than routing the message to the right virtual + * endpoint we just process it here right away. */ + EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); + return; + } + + RequestMessage request = message; + + String method; + if (!request.GetMethod(&method)) + return; + + String id; + if (request.GetID(&id)) + EndpointManager::GetInstance()->SendAnycastMessage(sender, request); + else + EndpointManager::GetInstance()->SendMulticastMessage(sender, request); + } + } catch (const std::exception& ex) { + Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex)); + + { + ObjectLock olock(this); + + // TODO: _only_ clear non-persistent subscriptions + // unregister ourselves if no persistent subscriptions are left (use a + // timer for that, once we have a TTL property for the topics) + ClearSubscriptions(); + + m_Client.reset(); + } + + OnDisconnected(GetSelf()); } - - OnDisconnected(GetSelf()); } /** diff --git a/lib/remoting/endpoint.h b/lib/remoting/endpoint.h index 5a09732ad..272dd66d4 100644 --- a/lib/remoting/endpoint.h +++ b/lib/remoting/endpoint.h @@ -23,8 +23,8 @@ #include "remoting/i2-remoting.h" #include "remoting/requestmessage.h" #include "remoting/responsemessage.h" -#include "remoting/jsonrpcconnection.h" #include "base/dynamicobject.h" +#include "base/stream.h" #include namespace icinga @@ -50,8 +50,8 @@ public: static Endpoint::Ptr GetByName(const String& name); - JsonRpcConnection::Ptr GetClient(void) const; - void SetClient(const JsonRpcConnection::Ptr& client); + Stream::Ptr GetClient(void) const; + void SetClient(const Stream::Ptr& client); void RegisterSubscription(const String& topic); void UnregisterSubscription(const String& topic); @@ -85,7 +85,7 @@ private: Attribute m_Node; Attribute m_Service; - JsonRpcConnection::Ptr m_Client; + Stream::Ptr m_Client; bool m_ReceivedWelcome; /**< Have we received a welcome message from this endpoint? */ @@ -94,8 +94,7 @@ private: std::map > > m_TopicHandlers; - void NewMessageHandler(const MessagePart& message); - void ClientClosedHandler(void); + void MessageThreadProc(const Stream::Ptr& stream); }; } diff --git a/lib/remoting/endpointmanager.cpp b/lib/remoting/endpointmanager.cpp index 945633fab..af517bfd9 100644 --- a/lib/remoting/endpointmanager.cpp +++ b/lib/remoting/endpointmanager.cpp @@ -24,6 +24,7 @@ #include "base/convert.h" #include "base/utility.h" #include "base/tlsutility.h" +#include "base/networkstream.h" #include #include @@ -129,14 +130,29 @@ void EndpointManager::AddListener(const String& service) Log(LogInformation, "icinga", s.str()); TcpSocket::Ptr server = boost::make_shared(); + server->Bind(service, AF_INET6); + + boost::thread thread(boost::bind(&EndpointManager::ListenerThreadProc, this, server)); + thread.detach(); m_Servers.insert(server); - server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, - this, _2, TlsRoleServer)); +} - server->Bind(service, AF_INET6); +void EndpointManager::ListenerThreadProc(const Socket::Ptr& server) +{ server->Listen(); - server->Start(); + + for (;;) { + Socket::Ptr client = server->Accept(); + + try { + NewClientHandler(client, TlsRoleServer); + } catch (const std::exception& ex) { + std::stringstream message; + message << "Error for new JSON-RPC socket: " << boost::diagnostic_information(ex); + Log(LogInformation, "remoting", message.str()); + } + } } /** @@ -146,16 +162,23 @@ void EndpointManager::AddListener(const String& service) * @param service The remote port. */ void EndpointManager::AddConnection(const String& node, const String& service) { - ObjectLock olock(this); + { + ObjectLock olock(this); - shared_ptr sslContext = m_SSLContext; + shared_ptr sslContext = m_SSLContext; - if (!sslContext) - BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()")); + if (!sslContext) + BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()")); + } TcpSocket::Ptr client = boost::make_shared(); - client->Connect(node, service); - NewClientHandler(client, TlsRoleClient); + + try { + client->Connect(node, service); + NewClientHandler(client, TlsRoleClient); + } catch (const std::exception& ex) { + Log(LogInformation, "remoting", "Could not connect to " + node + ":" + service + ": " + ex.what()); + } } /** @@ -165,25 +188,10 @@ void EndpointManager::AddConnection(const String& node, const String& service) { */ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) { - TlsStream::Ptr tlsStream = boost::make_shared(client, role, m_SSLContext); - - m_PendingClients.insert(tlsStream); - tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1)); - tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1)); - - client->Start(); - tlsStream->Start(); -} - -void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client) -{ - TlsStream::Ptr tlsStream = static_pointer_cast(client); - JsonRpcConnection::Ptr jclient = boost::make_shared(tlsStream); + NetworkStream::Ptr netStream = boost::make_shared(client); - { - ObjectLock olock(this); - m_PendingClients.erase(tlsStream); - } + TlsStream::Ptr tlsStream = boost::make_shared(netStream, role, m_SSLContext); + tlsStream->Handshake(); shared_ptr cert = tlsStream->GetPeerCertificate(); String identity = GetCertificateCN(cert); @@ -195,17 +203,7 @@ void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client) if (!endpoint) endpoint = Endpoint::MakeEndpoint(identity, true); - endpoint->SetClient(jclient); -} - -void EndpointManager::ClientClosedHandler(const Stream::Ptr& client) -{ - TlsStream::Ptr tlsStream = static_pointer_cast(client); - - { - ObjectLock olock(this); - m_PendingClients.erase(tlsStream); - } + endpoint->SetClient(tlsStream); } /** diff --git a/lib/remoting/endpointmanager.h b/lib/remoting/endpointmanager.h index 7acb1c865..c98c9265e 100644 --- a/lib/remoting/endpointmanager.h +++ b/lib/remoting/endpointmanager.h @@ -81,7 +81,6 @@ private: Timer::Ptr m_ReconnectTimer; std::set m_Servers; - std::set m_PendingClients; /** * Information about a pending API request. @@ -112,8 +111,8 @@ private: void ReconnectTimerHandler(void); void NewClientHandler(const Socket::Ptr& client, TlsRole role); - void ClientConnectedHandler(const Stream::Ptr& client); - void ClientClosedHandler(const Stream::Ptr& client); + + void ListenerThreadProc(const Socket::Ptr& server); }; } diff --git a/lib/remoting/jsonrpcconnection.cpp b/lib/remoting/jsonrpc.cpp similarity index 63% rename from lib/remoting/jsonrpcconnection.cpp rename to lib/remoting/jsonrpc.cpp index a6225288e..47d8d8535 100644 --- a/lib/remoting/jsonrpcconnection.cpp +++ b/lib/remoting/jsonrpc.cpp @@ -17,64 +17,41 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#include "remoting/jsonrpcconnection.h" +#include "remoting/jsonrpc.h" #include "base/netstring.h" #include "base/objectlock.h" #include "base/logger_fwd.h" #include +#include using namespace icinga; -/** - * Constructor for the JsonRpcConnection class. - * - * @param stream The stream. - */ -JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream) - : Connection(stream) -{ } - /** * Sends a message to the connected peer. * * @param message The message. */ -void JsonRpcConnection::SendMessage(const MessagePart& message) +void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message) { - ObjectLock olock(this); - Value value = message.GetDictionary(); String json = value.Serialize(); //std::cerr << ">> " << json << std::endl; - NetString::WriteStringToStream(GetStream(), json); + NetString::WriteStringToStream(stream, json); } -/** - * Processes inbound data. - */ -void JsonRpcConnection::ProcessData(void) +MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream) { - ObjectLock olock(this); - String jsonString; + if (!NetString::ReadStringFromStream(stream, &jsonString)) + BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF.")); - while (NetString::ReadStringFromStream(GetStream(), &jsonString)) { - //std::cerr << "<< " << jsonString << std::endl; - - try { - Value value = Value::Deserialize(jsonString); + //std::cerr << "<< " << jsonString << std::endl; + Value value = Value::Deserialize(jsonString); - if (!value.IsObjectType()) { - BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC" - " message must be a dictionary.")); - } - - MessagePart mp(value); - OnNewMessage(GetSelf(), mp); - } catch (const std::exception& ex) { - Log(LogCritical, "remoting", "Exception" - " while processing message from JSON-RPC client: " + - boost::diagnostic_information(ex)); - } + if (!value.IsObjectType()) { + BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC" + " message must be a dictionary.")); } + + return MessagePart(value); } diff --git a/components/livestatus/connection.h b/lib/remoting/jsonrpc.h similarity index 76% rename from components/livestatus/connection.h rename to lib/remoting/jsonrpc.h index 49d36ec02..31cf1ed49 100644 --- a/components/livestatus/connection.h +++ b/lib/remoting/jsonrpc.h @@ -17,30 +17,31 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef LIVESTATUSCONNECTION_H -#define LIVESTATUSCONNECTION_H +#ifndef JSONRPC_H +#define JSONRPC_H -#include "base/connection.h" +#include "remoting/i2-remoting.h" +#include "remoting/messagepart.h" +#include "base/stream.h" -using namespace icinga; - -namespace livestatus +namespace icinga { -class LivestatusConnection : public Connection +/** + * A JSON-RPC connection. + * + * @ingroup remoting + */ +class I2_REMOTING_API JsonRpc { public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - LivestatusConnection(const Stream::Ptr& stream); - -protected: - std::vector m_Lines; + static void SendMessage(const Stream::Ptr& stream, const MessagePart& message); + static MessagePart ReadMessage(const Stream::Ptr& stream); - virtual void ProcessData(void); +private: + JsonRpc(void); }; } -#endif /* LIVESTATUSCONNECTION_H */ +#endif /* JSONRPC_H */ diff --git a/lib/remoting/messagepart.h b/lib/remoting/messagepart.h index 1c02fd258..ac703b9bd 100644 --- a/lib/remoting/messagepart.h +++ b/lib/remoting/messagepart.h @@ -39,8 +39,8 @@ class I2_REMOTING_API MessagePart { public: MessagePart(void); + MessagePart(const MessagePart& message); explicit MessagePart(const Dictionary::Ptr& dictionary); - explicit MessagePart(const MessagePart& message); Dictionary::Ptr GetDictionary(void) const; -- 2.40.0