commentstable.h \
component.cpp \
component.h \
- connection.cpp \
- connection.h \
contactgroupstable.cpp \
contactgroupstable.h \
contactstable.cpp \
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/tcpsocket.h"
+#include "base/networkstream.h"
#include "base/application.h"
#include <boost/smart_ptr/make_shared.hpp>
socket->Bind("6558", AF_INET);
//#endif /* _WIN32 */
- socket->OnNewClient.connect(boost::bind(&LivestatusComponent::NewClientHandler, this, _2));
- socket->Listen();
- socket->Start();
m_Listener = socket;
+
+ boost::thread thread(boost::bind(&LivestatusComponent::ServerThreadProc, this, socket));
+ thread.detach();
}
String LivestatusComponent::GetSocketPath(void) const
return socketPath;
}
-void LivestatusComponent::NewClientHandler(const Socket::Ptr& client)
+void LivestatusComponent::ServerThreadProc(const Socket::Ptr& server)
{
- Log(LogInformation, "livestatus", "Client connected");
+ server->Listen();
+
+ for (;;) {
+ Socket::Ptr client = server->Accept();
- LivestatusConnection::Ptr lconnection = boost::make_shared<LivestatusConnection>(client);
- lconnection->OnClosed.connect(boost::bind(&LivestatusComponent::ClientClosedHandler, this, _1));
+ Log(LogInformation, "livestatus", "Client connected");
- m_Connections.insert(lconnection);
- client->Start();
+ boost::thread thread(boost::bind(&LivestatusComponent::ClientThreadProc, this, client));
+ thread.detach();
+ }
}
-void LivestatusComponent::ClientClosedHandler(const Connection::Ptr& connection)
+void LivestatusComponent::ClientThreadProc(const Socket::Ptr& client)
{
- LivestatusConnection::Ptr lconnection = static_pointer_cast<LivestatusConnection>(connection);
+ Stream::Ptr stream = boost::make_shared<NetworkStream>(client);
+
+ for (;;) {
+ String line;
+ bool read_line = false;
+
+ std::vector<String> lines;
+
+ while (stream->ReadLine(&line)) {
+ read_line = true;
+
+ if (line.GetLength() > 0)
+ lines.push_back(line);
+ else
+ break;
+ }
- Log(LogInformation, "livestatus", "Client disconnected");
- m_Connections.erase(lconnection);
+ Query::Ptr query = boost::make_shared<Query>(lines);
+ query->Execute(stream);
+ }
}
#ifndef LIVESTATUSCOMPONENT_H
#define LIVESTATUSCOMPONENT_H
-#include "livestatus/connection.h"
+#include "livestatus/query.h"
#include "base/dynamicobject.h"
#include "base/socket.h"
Attribute<String> m_SocketPath;
Socket::Ptr m_Listener;
- std::set<LivestatusConnection::Ptr> m_Connections;
- void NewClientHandler(const Socket::Ptr& client);
- void ClientClosedHandler(const Connection::Ptr& connection);
+ void ServerThreadProc(const Socket::Ptr& server);
+ void ClientThreadProc(const Socket::Ptr& client);
};
}
+++ /dev/null
-/******************************************************************************
- * Icinga 2 *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
- * *
- * This program is free software; you can redistribute it and/or *
- * modify it under the terms of the GNU General Public License *
- * as published by the Free Software Foundation; either version 2 *
- * of the License, or (at your option) any later version. *
- * *
- * This program is distributed in the hope that it will be useful, *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
- * GNU General Public License for more details. *
- * *
- * You should have received a copy of the GNU General Public License *
- * along with this program; if not, write to the Free Software Foundation *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
- ******************************************************************************/
-
-#include "livestatus/connection.h"
-#include "livestatus/query.h"
-#include <boost/smart_ptr/make_shared.hpp>
-
-using namespace icinga;
-using namespace livestatus;
-
-LivestatusConnection::LivestatusConnection(const Stream::Ptr& stream)
- : Connection(stream)
-{ }
-
-void LivestatusConnection::ProcessData(void)
-{
- String line;
- bool read_line = false;
-
- while (GetStream()->ReadLine(&line)) {
- read_line = true;
-
- if (line.GetLength() > 0)
- m_Lines.push_back(line);
- else
- break;
- }
-
- /* Return if we didn't at least read one line. */
- if (!read_line)
- return;
-
- /* Return if we haven't found the end of the query. */
- if (line.GetLength() > 0 && !GetStream()->IsReadEOF())
- return;
-
- Query::Ptr query = boost::make_shared<Query>(m_Lines);
- m_Lines.clear();
-
- query->Execute(GetStream());
-}
array.h \
attribute.cpp \
attribute.h \
- connection.cpp \
- connection.h \
+ bufferedstream.cpp \
+ bufferedstream.h \
convert.cpp \
convert.h \
dictionary.cpp \
logger_fwd.h \
netstring.cpp \
netstring.h \
+ networkstream.cpp \
+ networkstream.h \
object.cpp \
object.h \
objectlock.cpp \
--- /dev/null
+/******************************************************************************
+ * Icinga 2 *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
+ * *
+ * This program is free software; you can redistribute it and/or *
+ * modify it under the terms of the GNU General Public License *
+ * as published by the Free Software Foundation; either version 2 *
+ * of the License, or (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the Free Software Foundation *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ******************************************************************************/
+
+#include "base/bufferedstream.h"
+#include "base/objectlock.h"
+#include "base/utility.h"
+#include "base/logger_fwd.h"
+#include <boost/smart_ptr/make_shared.hpp>
+#include <sstream>
+
+using namespace icinga;
+
+BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
+ : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>())
+{
+ boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
+ readThread.detach();
+
+ boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
+ writeThread.detach();
+}
+
+void BufferedStream::ReadThreadProc(void)
+{
+ char buffer[512];
+
+ try {
+ for (;;) {
+ size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
+
+ if (rc == 0)
+ break;
+
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_RecvQ->Write(buffer, rc);
+ m_ReadCV.notify_all();
+ }
+ } catch (const std::exception& ex) {
+ std::ostringstream msgbuf;
+ msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex);
+ Log(LogWarning, "base", msgbuf.str());
+
+ Close();
+ }
+}
+
+void BufferedStream::WriteThreadProc(void)
+{
+ char buffer[512];
+
+ try {
+ for (;;) {
+ size_t rc;
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ while (m_SendQ->GetAvailableBytes() == 0)
+ m_WriteCV.wait(lock);
+
+ rc = m_SendQ->Read(buffer, sizeof(buffer));
+ }
+
+ m_InnerStream->Write(buffer, rc);
+ }
+ } catch (const std::exception& ex) {
+ std::ostringstream msgbuf;
+ msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex);
+ Log(LogWarning, "base", msgbuf.str());
+
+ Close();
+ }
+}
+
+void BufferedStream::Close(void)
+{
+ m_InnerStream->Close();
+}
+
+/**
+ * Reads data from the stream.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ * @returns The number of bytes actually read.
+ */
+size_t BufferedStream::Read(void *buffer, size_t count)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+ return m_RecvQ->Read(buffer, count);
+}
+
+/**
+ * Writes data to the stream.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+void BufferedStream::Write(const void *buffer, size_t count)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_SendQ->Write(buffer, count);
+ m_WriteCV.notify_all();
+}
+
+void BufferedStream::WaitReadable(size_t count)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ while (m_RecvQ->GetAvailableBytes() < count)
+ m_ReadCV.wait(lock);
+}
+
+void BufferedStream::WaitWritable(size_t count)
+{ /* Nothing to do here. */ }
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#ifndef JSONRPCCONNECTION_H
-#define JSONRPCCONNECTION_H
+#ifndef BUFFEREDSTREAM_H
+#define BUFFEREDSTREAM_H
-#include "remoting/i2-remoting.h"
-#include "remoting/messagepart.h"
-#include "base/connection.h"
-#include <boost/signals2.hpp>
+#include "base/i2-base.h"
+#include "base/stream.h"
+#include "base/fifo.h"
namespace icinga
{
/**
- * A JSON-RPC connection.
+ * A buffered stream.
*
- * @ingroup remoting
+ * @ingroup base
*/
-class I2_REMOTING_API JsonRpcConnection : public Connection
+class I2_BASE_API BufferedStream : public Stream
{
public:
- typedef shared_ptr<JsonRpcConnection> Ptr;
- typedef weak_ptr<JsonRpcConnection> WeakPtr;
+ typedef shared_ptr<BufferedStream> Ptr;
+ typedef weak_ptr<BufferedStream> WeakPtr;
- explicit JsonRpcConnection(const Stream::Ptr& stream);
+ BufferedStream(const Stream::Ptr& innerStream);
- void SendMessage(const MessagePart& message);
+ virtual size_t Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
- boost::signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
+ virtual void Close(void);
-protected:
- virtual void ProcessData(void);
+ void WaitReadable(size_t count);
+ void WaitWritable(size_t count);
+
+private:
+ Stream::Ptr m_InnerStream;
+
+ FIFO::Ptr m_RecvQ;
+ FIFO::Ptr m_SendQ;
+
+ boost::exception_ptr m_Exception;
+
+ boost::mutex m_Mutex;
+ boost::condition_variable m_ReadCV;
+ boost::condition_variable m_WriteCV;
+
+ void ReadThreadProc(void);
+ void WriteThreadProc(void);
};
}
-#endif /* JSONRPCCONNECTION_H */
+#endif /* BUFFEREDSTREAM_H */
BOOST_THROW_EXCEPTION(std::runtime_error("Could not open '" + filename + "' file"));
StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
- sfp->Start();
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
fp.open(filename.CStr(), std::ios_base::in);
StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
- sfp->Start();
unsigned long restored = 0;
free(m_Buffer);
}
-void FIFO::Start(void)
-{
- SetConnected(true);
-
- Stream::Start();
-}
-
/**
* Resizes the FIFO's buffer so that it is at least newSize bytes long.
*
}
/**
- * Implements IOQueue::GetAvailableBytes().
- */
-size_t FIFO::GetAvailableBytes(void) const
-{
- return m_DataSize;
-}
-
-/**
- * Returns a pointer to the start of the read buffer.
- *
- * @returns Pointer to the read buffer.
- */
-/*const void *FIFO::GetReadBuffer(void) const
-{
- return m_Buffer + m_Offset;
-}*/
-
-/**
- * Implements IOQueue::Peek.
+ * Implements IOQueue::Read.
*/
-size_t FIFO::Peek(void *buffer, size_t count)
+size_t FIFO::Read(void *buffer, size_t count)
{
- ASSERT(IsConnected());
-
if (count > m_DataSize)
count = m_DataSize;
if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count);
- return count;
-}
-
-/**
- * Implements IOQueue::Read.
- */
-size_t FIFO::Read(void *buffer, size_t count)
-{
- count = Peek(buffer, count);
-
m_DataSize -= count;
m_Offset += count;
*/
void FIFO::Write(const void *buffer, size_t count)
{
- ASSERT(IsConnected());
-
ResizeBuffer(m_Offset + m_DataSize + count);
memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count;
}
+
+void FIFO::Close(void)
+{ }
+
+size_t FIFO::GetAvailableBytes(void) const
+{
+ return m_DataSize;
+}
FIFO(void);
~FIFO(void);
- void Start(void);
+ virtual size_t Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
+ virtual void Close(void);
size_t GetAvailableBytes(void) const;
- size_t Peek(void *buffer, size_t count);
- size_t Read(void *buffer, size_t count);
- void Write(const void *buffer, size_t count);
private:
char *m_Buffer;
******************************************************************************/
#include "base/netstring.h"
+#include "base/utility.h"
#include <sstream>
using namespace icinga;
/**
- * Reads data from a stream in netString format.
+ * Reads data from a stream in netstring format.
*
* @param stream The stream to read from.
* @param[out] str The String that has been read from the IOQueue.
* @returns true if a complete String was read from the IOQueue, false otherwise.
* @exception invalid_argument The input stream is invalid.
- * @see https://github.com/PeterScott/netString-c/blob/master/netString.c
+ * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
{
/* 16 bytes are enough for the header */
- size_t peek_length, buffer_length = 16;
- char *buffer = static_cast<char *>(malloc(buffer_length));
+ const size_t header_length = 16;
+ size_t read_length;
+ char *header = static_cast<char *>(malloc(header_length));
- if (buffer == NULL)
+ if (header == NULL)
BOOST_THROW_EXCEPTION(std::bad_alloc());
- peek_length = stream->Peek(buffer, buffer_length);
+ read_length = 0;
- /* minimum netString length is 3 */
- if (peek_length < 3) {
- free(buffer);
- return false;
+ while (read_length < header_length) {
+ /* Read one byte. */
+ int rc = stream->Read(header + read_length, 1);
+
+ if (rc == 0) {
+ if (read_length == 0)
+ return false;
+
+ BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
+ }
+
+ ASSERT(rc == 1);
+
+ read_length++;
+
+ if (header[read_length - 1] == ':') {
+ break;
+ } else if (header_length == read_length) {
+ free(header);
+ BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
+ }
+ }
+
+ /* minimum netstring length is 3 */
+ if (read_length < 3) {
+ free(header);
+ BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (short header)"));
}
/* no leading zeros allowed */
- if (buffer[0] == '0' && isdigit(buffer[1])) {
- free(buffer);
- BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid netString (leading zero)"));
+ if (header[0] == '0' && isdigit(header[1])) {
+ free(header);
+ BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)"));
}
size_t len, i;
len = 0;
- for (i = 0; i < peek_length && isdigit(buffer[i]); i++) {
+ for (i = 0; i < read_length && isdigit(header[i]); i++) {
/* length specifier must have at most 9 characters */
if (i >= 9) {
- free(buffer);
+ free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters"));
}
- len = len * 10 + (buffer[i] - '0');
+ len = len * 10 + (header[i] - '0');
}
+ free(header);
+
/* read the whole message */
- buffer_length = i + 1 + len + 1;
+ size_t data_length = len + 1;
- char *new_buffer = static_cast<char *>(realloc(buffer, buffer_length));
+ char *data = static_cast<char *>(malloc(data_length));
- if (new_buffer == NULL) {
- free(buffer);
+ if (data == NULL) {
BOOST_THROW_EXCEPTION(std::bad_alloc());
}
- buffer = new_buffer;
-
- peek_length = stream->Peek(buffer, buffer_length);
-
- if (peek_length < buffer_length)
- return false;
-
- /* check for the colon delimiter */
- if (buffer[i] != ':') {
- free(buffer);
- BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
- }
+ size_t rc = stream->Read(data, data_length);
+
+ if (rc != data_length)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
- /* check for the comma delimiter after the String */
- if (buffer[i + 1 + len] != ',') {
- free(buffer);
+ if (data[len] != ',')
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)"));
- }
-
- *str = String(&buffer[i + 1], &buffer[i + 1 + len]);
-
- free(buffer);
+
+ *str = String(&data[0], &data[len]);
- /* remove the data from the stream */
- stream->Read(NULL, peek_length);
+ free(data);
return true;
}
/**
- * Writes data into a stream using the netString format.
+ * Writes data into a stream using the netstring format.
*
* @param stream The stream.
* @param str The String that is to be written.
*/
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
{
- std::ostringstream prefixbuf;
- prefixbuf << str.GetLength() << ":";
+ std::ostringstream msgbuf;
+ msgbuf << str.GetLength() << ":" << str << ",";
- String prefix = prefixbuf.str();
- stream->Write(prefix.CStr(), prefix.GetLength());
- stream->Write(str.CStr(), str.GetLength());
- stream->Write(",", 1);
+ String msg = msgbuf.str();
+ stream->Write(msg.CStr(), msg.GetLength());
}
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#include "base/connection.h"
-#include <boost/bind.hpp>
+#include "base/networkstream.h"
+#include "base/objectlock.h"
+#include "base/utility.h"
+#include <boost/algorithm/string/trim.hpp>
using namespace icinga;
-Connection::Connection(const Stream::Ptr& stream)
- : m_Stream(stream)
-{
- m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this));
- m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this));
-}
+NetworkStream::NetworkStream(const Socket::Ptr& socket)
+ : m_Socket(socket)
+{ }
-Stream::Ptr Connection::GetStream(void) const
+void NetworkStream::Close(void)
{
- return m_Stream;
+ m_Socket->Close();
}
-void Connection::ClosedHandler(void)
+/**
+ * Reads data from the stream.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ * @returns The number of bytes actually read.
+ */
+size_t NetworkStream::Read(void *buffer, size_t count)
{
- OnClosed(GetSelf());
+ return m_Socket->Read(buffer, count);
}
-void Connection::Close(void)
+/**
+ * Writes data to the stream.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+void NetworkStream::Write(const void *buffer, size_t count)
{
- m_Stream->Close();
+ size_t rc = m_Socket->Write(buffer, count);
+ if (rc < count)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Short write for socket."));
}
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#ifndef CONNECTION_H
-#define CONNECTION_H
+#ifndef NETWORKSTREAM_H
+#define NETWORKSTREAM_H
#include "base/i2-base.h"
#include "base/stream.h"
-#include <boost/signals2.hpp>
+#include "base/socket.h"
namespace icinga
{
-class I2_BASE_API Connection : public Object
+/**
+ * A network stream.
+ *
+ * @ingroup base
+ */
+class I2_BASE_API NetworkStream : public Stream
{
public:
- typedef shared_ptr<Connection> Ptr;
- typedef weak_ptr<Connection> WeakPtr;
+ typedef shared_ptr<NetworkStream> Ptr;
+ typedef weak_ptr<NetworkStream> WeakPtr;
- explicit Connection(const Stream::Ptr& stream);
+ NetworkStream(const Socket::Ptr& socket);
- Stream::Ptr GetStream(void) const;
+ virtual size_t Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
- void Close(void);
-
- boost::signals2::signal<void (const Connection::Ptr&)> OnClosed;
-
-protected:
- virtual void ProcessData(void) = 0;
+ virtual void Close(void);
private:
- Stream::Ptr m_Stream;
-
- void ClosedHandler(void);
+ Socket::Ptr m_Socket;
};
}
-#endif /* CONNECTION_H */
+#endif /* NETWORKSTREAM_H */
* Constructor for the Socket class.
*/
Socket::Socket(void)
- : m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false),
- m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
-{
- m_SendQueue->Start();
- m_RecvQueue->Start();
-}
+ : m_FD(INVALID_SOCKET)
+{ }
/**
- * Destructor for the Socket class.
+ * Constructor for the Socket class.
*/
-Socket::~Socket(void)
+Socket::Socket(SOCKET fd)
+ : m_FD(INVALID_SOCKET)
{
- m_SendQueue->Close();
- m_RecvQueue->Close();
-
- CloseInternal();
+ SetFD(fd);
}
/**
- * Starts I/O processing for this socket.
+ * Destructor for the Socket class.
*/
-void Socket::Start(void)
+Socket::~Socket(void)
{
- ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable());
- ASSERT(GetFD() != INVALID_SOCKET);
-
- // TODO: figure out why we're not using "this" here (hint: to keep the object alive until the threads are done)
- m_ReadThread = boost::thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
- m_ReadThread.detach();
-
- m_WriteThread = boost::thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
- m_WriteThread.detach();
-
- Stream::Start();
+ Close();
}
/**
*/
void Socket::SetFD(SOCKET fd)
{
- ObjectLock olock(this);
-
- /* mark the socket as non-blocking and close-on-exec */
if (fd != INVALID_SOCKET) {
- Utility::SetNonBlockingSocket(fd);
+ /* mark the socket as close-on-exec */
#ifndef _WIN32
Utility::SetCloExec(fd);
#endif /* _WIN32 */
}
+ ObjectLock olock(this);
m_FD = fd;
}
/**
* Closes the socket.
*/
-void Socket::CloseInternal(void)
+void Socket::Close(void)
{
- {
- ObjectLock olock(this);
+ ObjectLock olock(this);
- if (m_FD != INVALID_SOCKET) {
- closesocket(m_FD);
- m_FD = INVALID_SOCKET;
- }
+ if (m_FD != INVALID_SOCKET) {
+ closesocket(m_FD);
+ m_FD = INVALID_SOCKET;
}
-
- Stream::Close();
-}
-
-/**
- * Shuts down the socket.
- */
-void Socket::Close(void)
-{
- SetWriteEOF(true);
}
-
/**
* Retrieves the last error that occured for the socket.
*
return 0;
}
-/**
- * Processes errors that have occured for the socket.
- */
-void Socket::HandleException(void)
-{
- ObjectLock olock(this);
-
-#ifndef _WIN32
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << boost::errinfo_errno(GetError()));
-#else /* _WIN32 */
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << errinfo_win32_error(GetError()));
-#endif /* _WIN32 */
-}
-
/**
* Formats a sockaddr in a human-readable way.
*
return GetAddressFromSockaddr((sockaddr *)&sin, len);
}
-/**
- * Read thread procedure for sockets. This function waits until the
- * socket is readable and processes inbound data.
- */
-void Socket::ReadThreadProc(void)
-{
- boost::mutex::scoped_lock lock(m_SocketMutex);
-
- for (;;) {
- fd_set readfds, exceptfds;
-
- FD_ZERO(&readfds);
- FD_ZERO(&exceptfds);
-
- int fd = GetFD();
-
- if (fd == INVALID_SOCKET)
- return;
-
- if (WantsToRead())
- FD_SET(fd, &readfds);
-
- FD_SET(fd, &exceptfds);
-
- lock.unlock();
-
- timeval tv;
- tv.tv_sec = 5;
- tv.tv_usec = 0;
- int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
-
- lock.lock();
-
- if (GetFD() == INVALID_SOCKET)
- return;
-
- try {
- if (rc < 0) {
-#ifndef _WIN32
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << boost::errinfo_errno(errno));
-#else /* _WIN32 */
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << errinfo_win32_error(WSAGetLastError()));
-#endif /* _WIN32 */
- }
-
- if (FD_ISSET(fd, &readfds))
- HandleReadable();
-
- if (FD_ISSET(fd, &exceptfds))
- HandleException();
- } catch (...) {
- SetException(boost::current_exception());
-
- CloseInternal();
-
- break;
- }
-
- if (WantsToWrite())
- m_WriteCV.notify_all(); /* notify Write thread */
- }
-}
-
-/**
- * Write thread procedure for sockets. This function waits until the socket
- * is writable and processes outbound data.
- */
-void Socket::WriteThreadProc(void)
-{
- boost::mutex::scoped_lock lock(m_SocketMutex);
-
- for (;;) {
- fd_set writefds;
-
- FD_ZERO(&writefds);
-
- while (!WantsToWrite()) {
- m_WriteCV.timed_wait(lock, boost::posix_time::seconds(1));
-
- if (GetFD() == INVALID_SOCKET)
- return;
- }
-
- int fd = GetFD();
-
- if (fd == INVALID_SOCKET)
- return;
-
- FD_SET(fd, &writefds);
-
- lock.unlock();
-
- int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
-
- lock.lock();
-
- if (GetFD() == INVALID_SOCKET)
- return;
-
- try {
- if (rc < 0) {
-#ifndef _WIN32
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << boost::errinfo_errno(errno));
-#else /* _WIN32 */
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("select")
- << errinfo_win32_error(WSAGetLastError()));
-#endif /* _WIN32 */
- }
-
- if (FD_ISSET(fd, &writefds))
- HandleWritable();
- } catch (...) {
- SetException(boost::current_exception());
-
- CloseInternal();
-
- break;
- }
- }
-}
-
-/**
- * Sets whether the socket is fully connected.
- *
- * @param connected Whether the socket is connected
- */
-void Socket::SetConnected(bool connected)
-{
- ObjectLock olock(this);
-
- m_Connected = connected;
-}
-
-/**
- * Checks whether the socket is fully connected.
- *
- * @returns true if the socket is connected, false otherwise
- */
-bool Socket::IsConnected(void) const
-{
- ObjectLock olock(this);
-
- return m_Connected;
-}
-
-/**
- * Returns how much data is available for reading.
- *
- * @returns The number of bytes available.
- */
-size_t Socket::GetAvailableBytes(void) const
-{
- ObjectLock olock(this);
-
- if (m_Listening)
- throw new std::logic_error("Socket does not support GetAvailableBytes().");
-
- return m_RecvQueue->GetAvailableBytes();
-}
-
-/**
- * Reads data from the socket.
- *
- * @param buffer The buffer where the data should be stored.
- * @param size The size of the buffer.
- * @returns The number of bytes read.
- */
-size_t Socket::Read(void *buffer, size_t size)
-{
- {
- ObjectLock olock(this);
-
- if (m_Listening)
- throw new std::logic_error("Socket does not support Read().");
- }
-
- if (m_RecvQueue->GetAvailableBytes() == 0)
- CheckException();
-
- return m_RecvQueue->Read(buffer, size);
-}
-
-/**
- * Peeks at data for the socket.
- *
- * @param buffer The buffer where the data should be stored.
- * @param size The size of the buffer.
- * @returns The number of bytes read.
- */
-size_t Socket::Peek(void *buffer, size_t size)
-{
- {
- ObjectLock olock(this);
-
- if (m_Listening)
- throw new std::logic_error("Socket does not support Peek().");
- }
-
- if (m_RecvQueue->GetAvailableBytes() == 0)
- CheckException();
-
- return m_RecvQueue->Peek(buffer, size);
-}
-
-/**
- * Writes data to the socket.
- *
- * @param buffer The buffer that should be sent.
- * @param size The size of the buffer.
- */
-void Socket::Write(const void *buffer, size_t size)
-{
- {
- ObjectLock olock(this);
-
- if (m_Listening)
- throw new std::logic_error("Socket does not support Write().");
- }
-
- m_SendQueue->Write(buffer, size);
-}
-
/**
* Starts listening for incoming client connections.
*/
<< errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */
}
-
- {
- ObjectLock olock(this);
- m_Listening = true;
- }
-}
-
-void Socket::HandleWritable(void)
-{
- if (m_Listening)
- HandleWritableServer();
- else
- HandleWritableClient();
-}
-
-void Socket::HandleReadable(void)
-{
- if (m_Listening)
- HandleReadableServer();
- else
- HandleReadableClient();
}
/**
* Processes data that is available for this socket.
*/
-void Socket::HandleWritableClient(void)
+size_t Socket::Write(const void *buffer, size_t count)
{
- int rc;
- char data[1024];
- size_t count;
-
- if (!IsConnected())
- SetConnected(true);
-
- for (;;) {
- count = m_SendQueue->Peek(data, sizeof(data));
-
- if (count == 0) {
- if (IsWriteEOF())
- CloseInternal();
+ int rc = send(GetFD(), buffer, count, 0);
- break;
- }
-
- rc = send(GetFD(), data, count, 0);
-
-#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
- if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
- break;
-
- if (rc <= 0) {
+ if (rc < 0) {
#ifndef _WIN32
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("send")
- << boost::errinfo_errno(errno));
+ BOOST_THROW_EXCEPTION(socket_error()
+ << boost::errinfo_api_function("send")
+ << boost::errinfo_errno(errno));
#else /* _WIN32 */
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("send")
- << errinfo_win32_error(WSAGetLastError()));
+ BOOST_THROW_EXCEPTION(socket_error()
+ << boost::errinfo_api_function("send")
+ << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */
- }
-
- m_SendQueue->Read(NULL, rc);
}
+
+ return rc;
}
/**
* Processes data that can be written for this socket.
*/
-void Socket::HandleReadableClient(void)
+size_t Socket::Read(void *buffer, size_t count)
{
- if (!IsConnected())
- SetConnected(true);
-
- bool new_data = false;
-
- for (;;) {
- char data[1024];
- int rc = recv(GetFD(), data, sizeof(data), 0);
+ int rc = recv(GetFD(), buffer, count, 0);
-#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
- if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
- break;
-
- if (rc < 0) {
+ if (rc < 0) {
#ifndef _WIN32
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("recv")
- << boost::errinfo_errno(errno));
+ BOOST_THROW_EXCEPTION(socket_error()
+ << boost::errinfo_api_function("recv")
+ << boost::errinfo_errno(errno));
#else /* _WIN32 */
- BOOST_THROW_EXCEPTION(socket_error()
- << boost::errinfo_api_function("recv")
- << errinfo_win32_error(WSAGetLastError()));
+ BOOST_THROW_EXCEPTION(socket_error()
+ << boost::errinfo_api_function("recv")
+ << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */
- }
-
- new_data = true;
-
- if (rc == 0) {
- SetReadEOF(true);
-
- break;
- }
-
- m_RecvQueue->Write(data, rc);
}
- if (new_data)
- OnDataAvailable(GetSelf());
-}
-
-void Socket::HandleWritableServer(void)
-{
- throw std::logic_error("This should never happen.");
+ return rc;
}
/**
- * Accepts a new client and creates a new client object for it
- * using the client factory function.
+ * Accepts a new client and creates a new client object for it.
*/
-void Socket::HandleReadableServer(void)
+Socket::Ptr Socket::Accept(void)
{
int fd;
sockaddr_storage addr;
fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
if (fd < 0) {
- #ifndef _WIN32
+#ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("accept")
<< boost::errinfo_errno(errno));
- #else /* _WIN32 */
+#else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("accept")
<< errinfo_win32_error(WSAGetLastError()));
- #endif /* _WIN32 */
+#endif /* _WIN32 */
}
- Socket::Ptr client = boost::make_shared<Socket>();
- client->SetFD(fd);
- OnNewClient(GetSelf(), client);
-}
-
-/**
- * Checks whether data should be written for this socket object.
- *
- * @returns true if the socket should be registered for writing, false otherwise.
- */
-bool Socket::WantsToWrite(void) const
-{
- if (m_Listening)
- return WantsToWriteServer();
- else
- return WantsToWriteClient();
-}
-
-/**
- * Checks whether data should be read for this socket object.
- *
- * @returns true if the socket should be registered for reading, false otherwise.
- */
-bool Socket::WantsToRead(void) const
-{
- if (m_Listening)
- return WantsToReadServer();
- else
- return WantsToReadClient();
-}
-
-/**
- * Checks whether data should be read for this socket.
- *
- * @returns true
- */
-bool Socket::WantsToReadClient(void) const
-{
- return !IsReadEOF();
-}
-
-/**
- * Checks whether data should be written for this socket.
- *
- * @returns true if data should be written, false otherwise.
- */
-bool Socket::WantsToWriteClient(void) const
-{
- if (m_SendQueue->GetAvailableBytes() > 0)
- return true;
-
- return (!IsConnected());
-}
-
-/**
- * Checks whether the TCP server wants to write.
- *
- * @returns false
- */
-bool Socket::WantsToWriteServer(void) const
-{
- return false;
-}
-
-/**
- * Checks whether the TCP server wants to read (i.e. accept new clients).
- *
- * @returns true
- */
-bool Socket::WantsToReadServer(void) const
-{
- return true;
+ return boost::make_shared<Socket>(fd);
}
#define SOCKET_H
#include "base/i2-base.h"
-#include "base/fifo.h"
+#include "base/stream.h"
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
*
* @ingroup base
*/
-class I2_BASE_API Socket : public Stream
+class I2_BASE_API Socket : public Object
{
public:
typedef shared_ptr<Socket> Ptr;
typedef weak_ptr<Socket> WeakPtr;
Socket(void);
+ Socket(SOCKET fd);
~Socket(void);
- virtual void Start(void);
-
- virtual void Close(void);
+ void Close(void);
String GetClientAddress(void);
String GetPeerAddress(void);
- bool IsConnected(void) const;
-
- virtual size_t GetAvailableBytes(void) const;
- virtual size_t Read(void *buffer, size_t size);
- virtual size_t Peek(void *buffer, size_t size);
- virtual void Write(const void *buffer, size_t size);
+ size_t Read(void *buffer, size_t size);
+ size_t Write(const void *buffer, size_t size);
void Listen(void);
-
- boost::signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
+ Socket::Ptr Accept(void);
protected:
void SetFD(SOCKET fd);
private:
SOCKET m_FD; /**< The socket descriptor. */
- bool m_Connected;
- bool m_Listening;
-
- boost::thread m_ReadThread;
- boost::thread m_WriteThread;
-
- boost::condition_variable m_WriteCV;
-
- void ReadThreadProc(void);
- void WriteThreadProc(void);
-
- void ExceptionEventHandler(void);
static String GetAddressFromSockaddr(sockaddr *address, socklen_t len);
-
- void CloseInternal(void);
-
- FIFO::Ptr m_SendQueue;
- FIFO::Ptr m_RecvQueue;
-
- void HandleWritableClient(void);
- void HandleReadableClient(void);
-
- void HandleWritableServer(void);
- void HandleReadableServer(void);
-
- void HandleReadable(void);
- void HandleWritable(void);
- void HandleException(void);
-
- bool WantsToWriteClient(void) const;
- bool WantsToReadClient(void) const;
-
- bool WantsToWriteServer(void) const;
- bool WantsToReadServer(void) const;
-
- bool WantsToWrite(void) const;
- bool WantsToRead(void) const;
};
class socket_error : virtual public std::exception, virtual public boost::exception { };
* the stream's destructor deletes the inner stream.
*/
StdioStream::StdioStream(std::iostream *innerStream, bool ownsStream)
- : m_InnerStream(innerStream), m_OwnsStream(ownsStream),
- m_ReadAheadBuffer(boost::make_shared<FIFO>())
-{
- m_ReadAheadBuffer->Start();
-}
-
-/**
- * Destructor for the StdioStream class.
- */
-StdioStream::~StdioStream(void)
-{
- m_ReadAheadBuffer->Close();
-}
-
-void StdioStream::Start(void)
-{
- SetConnected(true);
-
- Stream::Start();
-}
-
-size_t StdioStream::GetAvailableBytes(void) const
-{
- ObjectLock olock(this);
-
- if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0)
- return 0;
- else
- return 1024; /* doesn't have to be accurate */
-}
+ : m_InnerStream(innerStream), m_OwnsStream(ownsStream)
+{ }
size_t StdioStream::Read(void *buffer, size_t size)
{
ObjectLock olock(this);
- size_t peek_len, read_len;
-
- peek_len = m_ReadAheadBuffer->GetAvailableBytes();
- peek_len = m_ReadAheadBuffer->Read(buffer, peek_len);
-
- m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
- read_len = m_InnerStream->gcount();
-
- return peek_len + read_len;
-}
-
-size_t StdioStream::Peek(void *buffer, size_t size)
-{
- ObjectLock olock(this);
-
- size_t peek_len, read_len;
-
- peek_len = m_ReadAheadBuffer->GetAvailableBytes();
- peek_len = m_ReadAheadBuffer->Peek(buffer, peek_len);
-
- m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
- read_len = m_InnerStream->gcount();
-
- m_ReadAheadBuffer->Write(static_cast<char *>(buffer) + peek_len, read_len);
- return peek_len + read_len;
+ m_InnerStream->read(static_cast<char *>(buffer), size);
+ return m_InnerStream->gcount();
}
void StdioStream::Write(const void *buffer, size_t size)
{
if (m_OwnsStream)
delete m_InnerStream;
-
- Stream::Close();
}
typedef weak_ptr<StdioStream> WeakPtr;
StdioStream(std::iostream *innerStream, bool ownsStream);
- ~StdioStream(void);
- virtual void Start(void);
-
- virtual size_t GetAvailableBytes(void) const;
virtual size_t Read(void *buffer, size_t size);
- virtual size_t Peek(void *buffer, size_t size);
virtual void Write(const void *buffer, size_t size);
virtual void Close(void);
private:
std::iostream *m_InnerStream;
bool m_OwnsStream;
- FIFO::Ptr m_ReadAheadBuffer;
};
}
using namespace icinga;
-Stream::Stream(void)
- : m_Connected(false), m_ReadEOF(false), m_WriteEOF(false)
-{ }
-
-Stream::~Stream(void)
-{
- ASSERT(!m_Running);
-}
-
-bool Stream::IsConnected(void) const
-{
- ObjectLock olock(this);
-
- return m_Connected;
-}
-
-bool Stream::IsReadEOF(void) const
-{
- ObjectLock olock(this);
-
- return m_ReadEOF;
-}
-
-bool Stream::IsWriteEOF(void) const
-{
- ObjectLock olock(this);
-
- return m_WriteEOF;
-}
-
-void Stream::SetConnected(bool connected)
-{
- bool changed;
-
- {
- ObjectLock olock(this);
- changed = (m_Connected != connected);
- m_Connected = connected;
- }
-
- if (changed) {
- if (connected)
- OnConnected(GetSelf());
- else
- OnClosed(GetSelf());
- }
-}
-
-void Stream::SetReadEOF(bool eof)
-{
- ObjectLock olock(this);
-
- m_ReadEOF = eof;
-}
-
-void Stream::SetWriteEOF(bool eof)
-{
- ObjectLock olock(this);
-
- m_WriteEOF = eof;
-}
-
-/**
- * Checks whether an exception is available for this stream and re-throws
- * the exception if there is one.
- */
-void Stream::CheckException(void)
-{
- ObjectLock olock(this);
-
- if (m_Exception)
- rethrow_exception(m_Exception);
-}
-
-void Stream::SetException(boost::exception_ptr exception)
-{
- ObjectLock olock(this);
-
- m_Exception = exception;
-}
-
-boost::exception_ptr Stream::GetException(void)
-{
- return m_Exception;
-}
-
-void Stream::Start(void)
-{
- ObjectLock olock(this);
-
- m_Running = true;
-}
-
-void Stream::Close(void)
-{
- {
- ObjectLock olock(this);
-
- m_Running = false;
- }
-
- SetConnected(false);
-}
-
bool Stream::ReadLine(String *line, size_t maxLength)
{
+ BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
+ /*
char *buffer = new char[maxLength];
size_t rc = Peek(buffer, maxLength);
return true;
}
- delete buffer;
+ delete buffer;*/
return false;
}
typedef shared_ptr<Stream> Ptr;
typedef weak_ptr<Stream> WeakPtr;
- Stream(void);
- ~Stream(void);
-
- virtual void Start(void);
-
- /**
- * Retrieves the number of bytes available for reading.
- *
- * @returns The number of available bytes.
- */
- virtual size_t GetAvailableBytes(void) const = 0;
-
- /**
- * Reads data from the stream without advancing the read pointer.
- *
- * @param buffer The buffer where data should be stored. May be NULL if
- * you're not actually interested in the data.
- * @param count The number of bytes to read from the queue.
- * @returns The number of bytes actually read.
- */
- virtual size_t Peek(void *buffer, size_t count) = 0;
-
/**
* Reads data from the stream.
*
/**
* Closes the stream and releases resources.
*/
- virtual void Close(void);
-
- bool IsConnected(void) const;
- bool IsReadEOF(void) const;
- bool IsWriteEOF(void) const;
+ virtual void Close(void) = 0;
bool ReadLine(String *line, size_t maxLength = 4096);
-
- boost::exception_ptr GetException(void);
- void CheckException(void);
-
- boost::signals2::signal<void (const Stream::Ptr&)> OnConnected;
- boost::signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
- boost::signals2::signal<void (const Stream::Ptr&)> OnClosed;
-
-protected:
- void SetConnected(bool connected);
- void SetReadEOF(bool eof);
- void SetWriteEOF(bool eof);
-
- void SetException(boost::exception_ptr exception);
-
-private:
- bool m_Running;
- bool m_Connected;
- bool m_ReadEOF;
- bool m_WriteEOF;
- boost::exception_ptr m_Exception;
};
}
{
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
bp->StreamObj->Write(in, inl);
-
return inl;
}
* @param service The service.
* @param family The address family for the socket.
*/
-void TcpSocket::Bind(String service, int family)
+void TcpSocket::Bind(const String& service, int family)
{
Bind(String(), service, family);
}
* @param service The service.
* @param family The address family for the socket.
*/
-void TcpSocket::Bind(String node, String service, int family)
+void TcpSocket::Bind(const String& node, const String& service, int family)
{
addrinfo hints;
addrinfo *result;
if (fd == INVALID_SOCKET)
continue;
- SetFD(fd);
-
const int optFalse = 0;
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
int rc = bind(fd, info->ai_addr, info->ai_addrlen);
-#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
-#else /* _WIN32 */
- if (rc < 0 && errno != EINPROGRESS) {
-#endif /* _WIN32 */
+ if (rc < 0) {
closesocket(fd);
- SetFD(INVALID_SOCKET);
continue;
}
+ SetFD(fd);
+
break;
}
freeaddrinfo(result);
- if (fd == INVALID_SOCKET)
+ if (GetFD() == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket."));
}
if (fd == INVALID_SOCKET)
continue;
- SetFD(fd);
-
rc = connect(fd, info->ai_addr, info->ai_addrlen);
-#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
-#else /* _WIN32 */
- if (rc < 0 && errno != EINPROGRESS) {
-#endif /* _WIN32 */
+ if (rc < 0) {
closesocket(fd);
- SetFD(INVALID_SOCKET);
continue;
}
- if (rc >= 0) {
- SetConnected(true);
- OnConnected(GetSelf());
- }
+ SetFD(fd);
break;
}
freeaddrinfo(result);
- if (fd == INVALID_SOCKET)
- BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket."));
+ if (GetFD() == INVALID_SOCKET)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Could not connect to remote host."));
}
typedef shared_ptr<TcpSocket> Ptr;
typedef weak_ptr<TcpSocket> WeakPtr;
- void Bind(String service, int family);
- void Bind(String node, String service, int family);
+ void Bind(const String& service, int family);
+ void Bind(const String& node, const String& service, int family);
void Connect(const String& node, const String& service);
};
* @param sslContext The SSL context for the client.
*/
TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
- : m_SSLContext(sslContext), m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>()),
- m_InnerStream(innerStream), m_Role(role)
+ : m_SSLContext(sslContext), m_Role(role)
{
- m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this));
- m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this));
- m_SendQueue->Start();
- m_RecvQueue->Start();
-}
-
-void TlsStream::Start(void)
-{
- {
- boost::mutex::scoped_lock lock(m_SSLMutex);
-
- m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
-
- m_SSLContext.reset();
+ m_InnerStream = dynamic_pointer_cast<BufferedStream>(innerStream);
+
+ if (!m_InnerStream)
+ m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
+
+ m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
- if (!m_SSL) {
- BOOST_THROW_EXCEPTION(openssl_error()
- << boost::errinfo_api_function("SSL_new")
- << errinfo_openssl_error(ERR_get_error()));
- }
+ m_SSLContext.reset();
- if (!m_SSLIndexInitialized) {
- m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
- m_SSLIndexInitialized = true;
- }
+ if (!m_SSL) {
+ BOOST_THROW_EXCEPTION(openssl_error()
+ << boost::errinfo_api_function("SSL_new")
+ << errinfo_openssl_error(ERR_get_error()));
+ }
- SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
+ if (!m_SSLIndexInitialized) {
+ m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
+ m_SSLIndexInitialized = true;
+ }
- SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
+ SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
- m_BIO = BIO_new_I2Stream(m_InnerStream);
- SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
+ SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
- if (m_Role == TlsRoleServer)
- SSL_set_accept_state(m_SSL.get());
- else
- SSL_set_connect_state(m_SSL.get());
- }
+ m_BIO = BIO_new_I2Stream(m_InnerStream);
+ SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
- Stream::Start();
-
- HandleIO();
+ if (m_Role == TlsRoleServer)
+ SSL_set_accept_state(m_SSL.get());
+ else
+ SSL_set_connect_state(m_SSL.get());
}
/**
*/
shared_ptr<X509> TlsStream::GetClientCertificate(void) const
{
- boost::mutex::scoped_lock lock(m_SSLMutex);
-
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
}
*/
shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
{
- boost::mutex::scoped_lock lock(m_SSLMutex);
-
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
-void TlsStream::DataAvailableHandler(void)
+void TlsStream::Handshake(void)
{
- try {
- HandleIO();
- } catch (...) {
- SetException(boost::current_exception());
+ ASSERT(!OwnsLock());
- Close();
- }
-}
+ int rc;
-void TlsStream::ClosedHandler(void)
-{
ObjectLock olock(this);
- SetException(m_InnerStream->GetException());
- Close();
+ while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
+ switch (SSL_get_error(m_SSL.get(), rc)) {
+ case SSL_ERROR_WANT_READ:
+ olock.Unlock();
+ m_InnerStream->WaitReadable(1);
+ olock.Lock();
+ continue;
+ case SSL_ERROR_WANT_WRITE:
+ olock.Unlock();
+ m_InnerStream->WaitWritable(1);
+ olock.Lock();
+ continue;
+ case SSL_ERROR_ZERO_RETURN:
+ Close();
+ return;
+ default:
+ I2Stream_check_exception(m_BIO);
+ BOOST_THROW_EXCEPTION(openssl_error()
+ << boost::errinfo_api_function("SSL_read")
+ << errinfo_openssl_error(ERR_get_error()));
+ }
+ }
}
/**
* Processes data for the stream.
*/
-void TlsStream::HandleIO(void)
+size_t TlsStream::Read(void *buffer, size_t count)
{
ASSERT(!OwnsLock());
- char data[16 * 1024];
- int rc;
-
- if (!IsConnected()) {
- boost::mutex::scoped_lock lock(m_SSLMutex);
+ size_t left = count;
- rc = SSL_do_handshake(m_SSL.get());
+ ObjectLock olock(this);
- if (rc == 1) {
- lock.unlock();
+ while (left > 0) {
+ int rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
- SetConnected(true);
- } else {
+ if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
- case SSL_ERROR_WANT_WRITE:
- /* fall through */
case SSL_ERROR_WANT_READ:
- return;
- case SSL_ERROR_ZERO_RETURN:
- Close();
- return;
- default:
- I2Stream_check_exception(m_BIO);
- BOOST_THROW_EXCEPTION(openssl_error()
- << boost::errinfo_api_function("SSL_do_handshake")
- << errinfo_openssl_error(ERR_get_error()));
- }
- }
- }
-
- bool new_data = false, read_ok = true;
-
- while (read_ok) {
- boost::mutex::scoped_lock lock(m_SSLMutex);
-
- rc = SSL_read(m_SSL.get(), data, sizeof(data));
-
- if (rc > 0) {
- lock.unlock();
-
- ObjectLock olock(this);
- m_RecvQueue->Write(data, rc);
- new_data = true;
- } else {
- switch (SSL_get_error(m_SSL.get(), rc)) {
+ olock.Unlock();
+ m_InnerStream->WaitReadable(1);
+ olock.Lock();
+ continue;
case SSL_ERROR_WANT_WRITE:
- /* fall through */
- case SSL_ERROR_WANT_READ:
- read_ok = false;
- break;
+ olock.Unlock();
+ m_InnerStream->WaitWritable(1);
+ olock.Lock();
+ continue;
case SSL_ERROR_ZERO_RETURN:
Close();
- return;
+ return count - left;
default:
I2Stream_check_exception(m_BIO);
BOOST_THROW_EXCEPTION(openssl_error()
<< errinfo_openssl_error(ERR_get_error()));
}
}
- }
-
- if (new_data)
- OnDataAvailable(GetSelf());
-
- ObjectLock olock(this);
-
- while (m_SendQueue->GetAvailableBytes() > 0) {
- size_t count = m_SendQueue->GetAvailableBytes();
- if (count == 0)
- break;
-
- if (count > sizeof(data))
- count = sizeof(data);
+ left -= rc;
+ }
- m_SendQueue->Peek(data, count);
+ return count;
+}
- olock.Unlock();
+void TlsStream::Write(const void *buffer, size_t count)
+{
+ ASSERT(!OwnsLock());
- boost::mutex::scoped_lock lock(m_SSLMutex);
+ size_t left = count;
- rc = SSL_write(m_SSL.get(), (const char *)data, count);
+ ObjectLock olock(this);
- if (rc > 0) {
- lock.unlock();
+ while (left > 0) {
+ int rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
- olock.Lock();
- m_SendQueue->Read(NULL, rc);
- } else {
+ if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ:
- /* fall through */
+ olock.Unlock();
+ m_InnerStream->WaitReadable(1);
+ olock.Lock();
+ continue;
case SSL_ERROR_WANT_WRITE:
- return;
+ olock.Unlock();
+ m_InnerStream->WaitWritable(1);
+ olock.Lock();
+ continue;
case SSL_ERROR_ZERO_RETURN:
Close();
return;
<< errinfo_openssl_error(ERR_get_error()));
}
}
+
+ left -= rc;
}
}
*/
void TlsStream::Close(void)
{
- {
- boost::mutex::scoped_lock lock(m_SSLMutex);
-
- if (m_SSL)
- SSL_shutdown(m_SSL.get());
- }
-
- {
- ObjectLock olock(this);
-
- m_SendQueue->Close();
- m_RecvQueue->Close();
- }
-
- Stream::Close();
-}
-
-size_t TlsStream::GetAvailableBytes(void) const
-{
- ObjectLock olock(this);
-
- return m_RecvQueue->GetAvailableBytes();
-}
-
-size_t TlsStream::Peek(void *buffer, size_t count)
-{
- ObjectLock olock(this);
-
- return m_RecvQueue->Peek(buffer, count);
-}
-
-size_t TlsStream::Read(void *buffer, size_t count)
-{
- ObjectLock olock(this);
-
- return m_RecvQueue->Read(buffer, count);
-}
-
-void TlsStream::Write(const void *buffer, size_t count)
-{
- {
- ObjectLock olock(this);
-
- m_SendQueue->Write(buffer, count);
- }
-
- Utility::QueueAsyncCallback(boost::bind(&TlsStream::HandleIO, this));
+ m_InnerStream->Close();
}
#define TLSSTREAM_H
#include "base/i2-base.h"
+#include "base/bufferedstream.h"
#include "base/stream.h"
#include "base/fifo.h"
#include "base/tlsutility.h"
shared_ptr<X509> GetClientCertificate(void) const;
shared_ptr<X509> GetPeerCertificate(void) const;
- virtual void Start(void);
+ void Handshake(void);
+
virtual void Close(void);
- virtual size_t GetAvailableBytes(void) const;
- virtual size_t Peek(void *buffer, size_t count);
virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
private:
shared_ptr<SSL_CTX> m_SSLContext;
shared_ptr<SSL> m_SSL;
- mutable boost::mutex m_SSLMutex;
BIO *m_BIO;
- FIFO::Ptr m_SendQueue;
- FIFO::Ptr m_RecvQueue;
-
- Stream::Ptr m_InnerStream;
+ BufferedStream::Ptr m_InnerStream;
TlsRole m_Role;
static int m_SSLIndex;
static bool m_SSLIndexInitialized;
- void DataAvailableHandler(void);
- void ClosedHandler(void);
-
- void HandleIO(void);
-
static void NullCertificateDeleter(X509 *certificate);
};
cJSON *json = cJSON_Parse(jsonString.CStr());
if (!json)
- BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String"));
+ BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String: " + jsonString));
Value value = FromJson(json);
cJSON_Delete(json);
endpointmanager.cpp \
endpointmanager.h \
i2-remoting.h \
- jsonrpcconnection.cpp \
- jsonrpcconnection.h \
+ jsonrpc.cpp \
+ jsonrpc.h \
messagepart.cpp \
messagepart.h \
remoting-type.cpp \
#include "remoting/endpoint.h"
#include "remoting/endpointmanager.h"
+#include "remoting/jsonrpc.h"
#include "base/application.h"
#include "base/dynamictype.h"
#include "base/objectlock.h"
if (IsLocalEndpoint()) {
return true;
} else {
- JsonRpcConnection::Ptr client = GetClient();
-
- return (client && client->GetStream()->IsConnected());
+ return GetClient();
}
}
-JsonRpcConnection::Ptr Endpoint::GetClient(void) const
+Stream::Ptr Endpoint::GetClient(void) const
{
ObjectLock olock(this);
return m_Client;
}
-void Endpoint::SetClient(const JsonRpcConnection::Ptr& client)
+void Endpoint::SetClient(const Stream::Ptr& client)
{
- client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
- client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
-
{
ObjectLock olock(this);
m_Client = client;
}
+ boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
+ thread.detach();
+
OnConnected(GetSelf());
}
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
- GetClient()->SendMessage(request);
+ JsonRpc::SendMessage(GetClient(), request);
}
}
if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
- else
- GetClient()->SendMessage(response);
-}
-
-void Endpoint::NewMessageHandler(const MessagePart& message)
-{
- Endpoint::Ptr sender = GetSelf();
-
- if (ResponseMessage::IsResponseMessage(message)) {
- /* rather than routing the message to the right virtual
- * endpoint we just process it here right away. */
- EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
- return;
+ else {
+ JsonRpc::SendMessage(GetClient(), response);
}
-
- RequestMessage request = message;
-
- String method;
- if (!request.GetMethod(&method))
- return;
-
- String id;
- if (request.GetID(&id))
- EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
- else
- EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
}
-void Endpoint::ClientClosedHandler(void)
+void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{
- ASSERT(!OwnsLock());
-
- /*try {
- GetClient()->CheckException();
- } catch (const exception& ex) {
- stringstream message;
- message << "Error occured for JSON-RPC socket: Message=" << diagnostic_information(ex);
-
- Log(LogWarning, "jsonrpc", message.str());
- }*/
-
- Log(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
-
- {
- ObjectLock olock(this);
-
- // TODO: _only_ clear non-persistent subscriptions
- // unregister ourselves if no persistent subscriptions are left (use a
- // timer for that, once we have a TTL property for the topics)
- ClearSubscriptions();
-
- m_Client.reset();
+ try {
+ for (;;) {
+ MessagePart message = JsonRpc::ReadMessage(stream);
+ Endpoint::Ptr sender = GetSelf();
+
+ if (ResponseMessage::IsResponseMessage(message)) {
+ /* rather than routing the message to the right virtual
+ * endpoint we just process it here right away. */
+ EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+ return;
+ }
+
+ RequestMessage request = message;
+
+ String method;
+ if (!request.GetMethod(&method))
+ return;
+
+ String id;
+ if (request.GetID(&id))
+ EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+ else
+ EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
+ }
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
+
+ {
+ ObjectLock olock(this);
+
+ // TODO: _only_ clear non-persistent subscriptions
+ // unregister ourselves if no persistent subscriptions are left (use a
+ // timer for that, once we have a TTL property for the topics)
+ ClearSubscriptions();
+
+ m_Client.reset();
+ }
+
+ OnDisconnected(GetSelf());
}
-
- OnDisconnected(GetSelf());
}
/**
#include "remoting/i2-remoting.h"
#include "remoting/requestmessage.h"
#include "remoting/responsemessage.h"
-#include "remoting/jsonrpcconnection.h"
#include "base/dynamicobject.h"
+#include "base/stream.h"
#include <boost/signals2.hpp>
namespace icinga
static Endpoint::Ptr GetByName(const String& name);
- JsonRpcConnection::Ptr GetClient(void) const;
- void SetClient(const JsonRpcConnection::Ptr& client);
+ Stream::Ptr GetClient(void) const;
+ void SetClient(const Stream::Ptr& client);
void RegisterSubscription(const String& topic);
void UnregisterSubscription(const String& topic);
Attribute<String> m_Node;
Attribute<String> m_Service;
- JsonRpcConnection::Ptr m_Client;
+ Stream::Ptr m_Client;
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers;
- void NewMessageHandler(const MessagePart& message);
- void ClientClosedHandler(void);
+ void MessageThreadProc(const Stream::Ptr& stream);
};
}
#include "base/convert.h"
#include "base/utility.h"
#include "base/tlsutility.h"
+#include "base/networkstream.h"
#include <boost/tuple/tuple.hpp>
#include <boost/foreach.hpp>
Log(LogInformation, "icinga", s.str());
TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
+ server->Bind(service, AF_INET6);
+
+ boost::thread thread(boost::bind(&EndpointManager::ListenerThreadProc, this, server));
+ thread.detach();
m_Servers.insert(server);
- server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
- this, _2, TlsRoleServer));
+}
- server->Bind(service, AF_INET6);
+void EndpointManager::ListenerThreadProc(const Socket::Ptr& server)
+{
server->Listen();
- server->Start();
+
+ for (;;) {
+ Socket::Ptr client = server->Accept();
+
+ try {
+ NewClientHandler(client, TlsRoleServer);
+ } catch (const std::exception& ex) {
+ std::stringstream message;
+ message << "Error for new JSON-RPC socket: " << boost::diagnostic_information(ex);
+ Log(LogInformation, "remoting", message.str());
+ }
+ }
}
/**
* @param service The remote port.
*/
void EndpointManager::AddConnection(const String& node, const String& service) {
- ObjectLock olock(this);
+ {
+ ObjectLock olock(this);
- shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+ shared_ptr<SSL_CTX> sslContext = m_SSLContext;
- if (!sslContext)
- BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+ if (!sslContext)
+ BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+ }
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
- client->Connect(node, service);
- NewClientHandler(client, TlsRoleClient);
+
+ try {
+ client->Connect(node, service);
+ NewClientHandler(client, TlsRoleClient);
+ } catch (const std::exception& ex) {
+ Log(LogInformation, "remoting", "Could not connect to " + node + ":" + service + ": " + ex.what());
+ }
}
/**
*/
void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
{
- TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, m_SSLContext);
-
- m_PendingClients.insert(tlsStream);
- tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
- tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1));
-
- client->Start();
- tlsStream->Start();
-}
-
-void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
-{
- TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
- JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
+ NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
- {
- ObjectLock olock(this);
- m_PendingClients.erase(tlsStream);
- }
+ TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(netStream, role, m_SSLContext);
+ tlsStream->Handshake();
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert);
if (!endpoint)
endpoint = Endpoint::MakeEndpoint(identity, true);
- endpoint->SetClient(jclient);
-}
-
-void EndpointManager::ClientClosedHandler(const Stream::Ptr& client)
-{
- TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
-
- {
- ObjectLock olock(this);
- m_PendingClients.erase(tlsStream);
- }
+ endpoint->SetClient(tlsStream);
}
/**
Timer::Ptr m_ReconnectTimer;
std::set<TcpSocket::Ptr> m_Servers;
- std::set<TlsStream::Ptr> m_PendingClients;
/**
* Information about a pending API request.
void ReconnectTimerHandler(void);
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
- void ClientConnectedHandler(const Stream::Ptr& client);
- void ClientClosedHandler(const Stream::Ptr& client);
+
+ void ListenerThreadProc(const Socket::Ptr& server);
};
}
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#include "remoting/jsonrpcconnection.h"
+#include "remoting/jsonrpc.h"
#include "base/netstring.h"
#include "base/objectlock.h"
#include "base/logger_fwd.h"
#include <boost/exception/diagnostic_information.hpp>
+#include <iostream>
using namespace icinga;
-/**
- * Constructor for the JsonRpcConnection class.
- *
- * @param stream The stream.
- */
-JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream)
- : Connection(stream)
-{ }
-
/**
* Sends a message to the connected peer.
*
* @param message The message.
*/
-void JsonRpcConnection::SendMessage(const MessagePart& message)
+void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message)
{
- ObjectLock olock(this);
-
Value value = message.GetDictionary();
String json = value.Serialize();
//std::cerr << ">> " << json << std::endl;
- NetString::WriteStringToStream(GetStream(), json);
+ NetString::WriteStringToStream(stream, json);
}
-/**
- * Processes inbound data.
- */
-void JsonRpcConnection::ProcessData(void)
+MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
{
- ObjectLock olock(this);
-
String jsonString;
+ if (!NetString::ReadStringFromStream(stream, &jsonString))
+ BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
- while (NetString::ReadStringFromStream(GetStream(), &jsonString)) {
- //std::cerr << "<< " << jsonString << std::endl;
-
- try {
- Value value = Value::Deserialize(jsonString);
+ //std::cerr << "<< " << jsonString << std::endl;
+ Value value = Value::Deserialize(jsonString);
- if (!value.IsObjectType<Dictionary>()) {
- BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
- " message must be a dictionary."));
- }
-
- MessagePart mp(value);
- OnNewMessage(GetSelf(), mp);
- } catch (const std::exception& ex) {
- Log(LogCritical, "remoting", "Exception"
- " while processing message from JSON-RPC client: " +
- boost::diagnostic_information(ex));
- }
+ if (!value.IsObjectType<Dictionary>()) {
+ BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
+ " message must be a dictionary."));
}
+
+ return MessagePart(value);
}
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#ifndef LIVESTATUSCONNECTION_H
-#define LIVESTATUSCONNECTION_H
+#ifndef JSONRPC_H
+#define JSONRPC_H
-#include "base/connection.h"
+#include "remoting/i2-remoting.h"
+#include "remoting/messagepart.h"
+#include "base/stream.h"
-using namespace icinga;
-
-namespace livestatus
+namespace icinga
{
-class LivestatusConnection : public Connection
+/**
+ * A JSON-RPC connection.
+ *
+ * @ingroup remoting
+ */
+class I2_REMOTING_API JsonRpc
{
public:
- typedef shared_ptr<LivestatusConnection> Ptr;
- typedef weak_ptr<LivestatusConnection> WeakPtr;
-
- LivestatusConnection(const Stream::Ptr& stream);
-
-protected:
- std::vector<String> m_Lines;
+ static void SendMessage(const Stream::Ptr& stream, const MessagePart& message);
+ static MessagePart ReadMessage(const Stream::Ptr& stream);
- virtual void ProcessData(void);
+private:
+ JsonRpc(void);
};
}
-#endif /* LIVESTATUSCONNECTION_H */
+#endif /* JSONRPC_H */
{
public:
MessagePart(void);
+ MessagePart(const MessagePart& message);
explicit MessagePart(const Dictionary::Ptr& dictionary);
- explicit MessagePart(const MessagePart& message);
Dictionary::Ptr GetDictionary(void) const;