]> granicus.if.org Git - icinga2/commitdiff
Refactor the socket subsystem.
authorGunnar Beutner <gunnar@blade9.beutner.name>
Thu, 4 Apr 2013 14:08:02 +0000 (16:08 +0200)
committerGunnar Beutner <gunnar@blade9.beutner.name>
Thu, 4 Apr 2013 14:08:40 +0000 (16:08 +0200)
33 files changed:
components/livestatus/Makefile.am
components/livestatus/component.cpp
components/livestatus/component.h
components/livestatus/connection.cpp [deleted file]
lib/base/Makefile.am
lib/base/bufferedstream.cpp [new file with mode: 0644]
lib/base/bufferedstream.h [moved from lib/remoting/jsonrpcconnection.h with 63% similarity]
lib/base/dynamicobject.cpp
lib/base/fifo.cpp
lib/base/fifo.h
lib/base/netstring.cpp
lib/base/networkstream.cpp [moved from lib/base/connection.cpp with 59% similarity]
lib/base/networkstream.h [moved from lib/base/connection.h with 75% similarity]
lib/base/socket.cpp
lib/base/socket.h
lib/base/stdiostream.cpp
lib/base/stdiostream.h
lib/base/stream.cpp
lib/base/stream.h
lib/base/stream_bio.cpp
lib/base/tcpsocket.cpp
lib/base/tcpsocket.h
lib/base/tlsstream.cpp
lib/base/tlsstream.h
lib/base/value.cpp
lib/remoting/Makefile.am
lib/remoting/endpoint.cpp
lib/remoting/endpoint.h
lib/remoting/endpointmanager.cpp
lib/remoting/endpointmanager.h
lib/remoting/jsonrpc.cpp [moved from lib/remoting/jsonrpcconnection.cpp with 63% similarity]
lib/remoting/jsonrpc.h [moved from components/livestatus/connection.h with 76% similarity]
lib/remoting/messagepart.h

index 286aa1d6457652873196f999f30eab10bc236b1d..d5eb8b177f7d3fb6ffa756d38a135b964108792e 100644 (file)
@@ -22,8 +22,6 @@ liblivestatus_la_SOURCES = \
        commentstable.h \
        component.cpp \
        component.h \
-       connection.cpp \
-       connection.h \
        contactgroupstable.cpp \
        contactgroupstable.h \
        contactstable.cpp \
index c00dc50a11cc015c88b2d727baf85dfa8d334f44..fe30e165ddca88e4f839b39d8437e167b3e7182c 100644 (file)
@@ -21,6 +21,7 @@
 #include "base/dynamictype.h"
 #include "base/logger_fwd.h"
 #include "base/tcpsocket.h"
+#include "base/networkstream.h"
 #include "base/application.h"
 #include <boost/smart_ptr/make_shared.hpp>
 
@@ -48,10 +49,10 @@ void LivestatusComponent::Start(void)
        socket->Bind("6558", AF_INET);
 //#endif /* _WIN32 */
 
-       socket->OnNewClient.connect(boost::bind(&LivestatusComponent::NewClientHandler, this, _2));
-       socket->Listen();
-       socket->Start();
        m_Listener = socket;
+
+       boost::thread thread(boost::bind(&LivestatusComponent::ServerThreadProc, this, socket));
+       thread.detach();
 }
 
 String LivestatusComponent::GetSocketPath(void) const
@@ -63,21 +64,40 @@ String LivestatusComponent::GetSocketPath(void) const
                return socketPath;
 }
 
-void LivestatusComponent::NewClientHandler(const Socket::Ptr& client)
+void LivestatusComponent::ServerThreadProc(const Socket::Ptr& server)
 {
-       Log(LogInformation, "livestatus", "Client connected");
+       server->Listen();
+
+       for (;;) {
+               Socket::Ptr client = server->Accept();
 
-       LivestatusConnection::Ptr lconnection = boost::make_shared<LivestatusConnection>(client);
-       lconnection->OnClosed.connect(boost::bind(&LivestatusComponent::ClientClosedHandler, this, _1));
+               Log(LogInformation, "livestatus", "Client connected");
 
-       m_Connections.insert(lconnection);
-       client->Start();
+               boost::thread thread(boost::bind(&LivestatusComponent::ClientThreadProc, this, client));
+               thread.detach();
+       }
 }
 
-void LivestatusComponent::ClientClosedHandler(const Connection::Ptr& connection)
+void LivestatusComponent::ClientThreadProc(const Socket::Ptr& client)
 {
-       LivestatusConnection::Ptr lconnection = static_pointer_cast<LivestatusConnection>(connection);
+       Stream::Ptr stream = boost::make_shared<NetworkStream>(client);
+
+       for (;;) {
+               String line;
+               bool read_line = false;
+
+               std::vector<String> lines;
+
+               while (stream->ReadLine(&line)) {
+                       read_line = true;
+
+                       if (line.GetLength() > 0)
+                               lines.push_back(line);
+                       else
+                               break;
+               }
 
-       Log(LogInformation, "livestatus", "Client disconnected");
-       m_Connections.erase(lconnection);
+               Query::Ptr query = boost::make_shared<Query>(lines);
+               query->Execute(stream);
+       }
 }
index 9ec3381dd1c732330daeca1bbfe907346e68f1f9..e197f656a09f78d2e782ffed060a162ee0caf848 100644 (file)
@@ -20,7 +20,7 @@
 #ifndef LIVESTATUSCOMPONENT_H
 #define LIVESTATUSCOMPONENT_H
 
-#include "livestatus/connection.h"
+#include "livestatus/query.h"
 #include "base/dynamicobject.h"
 #include "base/socket.h"
 
@@ -45,10 +45,9 @@ private:
        Attribute<String> m_SocketPath;
 
        Socket::Ptr m_Listener;
-       std::set<LivestatusConnection::Ptr> m_Connections;
 
-       void NewClientHandler(const Socket::Ptr& client);
-       void ClientClosedHandler(const Connection::Ptr& connection);
+       void ServerThreadProc(const Socket::Ptr& server);
+       void ClientThreadProc(const Socket::Ptr& client);
 };
 
 }
diff --git a/components/livestatus/connection.cpp b/components/livestatus/connection.cpp
deleted file mode 100644 (file)
index e22c221..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "livestatus/connection.h"
-#include "livestatus/query.h"
-#include <boost/smart_ptr/make_shared.hpp>
-
-using namespace icinga;
-using namespace livestatus;
-
-LivestatusConnection::LivestatusConnection(const Stream::Ptr& stream)
-       : Connection(stream)
-{ }
-
-void LivestatusConnection::ProcessData(void)
-{
-       String line;
-       bool read_line = false;
-
-       while (GetStream()->ReadLine(&line)) {
-               read_line = true;
-
-               if (line.GetLength() > 0)
-                       m_Lines.push_back(line);
-               else
-                       break;
-       }
-
-       /* Return if we didn't at least read one line. */
-       if (!read_line)
-               return;
-
-       /* Return if we haven't found the end of the query. */
-       if (line.GetLength() > 0 && !GetStream()->IsReadEOF())
-               return;
-
-       Query::Ptr query = boost::make_shared<Query>(m_Lines);
-       m_Lines.clear();
-
-       query->Execute(GetStream());
-}
index 991b177b87d0658dc6d7934ac3ab1659b8a99dd3..86a8d8468681c8aab846a5c8ddd26cbefbe9f71c 100644 (file)
@@ -14,8 +14,8 @@ libbase_la_SOURCES =  \
        array.h \
        attribute.cpp \
        attribute.h \
-       connection.cpp \
-       connection.h \
+       bufferedstream.cpp \
+       bufferedstream.h \
        convert.cpp \
        convert.h \
        dictionary.cpp \
@@ -34,6 +34,8 @@ libbase_la_SOURCES =  \
        logger_fwd.h \
        netstring.cpp \
        netstring.h \
+       networkstream.cpp \
+       networkstream.h \
        object.cpp \
        object.h \
        objectlock.cpp \
diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp
new file mode 100644 (file)
index 0000000..6f3db13
--- /dev/null
@@ -0,0 +1,133 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "base/bufferedstream.h"
+#include "base/objectlock.h"
+#include "base/utility.h"
+#include "base/logger_fwd.h"
+#include <boost/smart_ptr/make_shared.hpp>
+#include <sstream>
+
+using namespace icinga;
+
+BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
+       : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>())
+{
+       boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
+       readThread.detach();
+       
+       boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
+       writeThread.detach();
+}
+
+void BufferedStream::ReadThreadProc(void)
+{
+       char buffer[512];
+       
+       try {
+               for (;;) {
+                       size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
+                       
+                       if (rc == 0)
+                               break;
+                       
+                       boost::mutex::scoped_lock lock(m_Mutex);
+                       m_RecvQ->Write(buffer, rc);
+                       m_ReadCV.notify_all();
+               }
+       } catch (const std::exception& ex) {
+               std::ostringstream msgbuf;
+               msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex);
+               Log(LogWarning, "base", msgbuf.str());
+
+               Close();
+       }
+}
+
+void BufferedStream::WriteThreadProc(void)
+{
+       char buffer[512];
+
+       try {   
+               for (;;) {
+                       size_t rc;
+       
+                       {
+                               boost::mutex::scoped_lock lock(m_Mutex);
+                               
+                               while (m_SendQ->GetAvailableBytes() == 0)
+                                       m_WriteCV.wait(lock);
+                                       
+                               rc = m_SendQ->Read(buffer, sizeof(buffer));
+                       }               
+                       
+                       m_InnerStream->Write(buffer, rc);
+               }
+       } catch (const std::exception& ex) {
+               std::ostringstream msgbuf;
+               msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex);
+               Log(LogWarning, "base", msgbuf.str());
+
+               Close();
+       }
+}
+
+void BufferedStream::Close(void)
+{
+       m_InnerStream->Close();
+}
+
+/**
+ * Reads data from the stream.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ *              not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ * @returns The number of bytes actually read.
+ */
+size_t BufferedStream::Read(void *buffer, size_t count)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+       return m_RecvQ->Read(buffer, count);
+}
+
+/**
+ * Writes data to the stream.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+void BufferedStream::Write(const void *buffer, size_t count)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+       m_SendQ->Write(buffer, count);
+       m_WriteCV.notify_all(); 
+}
+
+void BufferedStream::WaitReadable(size_t count)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       while (m_RecvQ->GetAvailableBytes() < count)
+               m_ReadCV.wait(lock);
+}
+
+void BufferedStream::WaitWritable(size_t count)
+{ /* Nothing to do here. */ }
similarity index 63%
rename from lib/remoting/jsonrpcconnection.h
rename to lib/base/bufferedstream.h
index 3aaaa247ea10bcba2ee200beec12c0d802a849ce..d747b93cddc9b5d4c0b1878a41730e28fc8f78e0 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef JSONRPCCONNECTION_H
-#define JSONRPCCONNECTION_H
+#ifndef BUFFEREDSTREAM_H
+#define BUFFEREDSTREAM_H
 
-#include "remoting/i2-remoting.h"
-#include "remoting/messagepart.h"
-#include "base/connection.h"
-#include <boost/signals2.hpp>
+#include "base/i2-base.h"
+#include "base/stream.h"
+#include "base/fifo.h"
 
 namespace icinga
 {
 
 /**
- * A JSON-RPC connection.
+ * A buffered stream.
  *
- * @ingroup remoting
+ * @ingroup base
  */
-class I2_REMOTING_API JsonRpcConnection : public Connection
+class I2_BASE_API BufferedStream : public Stream
 {
 public:
-       typedef shared_ptr<JsonRpcConnection> Ptr;
-       typedef weak_ptr<JsonRpcConnection> WeakPtr;
+       typedef shared_ptr<BufferedStream> Ptr;
+       typedef weak_ptr<BufferedStream> WeakPtr;
 
-       explicit JsonRpcConnection(const Stream::Ptr& stream);
+       BufferedStream(const Stream::Ptr& innerStream);
 
-       void SendMessage(const MessagePart& message);
+       virtual size_t Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
 
-       boost::signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
+       virtual void Close(void);
 
-protected:
-       virtual void ProcessData(void);
+       void WaitReadable(size_t count);
+       void WaitWritable(size_t count);
+
+private:
+       Stream::Ptr m_InnerStream;
+       
+       FIFO::Ptr m_RecvQ;
+       FIFO::Ptr m_SendQ;
+       
+       boost::exception_ptr m_Exception;
+       
+       boost::mutex m_Mutex;
+       boost::condition_variable m_ReadCV;
+       boost::condition_variable m_WriteCV;
+       
+       void ReadThreadProc(void);
+       void WriteThreadProc(void);
 };
 
 }
 
-#endif /* JSONRPCCONNECTION_H */
+#endif /* BUFFEREDSTREAM_H */
index b32d0a429354aa52920567768424f2f612b5407c..fe1873d7681992474b467424a22a29862551568e 100644 (file)
@@ -446,7 +446,6 @@ void DynamicObject::DumpObjects(const String& filename)
                BOOST_THROW_EXCEPTION(std::runtime_error("Could not open '" + filename + "' file"));
 
        StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
-       sfp->Start();
 
        BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
                BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
@@ -503,7 +502,6 @@ void DynamicObject::RestoreObjects(const String& filename)
        fp.open(filename.CStr(), std::ios_base::in);
 
        StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
-       sfp->Start();
 
        unsigned long restored = 0;
 
index f352e5c5fb4760e7de48f6f0e985d0a264bb0664..d0f6bbc8bf0b6b6b699abcfcf3a0273e4174c2d4 100644 (file)
@@ -37,13 +37,6 @@ FIFO::~FIFO(void)
        free(m_Buffer);
 }
 
-void FIFO::Start(void)
-{
-       SetConnected(true);
-
-       Stream::Start();
-}
-
 /**
  * Resizes the FIFO's buffer so that it is at least newSize bytes long.
  *
@@ -95,46 +88,16 @@ void FIFO::Optimize(void)
 }
 
 /**
- * Implements IOQueue::GetAvailableBytes().
- */
-size_t FIFO::GetAvailableBytes(void) const
-{
-       return m_DataSize;
-}
-
-/**
- * Returns a pointer to the start of the read buffer.
- *
- * @returns Pointer to the read buffer.
- */
-/*const void *FIFO::GetReadBuffer(void) const
-{
-       return m_Buffer + m_Offset;
-}*/
-
-/**
- * Implements IOQueue::Peek.
+ * Implements IOQueue::Read.
  */
-size_t FIFO::Peek(void *buffer, size_t count)
+size_t FIFO::Read(void *buffer, size_t count)
 {
-       ASSERT(IsConnected());
-
        if (count > m_DataSize)
                count = m_DataSize;
 
        if (buffer != NULL)
                memcpy(buffer, m_Buffer + m_Offset, count);
 
-       return count;
-}
-
-/**
- * Implements IOQueue::Read.
- */
-size_t FIFO::Read(void *buffer, size_t count)
-{
-       count = Peek(buffer, count);
-
        m_DataSize -= count;
        m_Offset += count;
 
@@ -148,9 +111,15 @@ size_t FIFO::Read(void *buffer, size_t count)
  */
 void FIFO::Write(const void *buffer, size_t count)
 {
-       ASSERT(IsConnected());
-
        ResizeBuffer(m_Offset + m_DataSize + count);
        memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
        m_DataSize += count;
 }
+
+void FIFO::Close(void)
+{ }
+
+size_t FIFO::GetAvailableBytes(void) const
+{
+       return m_DataSize;
+}
index 6a701e24de3243de86d74f8824a2630fb431c47a..4c41a9c8981ae8438994309b65f0692b21d64631 100644 (file)
@@ -42,12 +42,11 @@ public:
        FIFO(void);
        ~FIFO(void);
 
-       void Start(void);
+       virtual size_t Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
+       virtual void Close(void);
 
        size_t GetAvailableBytes(void) const;
-       size_t Peek(void *buffer, size_t count);
-       size_t Read(void *buffer, size_t count);
-       void Write(const void *buffer, size_t count);
 
 private:
        char *m_Buffer;
index f1332cf2fcdbcf61d82ac3b88ad6d03db2548ed9..ba6eab5f9b1c40961b39263f6a1ad1d9cfb35995 100644 (file)
  ******************************************************************************/
 
 #include "base/netstring.h"
+#include "base/utility.h"
 #include <sstream>
 
 using namespace icinga;
 
 /**
- * Reads data from a stream in netString format.
+ * Reads data from a stream in netstring format.
  *
  * @param stream The stream to read from.
  * @param[out] str The String that has been read from the IOQueue.
  * @returns true if a complete String was read from the IOQueue, false otherwise.
  * @exception invalid_argument The input stream is invalid.
- * @see https://github.com/PeterScott/netString-c/blob/master/netString.c
+ * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
  */
 bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
 {
        /* 16 bytes are enough for the header */
-       size_t peek_length, buffer_length = 16;
-       char *buffer = static_cast<char *>(malloc(buffer_length));
+       const size_t header_length = 16;
+       size_t read_length;
+       char *header = static_cast<char *>(malloc(header_length));
 
-       if (buffer == NULL)
+       if (header == NULL)
                BOOST_THROW_EXCEPTION(std::bad_alloc());
 
-       peek_length = stream->Peek(buffer, buffer_length);
+       read_length = 0;
 
-       /* minimum netString length is 3 */
-       if (peek_length < 3) {
-               free(buffer);
-               return false;
+       while (read_length < header_length) {
+               /* Read one byte. */
+               int rc = stream->Read(header + read_length, 1);
+
+               if (rc == 0) {
+                       if (read_length == 0)
+                               return false;
+
+                       BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
+               }
+
+               ASSERT(rc == 1);
+
+               read_length++;
+
+               if (header[read_length - 1] == ':') {
+                       break;
+               } else if (header_length == read_length) {
+                       free(header);
+                       BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
+               }
+       }
+
+       /* minimum netstring length is 3 */
+       if (read_length < 3) {
+               free(header);
+               BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (short header)"));
        }
 
        /* no leading zeros allowed */
-       if (buffer[0] == '0' && isdigit(buffer[1])) {
-               free(buffer);
-               BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid netString (leading zero)"));
+       if (header[0] == '0' && isdigit(header[1])) {
+               free(header);
+               BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)"));
        }
 
        size_t len, i;
 
        len = 0;
-       for (i = 0; i < peek_length && isdigit(buffer[i]); i++) {
+       for (i = 0; i < read_length && isdigit(header[i]); i++) {
                /* length specifier must have at most 9 characters */
                if (i >= 9) {
-                       free(buffer);
+                       free(header);
                        BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters"));
                }
 
-               len = len * 10 + (buffer[i] - '0');
+               len = len * 10 + (header[i] - '0');
        }
 
+       free(header);
+
        /* read the whole message */
-       buffer_length = i + 1 + len + 1;
+       size_t data_length = len + 1;
 
-       char *new_buffer = static_cast<char *>(realloc(buffer, buffer_length));
+       char *data = static_cast<char *>(malloc(data_length));
 
-       if (new_buffer == NULL) {
-               free(buffer);
+       if (data == NULL) {
                BOOST_THROW_EXCEPTION(std::bad_alloc());
        }
 
-       buffer = new_buffer;
-
-       peek_length = stream->Peek(buffer, buffer_length);
-
-       if (peek_length < buffer_length)
-               return false;
-
-       /* check for the colon delimiter */
-       if (buffer[i] != ':') {
-               free(buffer);
-               BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
-       }
+       size_t rc = stream->Read(data, data_length);
+       
+       if (rc != data_length)
+               BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
 
-       /* check for the comma delimiter after the String */
-       if (buffer[i + 1 + len] != ',') {
-               free(buffer);
+       if (data[len] != ',')
                BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)"));
-       }
-
-       *str = String(&buffer[i + 1], &buffer[i + 1 + len]);
-
-       free(buffer);
+       
+       *str = String(&data[0], &data[len]);
 
-       /* remove the data from the stream */
-       stream->Read(NULL, peek_length);
+       free(data);
 
        return true;
 }
 
 /**
- * Writes data into a stream using the netString format.
+ * Writes data into a stream using the netstring format.
  *
  * @param stream The stream.
  * @param str The String that is to be written.
  */
 void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
 {
-       std::ostringstream prefixbuf;
-       prefixbuf << str.GetLength() << ":";
+       std::ostringstream msgbuf;
+       msgbuf << str.GetLength() << ":" << str << ",";
 
-       String prefix = prefixbuf.str();
-       stream->Write(prefix.CStr(), prefix.GetLength());
-       stream->Write(str.CStr(), str.GetLength());
-       stream->Write(",", 1);
+       String msg = msgbuf.str();
+       stream->Write(msg.CStr(), msg.GetLength());
 }
similarity index 59%
rename from lib/base/connection.cpp
rename to lib/base/networkstream.cpp
index 91cca228e515e387a6c32737758ea66d4fb22f7e..9bc83a5ec452875bf5590f5d942bb8dd87aee1b0 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#include "base/connection.h"
-#include <boost/bind.hpp>
+#include "base/networkstream.h"
+#include "base/objectlock.h"
+#include "base/utility.h"
+#include <boost/algorithm/string/trim.hpp>
 
 using namespace icinga;
 
-Connection::Connection(const Stream::Ptr& stream)
-       : m_Stream(stream)
-{
-       m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this));
-       m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this));
-}
+NetworkStream::NetworkStream(const Socket::Ptr& socket)
+       : m_Socket(socket)
+{ }
 
-Stream::Ptr Connection::GetStream(void) const
+void NetworkStream::Close(void)
 {
-       return m_Stream;
+       m_Socket->Close();
 }
 
-void Connection::ClosedHandler(void)
+/**
+ * Reads data from the stream.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ *              not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ * @returns The number of bytes actually read.
+ */
+size_t NetworkStream::Read(void *buffer, size_t count)
 {
-       OnClosed(GetSelf());
+       return m_Socket->Read(buffer, count);
 }
 
-void Connection::Close(void)
+/**
+ * Writes data to the stream.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+void NetworkStream::Write(const void *buffer, size_t count)
 {
-       m_Stream->Close();
+       size_t rc = m_Socket->Write(buffer, count);
+       if (rc < count)
+               BOOST_THROW_EXCEPTION(std::runtime_error("Short write for socket."));
 }
similarity index 75%
rename from lib/base/connection.h
rename to lib/base/networkstream.h
index 4b81d41f600f105c51d3923c747c7a0a4f745acc..cff765b48f484c366358b85d14615871acc8fe26 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef CONNECTION_H
-#define CONNECTION_H
+#ifndef NETWORKSTREAM_H
+#define NETWORKSTREAM_H
 
 #include "base/i2-base.h"
 #include "base/stream.h"
-#include <boost/signals2.hpp>
+#include "base/socket.h"
 
 namespace icinga
 {
 
-class I2_BASE_API Connection : public Object
+/**
+ * A network stream.
+ *
+ * @ingroup base
+ */
+class I2_BASE_API NetworkStream : public Stream
 {
 public:
-       typedef shared_ptr<Connection> Ptr;
-       typedef weak_ptr<Connection> WeakPtr;
+       typedef shared_ptr<NetworkStream> Ptr;
+       typedef weak_ptr<NetworkStream> WeakPtr;
 
-       explicit Connection(const Stream::Ptr& stream);
+       NetworkStream(const Socket::Ptr& socket);
 
-       Stream::Ptr GetStream(void) const;
+       virtual size_t Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
 
-       void Close(void);
-
-       boost::signals2::signal<void (const Connection::Ptr&)> OnClosed;
-
-protected:
-       virtual void ProcessData(void) = 0;
+       virtual void Close(void);
 
 private:
-       Stream::Ptr m_Stream;
-
-       void ClosedHandler(void);
+       Socket::Ptr m_Socket;
 };
 
 }
 
-#endif /* CONNECTION_H */
+#endif /* NETWORKSTREAM_H */
index 249bcdcc227b7d8c3b1b08483aea00e1a1302167..2672af43d15e425b706b2d4fff2b1e50c0afbde4 100644 (file)
@@ -34,40 +34,24 @@ using namespace icinga;
  * Constructor for the Socket class.
  */
 Socket::Socket(void)
-       : m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false),
-        m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
-{
-       m_SendQueue->Start();
-       m_RecvQueue->Start();
-}
+       : m_FD(INVALID_SOCKET)
+{ }
 
 /**
- * Destructor for the Socket class.
+ * Constructor for the Socket class.
  */
-Socket::~Socket(void)
+Socket::Socket(SOCKET fd)
+       : m_FD(INVALID_SOCKET)
 {
-       m_SendQueue->Close();
-       m_RecvQueue->Close();
-
-       CloseInternal();
+       SetFD(fd);
 }
 
 /**
- * Starts I/O processing for this socket.
+ * Destructor for the Socket class.
  */
-void Socket::Start(void)
+Socket::~Socket(void)
 {
-       ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable());
-       ASSERT(GetFD() != INVALID_SOCKET);
-
-       // TODO: figure out why we're not using "this" here (hint: to keep the object alive until the threads are done)
-       m_ReadThread = boost::thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
-       m_ReadThread.detach();
-
-       m_WriteThread = boost::thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
-       m_WriteThread.detach();
-
-       Stream::Start();
+       Close();
 }
 
 /**
@@ -77,16 +61,14 @@ void Socket::Start(void)
  */
 void Socket::SetFD(SOCKET fd)
 {
-       ObjectLock olock(this);
-
-       /* mark the socket as non-blocking and close-on-exec */
        if (fd != INVALID_SOCKET) {
-               Utility::SetNonBlockingSocket(fd);
+               /* mark the socket as close-on-exec */
 #ifndef _WIN32
                Utility::SetCloExec(fd);
 #endif /* _WIN32 */
        }
 
+       ObjectLock olock(this);
        m_FD = fd;
 }
 
@@ -105,29 +87,16 @@ SOCKET Socket::GetFD(void) const
 /**
  * Closes the socket.
  */
-void Socket::CloseInternal(void)
+void Socket::Close(void)
 {
-       {
-               ObjectLock olock(this);
+       ObjectLock olock(this);
 
-               if (m_FD != INVALID_SOCKET) {
-                       closesocket(m_FD);
-                       m_FD = INVALID_SOCKET;
-               }
+       if (m_FD != INVALID_SOCKET) {
+               closesocket(m_FD);
+               m_FD = INVALID_SOCKET;
        }
-
-       Stream::Close();
-}
-
-/**
- * Shuts down the socket.
- */
-void Socket::Close(void)
-{
-       SetWriteEOF(true);
 }
 
-
 /**
  * Retrieves the last error that occured for the socket.
  *
@@ -146,24 +115,6 @@ int Socket::GetError(void) const
        return 0;
 }
 
-/**
- * Processes errors that have occured for the socket.
- */
-void Socket::HandleException(void)
-{
-       ObjectLock olock(this);
-
-#ifndef _WIN32
-       BOOST_THROW_EXCEPTION(socket_error()
-           << boost::errinfo_api_function("select")
-           << boost::errinfo_errno(GetError()));
-#else /* _WIN32 */
-       BOOST_THROW_EXCEPTION(socket_error()
-           << boost::errinfo_api_function("select")
-           << errinfo_win32_error(GetError()));
-#endif /* _WIN32 */
-}
-
 /**
  * Formats a sockaddr in a human-readable way.
  *
@@ -246,235 +197,6 @@ String Socket::GetPeerAddress(void)
        return GetAddressFromSockaddr((sockaddr *)&sin, len);
 }
 
-/**
- * Read thread procedure for sockets. This function waits until the
- * socket is readable and processes inbound data.
- */
-void Socket::ReadThreadProc(void)
-{
-       boost::mutex::scoped_lock lock(m_SocketMutex);
-
-       for (;;) {
-               fd_set readfds, exceptfds;
-
-               FD_ZERO(&readfds);
-               FD_ZERO(&exceptfds);
-
-               int fd = GetFD();
-
-               if (fd == INVALID_SOCKET)
-                       return;
-
-               if (WantsToRead())
-                       FD_SET(fd, &readfds);
-
-               FD_SET(fd, &exceptfds);
-
-               lock.unlock();
-
-               timeval tv;
-               tv.tv_sec = 5;
-               tv.tv_usec = 0;
-               int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
-
-               lock.lock();
-
-               if (GetFD() == INVALID_SOCKET)
-                       return;
-
-               try {
-                       if (rc < 0) {
-#ifndef _WIN32
-                               BOOST_THROW_EXCEPTION(socket_error()
-                                   << boost::errinfo_api_function("select")
-                                   << boost::errinfo_errno(errno));
-#else /* _WIN32 */
-                               BOOST_THROW_EXCEPTION(socket_error()
-                                   << boost::errinfo_api_function("select")
-                                   << errinfo_win32_error(WSAGetLastError()));
-#endif /* _WIN32 */
-                       }
-
-                       if (FD_ISSET(fd, &readfds))
-                               HandleReadable();
-
-                       if (FD_ISSET(fd, &exceptfds))
-                               HandleException();
-               } catch (...) {
-                       SetException(boost::current_exception());
-
-                       CloseInternal();
-
-                       break;
-               }
-
-               if (WantsToWrite())
-                       m_WriteCV.notify_all(); /* notify Write thread */
-       }
-}
-
-/**
- * Write thread procedure for sockets. This function waits until the socket
- * is writable and processes outbound data.
- */
-void Socket::WriteThreadProc(void)
-{
-       boost::mutex::scoped_lock lock(m_SocketMutex);
-
-       for (;;) {
-               fd_set writefds;
-
-               FD_ZERO(&writefds);
-
-               while (!WantsToWrite()) {
-                       m_WriteCV.timed_wait(lock, boost::posix_time::seconds(1));
-
-                       if (GetFD() == INVALID_SOCKET)
-                               return;
-               }
-
-               int fd = GetFD();
-
-               if (fd == INVALID_SOCKET)
-                       return;
-
-               FD_SET(fd, &writefds);
-
-               lock.unlock();
-
-               int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
-
-               lock.lock();
-
-               if (GetFD() == INVALID_SOCKET)
-                       return;
-
-               try {
-                       if (rc < 0) {
-#ifndef _WIN32
-                               BOOST_THROW_EXCEPTION(socket_error()
-                                   << boost::errinfo_api_function("select")
-                                   << boost::errinfo_errno(errno));
-#else /* _WIN32 */
-                               BOOST_THROW_EXCEPTION(socket_error()
-                                   << boost::errinfo_api_function("select")
-                                   << errinfo_win32_error(WSAGetLastError()));
-#endif /* _WIN32 */
-                       }
-
-                       if (FD_ISSET(fd, &writefds))
-                               HandleWritable();
-               } catch (...) {
-                       SetException(boost::current_exception());
-
-                       CloseInternal();
-
-                       break;
-               }
-       }
-}
-
-/**
- * Sets whether the socket is fully connected.
- *
- * @param connected Whether the socket is connected
- */
-void Socket::SetConnected(bool connected)
-{
-       ObjectLock olock(this);
-
-       m_Connected = connected;
-}
-
-/**
- * Checks whether the socket is fully connected.
- *
- * @returns true if the socket is connected, false otherwise
- */
-bool Socket::IsConnected(void) const
-{
-       ObjectLock olock(this);
-
-       return m_Connected;
-}
-
-/**
- * Returns how much data is available for reading.
- *
- * @returns The number of bytes available.
- */
-size_t Socket::GetAvailableBytes(void) const
-{
-       ObjectLock olock(this);
-
-       if (m_Listening)
-               throw new std::logic_error("Socket does not support GetAvailableBytes().");
-
-       return m_RecvQueue->GetAvailableBytes();
-}
-
-/**
- * Reads data from the socket.
- *
- * @param buffer The buffer where the data should be stored.
- * @param size The size of the buffer.
- * @returns The number of bytes read.
- */
-size_t Socket::Read(void *buffer, size_t size)
-{
-       {
-               ObjectLock olock(this);
-
-               if (m_Listening)
-                       throw new std::logic_error("Socket does not support Read().");
-       }
-
-       if (m_RecvQueue->GetAvailableBytes() == 0)
-               CheckException();
-
-       return m_RecvQueue->Read(buffer, size);
-}
-
-/**
- * Peeks at data for the socket.
- *
- * @param buffer The buffer where the data should be stored.
- * @param size The size of the buffer.
- * @returns The number of bytes read.
- */
-size_t Socket::Peek(void *buffer, size_t size)
-{
-       {
-               ObjectLock olock(this);
-
-               if (m_Listening)
-                       throw new std::logic_error("Socket does not support Peek().");
-       }
-
-       if (m_RecvQueue->GetAvailableBytes() == 0)
-               CheckException();
-
-       return m_RecvQueue->Peek(buffer, size);
-}
-
-/**
- * Writes data to the socket.
- *
- * @param buffer The buffer that should be sent.
- * @param size The size of the buffer.
- */
-void Socket::Write(const void *buffer, size_t size)
-{
-       {
-               ObjectLock olock(this);
-
-               if (m_Listening)
-                       throw new std::logic_error("Socket does not support Write().");
-       }
-
-       m_SendQueue->Write(buffer, size);
-}
-
 /**
  * Starts listening for incoming client connections.
  */
@@ -491,134 +213,56 @@ void Socket::Listen(void)
                    << errinfo_win32_error(WSAGetLastError()));
 #endif /* _WIN32 */
        }
-
-       {
-               ObjectLock olock(this);
-               m_Listening = true;
-       }
-}
-
-void Socket::HandleWritable(void)
-{
-       if (m_Listening)
-               HandleWritableServer();
-       else
-               HandleWritableClient();
-}
-
-void Socket::HandleReadable(void)
-{
-       if (m_Listening)
-               HandleReadableServer();
-       else
-               HandleReadableClient();
 }
 
 /**
  * Processes data that is available for this socket.
  */
-void Socket::HandleWritableClient(void)
+size_t Socket::Write(const void *buffer, size_t count)
 {
-       int rc;
-       char data[1024];
-       size_t count;
-
-       if (!IsConnected())
-               SetConnected(true);
-
-       for (;;) {
-               count = m_SendQueue->Peek(data, sizeof(data));
-
-               if (count == 0) {
-                       if (IsWriteEOF())
-                               CloseInternal();
+       int rc = send(GetFD(), buffer, count, 0);
 
-                       break;
-               }
-
-               rc = send(GetFD(), data, count, 0);
-
-#ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
-               if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
-                       break;
-
-               if (rc <= 0) {
+       if (rc < 0) {
 #ifndef _WIN32
-                       BOOST_THROW_EXCEPTION(socket_error()
-                           << boost::errinfo_api_function("send")
-                           << boost::errinfo_errno(errno));
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("send")
+                   << boost::errinfo_errno(errno));
 #else /* _WIN32 */
-                       BOOST_THROW_EXCEPTION(socket_error()
-                           << boost::errinfo_api_function("send")
-                           << errinfo_win32_error(WSAGetLastError()));
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("send")
+                   << errinfo_win32_error(WSAGetLastError()));
 #endif /* _WIN32 */
-               }
-
-               m_SendQueue->Read(NULL, rc);
        }
+
+       return rc;
 }
 
 /**
  * Processes data that can be written for this socket.
  */
-void Socket::HandleReadableClient(void)
+size_t Socket::Read(void *buffer, size_t count)
 {
-       if (!IsConnected())
-               SetConnected(true);
-
-       bool new_data = false;
-
-       for (;;) {
-               char data[1024];
-               int rc = recv(GetFD(), data, sizeof(data), 0);
+       int rc = recv(GetFD(), buffer, count, 0);
 
-#ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
-               if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
-                       break;
-
-               if (rc < 0) {
+       if (rc < 0) {
 #ifndef _WIN32
-                       BOOST_THROW_EXCEPTION(socket_error()
-                           << boost::errinfo_api_function("recv")
-                           << boost::errinfo_errno(errno));
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("recv")
+                   << boost::errinfo_errno(errno));
 #else /* _WIN32 */
-                       BOOST_THROW_EXCEPTION(socket_error()
-                           << boost::errinfo_api_function("recv")
-                           << errinfo_win32_error(WSAGetLastError()));
+               BOOST_THROW_EXCEPTION(socket_error()
+                   << boost::errinfo_api_function("recv")
+                   << errinfo_win32_error(WSAGetLastError()));
 #endif /* _WIN32 */
-               }
-
-               new_data = true;
-
-               if (rc == 0) {
-                       SetReadEOF(true);
-
-                       break;
-               }
-
-               m_RecvQueue->Write(data, rc);
        }
 
-       if (new_data)
-               OnDataAvailable(GetSelf());
-}
-
-void Socket::HandleWritableServer(void)
-{
-       throw std::logic_error("This should never happen.");
+       return rc;
 }
 
 /**
- * Accepts a new client and creates a new client object for it
- * using the client factory function.
+ * Accepts a new client and creates a new client object for it.
  */
-void Socket::HandleReadableServer(void)
+Socket::Ptr Socket::Accept(void)
 {
        int fd;
        sockaddr_storage addr;
@@ -627,87 +271,16 @@ void Socket::HandleReadableServer(void)
        fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
 
        if (fd < 0) {
-       #ifndef _WIN32
+#ifndef _WIN32
                BOOST_THROW_EXCEPTION(socket_error()
                    << boost::errinfo_api_function("accept")
                    << boost::errinfo_errno(errno));
-       #else /* _WIN32 */
+#else /* _WIN32 */
                BOOST_THROW_EXCEPTION(socket_error()
                    << boost::errinfo_api_function("accept")
                    << errinfo_win32_error(WSAGetLastError()));
-       #endif /* _WIN32 */
+#endif /* _WIN32 */
        }
 
-       Socket::Ptr client = boost::make_shared<Socket>();
-       client->SetFD(fd);
-       OnNewClient(GetSelf(), client);
-}
-
-/**
- * Checks whether data should be written for this socket object.
- *
- * @returns true if the socket should be registered for writing, false otherwise.
- */
-bool Socket::WantsToWrite(void) const
-{
-       if (m_Listening)
-               return WantsToWriteServer();
-       else
-               return WantsToWriteClient();
-}
-
-/**
- * Checks whether data should be read for this socket object.
- *
- * @returns true if the socket should be registered for reading, false otherwise.
- */
-bool Socket::WantsToRead(void) const
-{
-       if (m_Listening)
-               return WantsToReadServer();
-       else
-               return WantsToReadClient();
-}
-
-/**
- * Checks whether data should be read for this socket.
- *
- * @returns true
- */
-bool Socket::WantsToReadClient(void) const
-{
-       return !IsReadEOF();
-}
-
-/**
- * Checks whether data should be written for this socket.
- *
- * @returns true if data should be written, false otherwise.
- */
-bool Socket::WantsToWriteClient(void) const
-{
-       if (m_SendQueue->GetAvailableBytes() > 0)
-               return true;
-
-       return (!IsConnected());
-}
-
-/**
- * Checks whether the TCP server wants to write.
- *
- * @returns false
- */
-bool Socket::WantsToWriteServer(void) const
-{
-       return false;
-}
-
-/**
- * Checks whether the TCP server wants to read (i.e. accept new clients).
- *
- * @returns true
- */
-bool Socket::WantsToReadServer(void) const
-{
-       return true;
+       return boost::make_shared<Socket>(fd);
 }
index 1a8368242ab5d72526d8b07e4e3a2ce7aae86bea..94d6e9b26bc7cf0ee8cc767da255bd17e77aa685 100644 (file)
@@ -21,7 +21,7 @@
 #define SOCKET_H
 
 #include "base/i2-base.h"
-#include "base/fifo.h"
+#include "base/stream.h"
 #include <boost/thread/thread.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
@@ -34,32 +34,26 @@ namespace icinga {
  *
  * @ingroup base
  */
-class I2_BASE_API Socket : public Stream
+class I2_BASE_API Socket : public Object
 {
 public:
        typedef shared_ptr<Socket> Ptr;
        typedef weak_ptr<Socket> WeakPtr;
 
        Socket(void);
+       Socket(SOCKET fd);
        ~Socket(void);
 
-       virtual void Start(void);
-
-       virtual void Close(void);
+       void Close(void);
 
        String GetClientAddress(void);
        String GetPeerAddress(void);
 
-       bool IsConnected(void) const;
-
-       virtual size_t GetAvailableBytes(void) const;
-       virtual size_t Read(void *buffer, size_t size);
-       virtual size_t Peek(void *buffer, size_t size);
-       virtual void Write(const void *buffer, size_t size);
+       size_t Read(void *buffer, size_t size);
+       size_t Write(const void *buffer, size_t size);
 
        void Listen(void);
-
-       boost::signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
+       Socket::Ptr Accept(void);
 
 protected:
        void SetFD(SOCKET fd);
@@ -73,44 +67,8 @@ protected:
 
 private:
        SOCKET m_FD; /**< The socket descriptor. */
-       bool m_Connected;
-       bool m_Listening;
-
-       boost::thread m_ReadThread;
-       boost::thread m_WriteThread;
-
-       boost::condition_variable m_WriteCV;
-
-       void ReadThreadProc(void);
-       void WriteThreadProc(void);
-
-       void ExceptionEventHandler(void);
 
        static String GetAddressFromSockaddr(sockaddr *address, socklen_t len);
-
-       void CloseInternal(void);
-
-       FIFO::Ptr m_SendQueue;
-       FIFO::Ptr m_RecvQueue;
-
-       void HandleWritableClient(void);
-       void HandleReadableClient(void);
-
-       void HandleWritableServer(void);
-       void HandleReadableServer(void);
-
-       void HandleReadable(void);
-       void HandleWritable(void);
-       void HandleException(void);
-
-       bool WantsToWriteClient(void) const;
-       bool WantsToReadClient(void) const;
-
-       bool WantsToWriteServer(void) const;
-       bool WantsToReadServer(void) const;
-
-       bool WantsToWrite(void) const;
-       bool WantsToRead(void) const;
 };
 
 class socket_error : virtual public std::exception, virtual public boost::exception { };
index 477208a6564838a1c8a0fc7ca2d1cec5a6d19d3b..959e377dcccf9458b227b2ade55ecb273ae4f4dc 100644 (file)
@@ -31,66 +31,15 @@ using namespace icinga;
  *                                      the stream's destructor deletes the inner stream.
  */
 StdioStream::StdioStream(std::iostream *innerStream, bool ownsStream)
-       : m_InnerStream(innerStream), m_OwnsStream(ownsStream),
-         m_ReadAheadBuffer(boost::make_shared<FIFO>())
-{
-       m_ReadAheadBuffer->Start();
-}
-
-/**
- * Destructor for the StdioStream class.
- */
-StdioStream::~StdioStream(void)
-{
-       m_ReadAheadBuffer->Close();
-}
-
-void StdioStream::Start(void)
-{
-       SetConnected(true);
-
-       Stream::Start();
-}
-
-size_t StdioStream::GetAvailableBytes(void) const
-{
-       ObjectLock olock(this);
-
-       if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0)
-               return 0;
-       else
-               return 1024; /* doesn't have to be accurate */
-}
+       : m_InnerStream(innerStream), m_OwnsStream(ownsStream)
+{ }
 
 size_t StdioStream::Read(void *buffer, size_t size)
 {
        ObjectLock olock(this);
 
-       size_t peek_len, read_len;
-
-       peek_len = m_ReadAheadBuffer->GetAvailableBytes();
-       peek_len = m_ReadAheadBuffer->Read(buffer, peek_len);
-
-       m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
-       read_len = m_InnerStream->gcount();
-
-       return peek_len + read_len;
-}
-
-size_t StdioStream::Peek(void *buffer, size_t size)
-{
-       ObjectLock olock(this);
-
-       size_t peek_len, read_len;
-
-       peek_len = m_ReadAheadBuffer->GetAvailableBytes();
-       peek_len = m_ReadAheadBuffer->Peek(buffer, peek_len);
-
-       m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
-       read_len = m_InnerStream->gcount();
-
-       m_ReadAheadBuffer->Write(static_cast<char *>(buffer) + peek_len, read_len);
-       return peek_len + read_len;
+       m_InnerStream->read(static_cast<char *>(buffer), size);
+       return m_InnerStream->gcount();
 }
 
 void StdioStream::Write(const void *buffer, size_t size)
@@ -104,6 +53,4 @@ void StdioStream::Close(void)
 {
        if (m_OwnsStream)
                delete m_InnerStream;
-
-       Stream::Close();
 }
index e6c09ead6bb524cc1d23b5b2f19fcf12f572a99f..fdd544f73c68a53ec21e2025329e97f6f5c63c93 100644 (file)
@@ -33,13 +33,8 @@ public:
        typedef weak_ptr<StdioStream> WeakPtr;
 
        StdioStream(std::iostream *innerStream, bool ownsStream);
-       ~StdioStream(void);
 
-       virtual void Start(void);
-
-       virtual size_t GetAvailableBytes(void) const;
        virtual size_t Read(void *buffer, size_t size);
-       virtual size_t Peek(void *buffer, size_t size);
        virtual void Write(const void *buffer, size_t size);
 
        virtual void Close(void);
@@ -47,7 +42,6 @@ public:
 private:
        std::iostream *m_InnerStream;
        bool m_OwnsStream;
-       FIFO::Ptr m_ReadAheadBuffer;
 };
 
 }
index c086fcef87a2e86a1938836ba0a7c533a3de5379..1405194c40e5765390b9b783c34d0fbe6254e9a1 100644 (file)
 
 using namespace icinga;
 
-Stream::Stream(void)
-       : m_Connected(false), m_ReadEOF(false), m_WriteEOF(false)
-{ }
-
-Stream::~Stream(void)
-{
-       ASSERT(!m_Running);
-}
-
-bool Stream::IsConnected(void) const
-{
-       ObjectLock olock(this);
-
-       return m_Connected;
-}
-
-bool Stream::IsReadEOF(void) const
-{
-       ObjectLock olock(this);
-
-       return m_ReadEOF;
-}
-
-bool Stream::IsWriteEOF(void) const
-{
-       ObjectLock olock(this);
-
-       return m_WriteEOF;
-}
-
-void Stream::SetConnected(bool connected)
-{
-       bool changed;
-
-       {
-               ObjectLock olock(this);
-               changed = (m_Connected != connected);
-               m_Connected = connected;
-       }
-
-       if (changed) {
-               if (connected)
-                       OnConnected(GetSelf());
-               else
-                       OnClosed(GetSelf());
-       }
-}
-
-void Stream::SetReadEOF(bool eof)
-{
-       ObjectLock olock(this);
-
-       m_ReadEOF = eof;
-}
-
-void Stream::SetWriteEOF(bool eof)
-{
-       ObjectLock olock(this);
-
-       m_WriteEOF = eof;
-}
-
-/**
- * Checks whether an exception is available for this stream and re-throws
- * the exception if there is one.
- */
-void Stream::CheckException(void)
-{
-       ObjectLock olock(this);
-
-       if (m_Exception)
-               rethrow_exception(m_Exception);
-}
-
-void Stream::SetException(boost::exception_ptr exception)
-{
-       ObjectLock olock(this);
-
-       m_Exception = exception;
-}
-
-boost::exception_ptr Stream::GetException(void)
-{
-       return m_Exception;
-}
-
-void Stream::Start(void)
-{
-       ObjectLock olock(this);
-
-       m_Running = true;
-}
-
-void Stream::Close(void)
-{
-       {
-               ObjectLock olock(this);
-
-               m_Running = false;
-       }
-
-       SetConnected(false);
-}
-
 bool Stream::ReadLine(String *line, size_t maxLength)
 {
+       BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
+       /*
        char *buffer = new char[maxLength];
 
        size_t rc = Peek(buffer, maxLength);
@@ -161,7 +59,7 @@ bool Stream::ReadLine(String *line, size_t maxLength)
                return true;
        }
 
-       delete buffer;
+       delete buffer;*/
 
        return false;
 }
index a3e1674c215b80255c79794ca4c74dc1f7bf4bac..510cbb0cd99c583c45e3c478e55be601882e26d1 100644 (file)
@@ -40,28 +40,6 @@ public:
        typedef shared_ptr<Stream> Ptr;
        typedef weak_ptr<Stream> WeakPtr;
 
-       Stream(void);
-       ~Stream(void);
-
-       virtual void Start(void);
-
-       /**
-        * Retrieves the number of bytes available for reading.
-        *
-        * @returns The number of available bytes.
-        */
-       virtual size_t GetAvailableBytes(void) const = 0;
-
-       /**
-        * Reads data from the stream without advancing the read pointer.
-        *
-        * @param buffer The buffer where data should be stored. May be NULL if
-        *               you're not actually interested in the data.
-        * @param count The number of bytes to read from the queue.
-        * @returns The number of bytes actually read.
-        */
-       virtual size_t Peek(void *buffer, size_t count) = 0;
-
        /**
         * Reads data from the stream.
         *
@@ -84,34 +62,9 @@ public:
        /**
         * Closes the stream and releases resources.
         */
-       virtual void Close(void);
-
-       bool IsConnected(void) const;
-       bool IsReadEOF(void) const;
-       bool IsWriteEOF(void) const;
+       virtual void Close(void) = 0;
 
        bool ReadLine(String *line, size_t maxLength = 4096);
-
-       boost::exception_ptr GetException(void);
-       void CheckException(void);
-
-       boost::signals2::signal<void (const Stream::Ptr&)> OnConnected;
-       boost::signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
-       boost::signals2::signal<void (const Stream::Ptr&)> OnClosed;
-
-protected:
-       void SetConnected(bool connected);
-       void SetReadEOF(bool eof);
-       void SetWriteEOF(bool eof);
-
-       void SetException(boost::exception_ptr exception);
-
-private:
-       bool m_Running;
-       bool m_Connected;
-       bool m_ReadEOF;
-       bool m_WriteEOF;
-       boost::exception_ptr m_Exception;
 };
 
 }
index 814838b244cef9829aef2ddd465af2417a273853..324d0735e0522c9cf11c516065cc8996223d864c 100644 (file)
@@ -123,7 +123,6 @@ static int I2Stream_write(BIO *bi, const char *in, int inl)
 {
        I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
        bp->StreamObj->Write(in, inl);
-
        return inl;
 }
 
index b656d268f2452871d7f4c8d0d3860ad8571aefa1..63e2cefb796826ed55afc8411d13eb199d6d2c11 100644 (file)
@@ -31,7 +31,7 @@ using namespace icinga;
  * @param service The service.
  * @param family The address family for the socket.
  */
-void TcpSocket::Bind(String service, int family)
+void TcpSocket::Bind(const String& service, int family)
 {
        Bind(String(), service, family);
 }
@@ -43,7 +43,7 @@ void TcpSocket::Bind(String service, int family)
  * @param service The service.
  * @param family The address family for the socket.
  */
-void TcpSocket::Bind(String node, String service, int family)
+void TcpSocket::Bind(const String& node, const String& service, int family)
 {
        addrinfo hints;
        addrinfo *result;
@@ -75,8 +75,6 @@ void TcpSocket::Bind(String node, String service, int family)
                if (fd == INVALID_SOCKET)
                        continue;
 
-               SetFD(fd);
-
                const int optFalse = 0;
                setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
 
@@ -87,23 +85,20 @@ void TcpSocket::Bind(String node, String service, int family)
 
                int rc = bind(fd, info->ai_addr, info->ai_addrlen);
 
-#ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
-#else /* _WIN32 */
-               if (rc < 0 && errno != EINPROGRESS) {
-#endif /* _WIN32 */
+               if (rc < 0) {
                        closesocket(fd);
-                       SetFD(INVALID_SOCKET);
 
                        continue;
                }
 
+               SetFD(fd);
+
                break;
        }
 
        freeaddrinfo(result);
 
-       if (fd == INVALID_SOCKET)
+       if (GetFD() == INVALID_SOCKET)
                BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket."));
 }
 
@@ -145,31 +140,21 @@ void TcpSocket::Connect(const String& node, const String& service)
                if (fd == INVALID_SOCKET)
                        continue;
 
-               SetFD(fd);
-
                rc = connect(fd, info->ai_addr, info->ai_addrlen);
 
-#ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
-#else /* _WIN32 */
-               if (rc < 0 && errno != EINPROGRESS) {
-#endif /* _WIN32 */
+               if (rc < 0) {
                        closesocket(fd);
-                       SetFD(INVALID_SOCKET);
 
                        continue;
                }
 
-               if (rc >= 0) {
-                       SetConnected(true);
-                       OnConnected(GetSelf());
-               }
+               SetFD(fd);
 
                break;
        }
 
        freeaddrinfo(result);
 
-       if (fd == INVALID_SOCKET)
-               BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket."));
+       if (GetFD() == INVALID_SOCKET)
+               BOOST_THROW_EXCEPTION(std::runtime_error("Could not connect to remote host."));
 }
index 48a4c0d0e36b62a390b2bb6a38118770f65f81bc..eee873016844a6f9bba252e5e0df4a85dd65e2c7 100644 (file)
@@ -37,8 +37,8 @@ public:
        typedef shared_ptr<TcpSocket> Ptr;
        typedef weak_ptr<TcpSocket> WeakPtr;
 
-       void Bind(String service, int family);
-       void Bind(String node, String service, int family);
+       void Bind(const String& service, int family);
+       void Bind(const String& node, const String& service, int family);
 
        void Connect(const String& node, const String& service);
 };
index fa2577c18badacfdf1a2b4f164cad4e3080d7d59..05a9866f0a3b2d47e66f3fb6a159f9f47d6ce5f9 100644 (file)
@@ -37,51 +37,39 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
  * @param sslContext The SSL context for the client.
  */
 TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
-       : m_SSLContext(sslContext), m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>()),
-         m_InnerStream(innerStream), m_Role(role)
+       : m_SSLContext(sslContext), m_Role(role)
 {
-       m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this));
-       m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this));
-       m_SendQueue->Start();
-       m_RecvQueue->Start();
-}
-
-void TlsStream::Start(void)
-{
-       {
-               boost::mutex::scoped_lock lock(m_SSLMutex);
-
-               m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
-
-               m_SSLContext.reset();
+       m_InnerStream = dynamic_pointer_cast<BufferedStream>(innerStream);
+       
+       if (!m_InnerStream)
+               m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
+       
+       m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
 
-               if (!m_SSL) {
-                       BOOST_THROW_EXCEPTION(openssl_error()
-                               << boost::errinfo_api_function("SSL_new")
-                               << errinfo_openssl_error(ERR_get_error()));
-               }
+       m_SSLContext.reset();
 
-               if (!m_SSLIndexInitialized) {
-                       m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
-                       m_SSLIndexInitialized = true;
-               }
+       if (!m_SSL) {
+               BOOST_THROW_EXCEPTION(openssl_error()
+                       << boost::errinfo_api_function("SSL_new")
+                       << errinfo_openssl_error(ERR_get_error()));
+       }
 
-               SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
+       if (!m_SSLIndexInitialized) {
+               m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
+               m_SSLIndexInitialized = true;
+       }
 
-               SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
+       SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
 
-               m_BIO = BIO_new_I2Stream(m_InnerStream);
-               SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
+       SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
 
-               if (m_Role == TlsRoleServer)
-                       SSL_set_accept_state(m_SSL.get());
-               else
-                       SSL_set_connect_state(m_SSL.get());
-       }
+       m_BIO = BIO_new_I2Stream(m_InnerStream);
+       SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
 
-       Stream::Start();
-
-       HandleIO();
+       if (m_Role == TlsRoleServer)
+               SSL_set_accept_state(m_SSL.get());
+       else
+               SSL_set_connect_state(m_SSL.get());
 }
 
 /**
@@ -91,8 +79,6 @@ void TlsStream::Start(void)
  */
 shared_ptr<X509> TlsStream::GetClientCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SSLMutex);
-
        return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
 }
 
@@ -103,90 +89,70 @@ shared_ptr<X509> TlsStream::GetClientCertificate(void) const
  */
 shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SSLMutex);
-
        return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
 }
 
-void TlsStream::DataAvailableHandler(void)
+void TlsStream::Handshake(void)
 {
-       try {
-               HandleIO();
-       } catch (...) {
-               SetException(boost::current_exception());
+       ASSERT(!OwnsLock());
 
-               Close();
-       }
-}
+       int rc;
 
-void TlsStream::ClosedHandler(void)
-{
        ObjectLock olock(this);
 
-       SetException(m_InnerStream->GetException());
-       Close();
+       while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
+               switch (SSL_get_error(m_SSL.get(), rc)) {
+                       case SSL_ERROR_WANT_READ:
+                               olock.Unlock();
+                               m_InnerStream->WaitReadable(1);
+                               olock.Lock();
+                               continue;
+                       case SSL_ERROR_WANT_WRITE:
+                               olock.Unlock();
+                               m_InnerStream->WaitWritable(1);
+                               olock.Lock();
+                               continue;
+                       case SSL_ERROR_ZERO_RETURN:
+                               Close();
+                               return;
+                       default:
+                               I2Stream_check_exception(m_BIO);
+                               BOOST_THROW_EXCEPTION(openssl_error()
+                                   << boost::errinfo_api_function("SSL_read")
+                                   << errinfo_openssl_error(ERR_get_error()));
+               }
+       }
 }
 
 /**
  * Processes data for the stream.
  */
-void TlsStream::HandleIO(void)
+size_t TlsStream::Read(void *buffer, size_t count)
 {
        ASSERT(!OwnsLock());
 
-       char data[16 * 1024];
-       int rc;
-
-       if (!IsConnected()) {
-               boost::mutex::scoped_lock lock(m_SSLMutex);
+       size_t left = count;
 
-               rc = SSL_do_handshake(m_SSL.get());
+       ObjectLock olock(this);
 
-               if (rc == 1) {
-                       lock.unlock();
+       while (left > 0) {
+               int rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
 
-                       SetConnected(true);
-               } else {
+               if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
-                               case SSL_ERROR_WANT_WRITE:
-                                       /* fall through */
                                case SSL_ERROR_WANT_READ:
-                                       return;
-                               case SSL_ERROR_ZERO_RETURN:
-                                       Close();
-                                       return;
-                               default:
-                                       I2Stream_check_exception(m_BIO);
-                                       BOOST_THROW_EXCEPTION(openssl_error()
-                                           << boost::errinfo_api_function("SSL_do_handshake")
-                                           << errinfo_openssl_error(ERR_get_error()));
-                       }
-               }
-       }
-
-       bool new_data = false, read_ok = true;
-
-       while (read_ok) {
-               boost::mutex::scoped_lock lock(m_SSLMutex);
-
-               rc = SSL_read(m_SSL.get(), data, sizeof(data));
-
-               if (rc > 0) {
-                       lock.unlock();
-
-                       ObjectLock olock(this);
-                       m_RecvQueue->Write(data, rc);
-                       new_data = true;
-               } else {
-                       switch (SSL_get_error(m_SSL.get(), rc)) {
+                                       olock.Unlock();
+                                       m_InnerStream->WaitReadable(1);
+                                       olock.Lock();
+                                       continue;
                                case SSL_ERROR_WANT_WRITE:
-                                       /* fall through */
-                               case SSL_ERROR_WANT_READ:
-                                       read_ok = false;
-                                       break;
+                                       olock.Unlock();
+                                       m_InnerStream->WaitWritable(1);
+                                       olock.Lock();
+                                       continue;
                                case SSL_ERROR_ZERO_RETURN:
                                        Close();
-                                       return;
+                                       return count - left;
                                default:
                                        I2Stream_check_exception(m_BIO);
                                        BOOST_THROW_EXCEPTION(openssl_error()
@@ -194,41 +160,36 @@ void TlsStream::HandleIO(void)
                                            << errinfo_openssl_error(ERR_get_error()));
                        }
                }
-       }
-
-       if (new_data)
-               OnDataAvailable(GetSelf());
-
-       ObjectLock olock(this);
-
-       while (m_SendQueue->GetAvailableBytes() > 0) {
-               size_t count = m_SendQueue->GetAvailableBytes();
 
-               if (count == 0)
-                       break;
-
-               if (count > sizeof(data))
-                       count = sizeof(data);
+               left -= rc;
+       }
 
-               m_SendQueue->Peek(data, count);
+       return count;
+}
 
-               olock.Unlock();
+void TlsStream::Write(const void *buffer, size_t count)
+{
+       ASSERT(!OwnsLock());
 
-               boost::mutex::scoped_lock lock(m_SSLMutex);
+       size_t left = count;
 
-               rc = SSL_write(m_SSL.get(), (const char *)data, count);
+       ObjectLock olock(this);
 
-               if (rc > 0) {
-                       lock.unlock();
+       while (left > 0) {
+               int rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
 
-                       olock.Lock();
-                       m_SendQueue->Read(NULL, rc);
-               } else {
+               if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
                                case SSL_ERROR_WANT_READ:
-                                       /* fall through */
+                                       olock.Unlock();
+                                       m_InnerStream->WaitReadable(1);
+                                       olock.Lock();
+                                       continue;
                                case SSL_ERROR_WANT_WRITE:
-                                       return;
+                                       olock.Unlock();
+                                       m_InnerStream->WaitWritable(1);
+                                       olock.Lock();
+                                       continue;
                                case SSL_ERROR_ZERO_RETURN:
                                        Close();
                                        return;
@@ -239,6 +200,8 @@ void TlsStream::HandleIO(void)
                                            << errinfo_openssl_error(ERR_get_error()));
                        }
                }
+
+               left -= rc;
        }
 }
 
@@ -247,51 +210,5 @@ void TlsStream::HandleIO(void)
  */
 void TlsStream::Close(void)
 {
-       {
-               boost::mutex::scoped_lock lock(m_SSLMutex);
-       
-               if (m_SSL)
-                       SSL_shutdown(m_SSL.get());
-       }
-
-       {
-               ObjectLock olock(this);
-
-               m_SendQueue->Close();
-               m_RecvQueue->Close();
-       }
-
-       Stream::Close();
-}
-
-size_t TlsStream::GetAvailableBytes(void) const
-{
-       ObjectLock olock(this);
-
-       return m_RecvQueue->GetAvailableBytes();
-}
-
-size_t TlsStream::Peek(void *buffer, size_t count)
-{
-       ObjectLock olock(this);
-
-       return m_RecvQueue->Peek(buffer, count);
-}
-
-size_t TlsStream::Read(void *buffer, size_t count)
-{
-       ObjectLock olock(this);
-
-       return m_RecvQueue->Read(buffer, count);
-}
-
-void TlsStream::Write(const void *buffer, size_t count)
-{
-       {
-               ObjectLock olock(this);
-
-               m_SendQueue->Write(buffer, count);
-       }
-
-       Utility::QueueAsyncCallback(boost::bind(&TlsStream::HandleIO, this));
+       m_InnerStream->Close();
 }
index 0d4d714e81dc74034cffedad9c19d01e7c7ac23e..5741bc985209c1c4c866dad5c6047c2c769a328f 100644 (file)
@@ -21,6 +21,7 @@
 #define TLSSTREAM_H
 
 #include "base/i2-base.h"
+#include "base/bufferedstream.h"
 #include "base/stream.h"
 #include "base/fifo.h"
 #include "base/tlsutility.h"
@@ -50,34 +51,24 @@ public:
        shared_ptr<X509> GetClientCertificate(void) const;
        shared_ptr<X509> GetPeerCertificate(void) const;
 
-       virtual void Start(void);
+       void Handshake(void);
+
        virtual void Close(void);
 
-       virtual size_t GetAvailableBytes(void) const;
-       virtual size_t Peek(void *buffer, size_t count);
        virtual size_t Read(void *buffer, size_t count);
        virtual void Write(const void *buffer, size_t count);
 
 private:
        shared_ptr<SSL_CTX> m_SSLContext;
        shared_ptr<SSL> m_SSL;
-       mutable boost::mutex m_SSLMutex;
        BIO *m_BIO;
 
-       FIFO::Ptr m_SendQueue;
-       FIFO::Ptr m_RecvQueue;
-
-       Stream::Ptr m_InnerStream;
+       BufferedStream::Ptr m_InnerStream;
        TlsRole m_Role;
 
        static int m_SSLIndex;
        static bool m_SSLIndexInitialized;
 
-       void DataAvailableHandler(void);
-       void ClosedHandler(void);
-
-       void HandleIO(void);
-
        static void NullCertificateDeleter(X509 *certificate);
 };
 
index 556f9848291174ce6e1d08de1e3f0966a742a807..8e3abe90d45ab1bd8fdc951ab001ca917438fd9d 100644 (file)
@@ -212,7 +212,7 @@ Value Value::Deserialize(const String& jsonString)
        cJSON *json = cJSON_Parse(jsonString.CStr());
 
        if (!json)
-               BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String"));
+               BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String: " + jsonString));
 
        Value value = FromJson(json);
        cJSON_Delete(json);
index 3c4093a04d6be6a0cbaadf72c1415286ad259163..57c7a7a2fdfa5c8ba525ad892461e318dc6eab05 100644 (file)
@@ -16,8 +16,8 @@ libremoting_la_SOURCES = \
        endpointmanager.cpp \
        endpointmanager.h \
        i2-remoting.h \
-       jsonrpcconnection.cpp \
-       jsonrpcconnection.h \
+       jsonrpc.cpp \
+       jsonrpc.h \
        messagepart.cpp \
        messagepart.h \
        remoting-type.cpp \
index d2e88811311f5b41609610b38be1c95be679912c..f77d7949c57fde60a56cdfb0ee9ac399a34f770f 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "remoting/endpoint.h"
 #include "remoting/endpointmanager.h"
+#include "remoting/jsonrpc.h"
 #include "base/application.h"
 #include "base/dynamictype.h"
 #include "base/objectlock.h"
@@ -106,30 +107,28 @@ bool Endpoint::IsConnected(void) const
        if (IsLocalEndpoint()) {
                return true;
        } else {
-               JsonRpcConnection::Ptr client = GetClient();
-
-               return (client && client->GetStream()->IsConnected());
+               return GetClient();
        }
 }
 
-JsonRpcConnection::Ptr Endpoint::GetClient(void) const
+Stream::Ptr Endpoint::GetClient(void) const
 {
        ObjectLock olock(this);
 
        return m_Client;
 }
 
-void Endpoint::SetClient(const JsonRpcConnection::Ptr& client)
+void Endpoint::SetClient(const Stream::Ptr& client)
 {
-       client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
-       client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
-
        {
                ObjectLock olock(this);
 
                m_Client = client;
        }
 
+       boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
+       thread.detach();
+
        OnConnected(GetSelf());
 }
 
@@ -261,7 +260,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
 
                Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
        } else {
-               GetClient()->SendMessage(request);
+               JsonRpc::SendMessage(GetClient(), request);
        }
 }
 
@@ -272,61 +271,53 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag
 
        if (IsLocalEndpoint())
                EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
-       else
-               GetClient()->SendMessage(response);
-}
-
-void Endpoint::NewMessageHandler(const MessagePart& message)
-{
-       Endpoint::Ptr sender = GetSelf();
-
-       if (ResponseMessage::IsResponseMessage(message)) {
-               /* rather than routing the message to the right virtual
-                * endpoint we just process it here right away. */
-               EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
-               return;
+       else {
+               JsonRpc::SendMessage(GetClient(), response);
        }
-
-       RequestMessage request = message;
-
-       String method;
-       if (!request.GetMethod(&method))
-               return;
-
-       String id;
-       if (request.GetID(&id))
-               EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
-       else
-               EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
 }
 
-void Endpoint::ClientClosedHandler(void)
+void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
 {
-       ASSERT(!OwnsLock());
-
-       /*try {
-               GetClient()->CheckException();
-       } catch (const exception& ex) {
-               stringstream message;
-               message << "Error occured for JSON-RPC socket: Message=" << diagnostic_information(ex);
-
-               Log(LogWarning, "jsonrpc", message.str());
-       }*/
-
-       Log(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
-
-       {
-               ObjectLock olock(this);
-
-               // TODO: _only_ clear non-persistent subscriptions
-               // unregister ourselves if no persistent subscriptions are left (use a
-               // timer for that, once we have a TTL property for the topics)
-               ClearSubscriptions();
-
-               m_Client.reset();
+       try {
+               for (;;) {
+                       MessagePart message = JsonRpc::ReadMessage(stream);
+                       Endpoint::Ptr sender = GetSelf();
+
+                       if (ResponseMessage::IsResponseMessage(message)) {
+                               /* rather than routing the message to the right virtual
+                                * endpoint we just process it here right away. */
+                               EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+                               return;
+                       }
+
+                       RequestMessage request = message;
+
+                       String method;
+                       if (!request.GetMethod(&method))
+                               return;
+
+                       String id;
+                       if (request.GetID(&id))
+                               EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+                       else
+                               EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
+               }
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
+
+               {
+                       ObjectLock olock(this);
+
+                       // TODO: _only_ clear non-persistent subscriptions
+                       // unregister ourselves if no persistent subscriptions are left (use a
+                       // timer for that, once we have a TTL property for the topics)
+                       ClearSubscriptions();
+
+                       m_Client.reset();
+               }
+
+               OnDisconnected(GetSelf());
        }
-
-       OnDisconnected(GetSelf());
 }
 
 /**
index 5a09732ad553afa3a7c7c77e8041b3a21b80b9c9..272dd66d4e3bc41f3a8bf9aece75efa6d272668a 100644 (file)
@@ -23,8 +23,8 @@
 #include "remoting/i2-remoting.h"
 #include "remoting/requestmessage.h"
 #include "remoting/responsemessage.h"
-#include "remoting/jsonrpcconnection.h"
 #include "base/dynamicobject.h"
+#include "base/stream.h"
 #include <boost/signals2.hpp>
 
 namespace icinga
@@ -50,8 +50,8 @@ public:
 
        static Endpoint::Ptr GetByName(const String& name);
 
-       JsonRpcConnection::Ptr GetClient(void) const;
-       void SetClient(const JsonRpcConnection::Ptr& client);
+       Stream::Ptr GetClient(void) const;
+       void SetClient(const Stream::Ptr& client);
 
        void RegisterSubscription(const String& topic);
        void UnregisterSubscription(const String& topic);
@@ -85,7 +85,7 @@ private:
        Attribute<String> m_Node;
        Attribute<String> m_Service;
 
-       JsonRpcConnection::Ptr m_Client;
+       Stream::Ptr m_Client;
 
        bool m_ReceivedWelcome; /**< Have we received a welcome message
                                     from this endpoint? */
@@ -94,8 +94,7 @@ private:
 
        std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers;
 
-       void NewMessageHandler(const MessagePart& message);
-       void ClientClosedHandler(void);
+       void MessageThreadProc(const Stream::Ptr& stream);
 };
 
 }
index 945633fab5fe51154a7ef8f70b569b15ded04159..af517bfd9eba44e6a673a768ffc74f84a60994d6 100644 (file)
@@ -24,6 +24,7 @@
 #include "base/convert.h"
 #include "base/utility.h"
 #include "base/tlsutility.h"
+#include "base/networkstream.h"
 #include <boost/tuple/tuple.hpp>
 #include <boost/foreach.hpp>
 
@@ -129,14 +130,29 @@ void EndpointManager::AddListener(const String& service)
        Log(LogInformation, "icinga", s.str());
 
        TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
+       server->Bind(service, AF_INET6);
+
+       boost::thread thread(boost::bind(&EndpointManager::ListenerThreadProc, this, server));
+       thread.detach();
 
        m_Servers.insert(server);
-       server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
-          this, _2, TlsRoleServer));
+}
 
-       server->Bind(service, AF_INET6);
+void EndpointManager::ListenerThreadProc(const Socket::Ptr& server)
+{
        server->Listen();
-       server->Start();
+
+       for (;;) {
+               Socket::Ptr client = server->Accept();
+
+               try {
+                       NewClientHandler(client, TlsRoleServer);
+               } catch (const std::exception& ex) {
+                       std::stringstream message;
+                       message << "Error for new JSON-RPC socket: " << boost::diagnostic_information(ex);
+                       Log(LogInformation, "remoting", message.str());
+               }
+       }
 }
 
 /**
@@ -146,16 +162,23 @@ void EndpointManager::AddListener(const String& service)
  * @param service The remote port.
  */
 void EndpointManager::AddConnection(const String& node, const String& service) {
-       ObjectLock olock(this);
+       {
+               ObjectLock olock(this);
 
-       shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+               shared_ptr<SSL_CTX> sslContext = m_SSLContext;
 
-       if (!sslContext)
-               BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+               if (!sslContext)
+                       BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+       }
 
        TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
-       client->Connect(node, service);
-       NewClientHandler(client, TlsRoleClient);
+
+       try {
+               client->Connect(node, service);
+               NewClientHandler(client, TlsRoleClient);
+       } catch (const std::exception& ex) {
+               Log(LogInformation, "remoting", "Could not connect to " + node + ":" + service + ": " + ex.what());
+       }
 }
 
 /**
@@ -165,25 +188,10 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
  */
 void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
 {
-       TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, m_SSLContext);
-
-       m_PendingClients.insert(tlsStream);
-       tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
-       tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1));
-
-       client->Start();
-       tlsStream->Start();
-}
-
-void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
-{
-       TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
-       JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
+       NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
 
-       {
-               ObjectLock olock(this);
-               m_PendingClients.erase(tlsStream);
-       }
+       TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(netStream, role, m_SSLContext);
+       tlsStream->Handshake();
 
        shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
        String identity = GetCertificateCN(cert);
@@ -195,17 +203,7 @@ void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
        if (!endpoint)
                endpoint = Endpoint::MakeEndpoint(identity, true);
 
-       endpoint->SetClient(jclient);
-}
-
-void EndpointManager::ClientClosedHandler(const Stream::Ptr& client)
-{
-       TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
-
-       {
-               ObjectLock olock(this);
-               m_PendingClients.erase(tlsStream);
-       }
+       endpoint->SetClient(tlsStream);
 }
 
 /**
index 7acb1c8659495a9161c252c22a24a15e70a88547..c98c9265ed139459903f9bcfa4e2b60a80ebcf90 100644 (file)
@@ -81,7 +81,6 @@ private:
        Timer::Ptr m_ReconnectTimer;
 
        std::set<TcpSocket::Ptr> m_Servers;
-       std::set<TlsStream::Ptr> m_PendingClients;
 
        /**
         * Information about a pending API request.
@@ -112,8 +111,8 @@ private:
        void ReconnectTimerHandler(void);
 
        void NewClientHandler(const Socket::Ptr& client, TlsRole role);
-       void ClientConnectedHandler(const Stream::Ptr& client);
-       void ClientClosedHandler(const Stream::Ptr& client);
+
+       void ListenerThreadProc(const Socket::Ptr& server);
 };
 
 }
similarity index 63%
rename from lib/remoting/jsonrpcconnection.cpp
rename to lib/remoting/jsonrpc.cpp
index a6225288ee9cd4c7c600ac16b5ec34c80027e005..47d8d8535817cbb8eaa4474beda0e38f642a9adf 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#include "remoting/jsonrpcconnection.h"
+#include "remoting/jsonrpc.h"
 #include "base/netstring.h"
 #include "base/objectlock.h"
 #include "base/logger_fwd.h"
 #include <boost/exception/diagnostic_information.hpp>
+#include <iostream>
 
 using namespace icinga;
 
-/**
- * Constructor for the JsonRpcConnection class.
- *
- * @param stream The stream.
- */
-JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream)
-       : Connection(stream)
-{ }
-
 /**
  * Sends a message to the connected peer.
  *
  * @param message The message.
  */
-void JsonRpcConnection::SendMessage(const MessagePart& message)
+void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message)
 {
-       ObjectLock olock(this);
-
        Value value = message.GetDictionary();
        String json = value.Serialize();
        //std::cerr << ">> " << json << std::endl;
-       NetString::WriteStringToStream(GetStream(), json);
+       NetString::WriteStringToStream(stream, json);
 }
 
-/**
- * Processes inbound data.
- */
-void JsonRpcConnection::ProcessData(void)
+MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
 {
-       ObjectLock olock(this);
-
        String jsonString;
+       if (!NetString::ReadStringFromStream(stream, &jsonString))
+               BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
 
-       while (NetString::ReadStringFromStream(GetStream(), &jsonString)) {
-               //std::cerr << "<< " << jsonString << std::endl;
-
-               try {
-                       Value value = Value::Deserialize(jsonString);
+       //std::cerr << "<< " << jsonString << std::endl;
+       Value value = Value::Deserialize(jsonString);
 
-                       if (!value.IsObjectType<Dictionary>()) {
-                               BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
-                                   " message must be a dictionary."));
-                       }
-
-                       MessagePart mp(value);
-                       OnNewMessage(GetSelf(), mp);
-               } catch (const std::exception& ex) {
-                       Log(LogCritical, "remoting", "Exception"
-                           " while processing message from JSON-RPC client: " +
-                           boost::diagnostic_information(ex));
-               }
+       if (!value.IsObjectType<Dictionary>()) {
+               BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
+                   " message must be a dictionary."));
        }
+
+       return MessagePart(value);
 }
similarity index 76%
rename from components/livestatus/connection.h
rename to lib/remoting/jsonrpc.h
index 49d36ec02563a562ff5469090e534323a2c35869..31cf1ed491411b5d7bcbe3493c5fee1ddbbd00ae 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef LIVESTATUSCONNECTION_H
-#define LIVESTATUSCONNECTION_H
+#ifndef JSONRPC_H
+#define JSONRPC_H
 
-#include "base/connection.h"
+#include "remoting/i2-remoting.h"
+#include "remoting/messagepart.h"
+#include "base/stream.h"
 
-using namespace icinga;
-
-namespace livestatus
+namespace icinga
 {
 
-class LivestatusConnection : public Connection
+/**
+ * A JSON-RPC connection.
+ *
+ * @ingroup remoting
+ */
+class I2_REMOTING_API JsonRpc
 {
 public:
-       typedef shared_ptr<LivestatusConnection> Ptr;
-       typedef weak_ptr<LivestatusConnection> WeakPtr;
-
-       LivestatusConnection(const Stream::Ptr& stream);
-
-protected:
-       std::vector<String> m_Lines;
+       static void SendMessage(const Stream::Ptr& stream, const MessagePart& message);
+       static MessagePart ReadMessage(const Stream::Ptr& stream);
 
-       virtual void ProcessData(void);
+private:
+       JsonRpc(void);
 };
 
 }
 
-#endif /* LIVESTATUSCONNECTION_H */
+#endif /* JSONRPC_H */
index 1c02fd25830343a1fa65a23dc6fdb571959d329e..ac703b9bdd628c51f387c339e6262dd68b5cb1be 100644 (file)
@@ -39,8 +39,8 @@ class I2_REMOTING_API MessagePart
 {
 public:
        MessagePart(void);
+       MessagePart(const MessagePart& message);
        explicit MessagePart(const Dictionary::Ptr& dictionary);
-       explicit MessagePart(const MessagePart& message);
 
        Dictionary::Ptr GetDictionary(void) const;