From 321d66023f0e4f2458b6ae1c914fca59ea1b9c2d Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 22 Nov 2012 12:04:32 +0100 Subject: [PATCH] Refactored the socket classes. --- components/compatido/Makefile.am | 8 +- components/compatido/compatido.vcxproj | 4 +- .../compatido/compatido.vcxproj.filters | 6 +- components/compatido/compatidocomponent.cpp | 509 +++++++----------- components/compatido/compatidocomponent.h | 19 +- components/compatido/i2-compatido.h | 2 +- components/compatido/idoconnection.cpp | 59 ++ .../compatido/idoconnection.h | 27 +- components/compatido/idosocket.cpp | 147 ----- components/compatido/idosocket.h | 68 --- components/demo/democomponent.cpp | 3 +- lib/base/Makefile.am | 15 +- lib/base/base.vcxproj | 17 +- lib/base/base.vcxproj.filters | 51 +- lib/base/connection.cpp | 44 ++ lib/base/{tcpserver.h => connection.h} | 37 +- lib/base/dynamicobject.cpp | 11 +- lib/base/fifo.cpp | 24 +- lib/base/fifo.h | 12 +- lib/base/i2-base.h | 8 +- lib/base/netstring.cpp | 30 +- lib/base/netstring.h | 4 +- lib/base/socket.cpp | 358 +++++++++--- lib/base/socket.h | 54 +- lib/base/stream.cpp | 79 +++ lib/base/{ioqueue.h => stream.h} | 60 ++- lib/base/stream_bio.cpp | 121 +++++ .../jsonrpcserver.h => base/stream_bio.h} | 21 +- lib/base/tcpclient.cpp | 256 --------- lib/base/tcpclient.h | 90 ---- lib/base/tcpserver.cpp | 89 --- lib/base/tcpsocket.cpp | 75 ++- lib/base/tcpsocket.h | 3 +- lib/base/{tlsclient.cpp => tlsstream.cpp} | 240 ++++----- lib/base/{tlsclient.h => tlsstream.h} | 51 +- lib/remoting/Makefile.am | 6 +- lib/remoting/endpoint.cpp | 31 +- lib/remoting/endpoint.h | 6 +- lib/remoting/endpointmanager.cpp | 35 +- lib/remoting/endpointmanager.h | 8 +- lib/remoting/i2-remoting.h | 3 +- ...sonrpcclient.cpp => jsonrpcconnection.cpp} | 39 +- .../{jsonrpcclient.h => jsonrpcconnection.h} | 26 +- lib/remoting/remoting.vcxproj | 6 +- lib/remoting/remoting.vcxproj.filters | 18 +- 45 files changed, 1294 insertions(+), 1486 deletions(-) create mode 100644 components/compatido/idoconnection.cpp rename lib/remoting/jsonrpcserver.cpp => components/compatido/idoconnection.h (78%) delete mode 100644 components/compatido/idosocket.cpp delete mode 100644 components/compatido/idosocket.h create mode 100644 lib/base/connection.cpp rename lib/base/{tcpserver.h => connection.h} (68%) create mode 100644 lib/base/stream.cpp rename lib/base/{ioqueue.h => stream.h} (66%) create mode 100644 lib/base/stream_bio.cpp rename lib/{remoting/jsonrpcserver.h => base/stream_bio.h} (81%) delete mode 100644 lib/base/tcpclient.cpp delete mode 100644 lib/base/tcpclient.h delete mode 100644 lib/base/tcpserver.cpp rename lib/base/{tlsclient.cpp => tlsstream.cpp} (50%) rename lib/base/{tlsclient.h => tlsstream.h} (69%) rename lib/remoting/{jsonrpcclient.cpp => jsonrpcconnection.cpp} (68%) rename lib/remoting/{jsonrpcclient.h => jsonrpcconnection.h} (70%) diff --git a/components/compatido/Makefile.am b/components/compatido/Makefile.am index becebf3d0..c81e0cb6f 100644 --- a/components/compatido/Makefile.am +++ b/components/compatido/Makefile.am @@ -4,11 +4,11 @@ pkglib_LTLIBRARIES = \ compatido.la compatido_la_SOURCES = \ - i2-compatido.h \ - idosocket.cpp \ - idosocket.h \ compatidocomponent.cpp \ - compatidocomponent.h + compatidocomponent.h \ + i2-compatido.h \ + idoconnection.cpp \ + idoconnection.h compatido_la_CPPFLAGS = \ $(BOOST_CPPFLAGS) \ diff --git a/components/compatido/compatido.vcxproj b/components/compatido/compatido.vcxproj index 5e00eb389..f2d2e3f3d 100644 --- a/components/compatido/compatido.vcxproj +++ b/components/compatido/compatido.vcxproj @@ -151,12 +151,12 @@ - + - + diff --git a/components/compatido/compatido.vcxproj.filters b/components/compatido/compatido.vcxproj.filters index 838aa7ef7..7f00fa250 100644 --- a/components/compatido/compatido.vcxproj.filters +++ b/components/compatido/compatido.vcxproj.filters @@ -14,7 +14,7 @@ Source Files - + Source Files @@ -25,8 +25,8 @@ Header Files - + Header Files - + \ No newline at end of file diff --git a/components/compatido/compatidocomponent.cpp b/components/compatido/compatidocomponent.cpp index f61b0cbe1..4c2152f81 100644 --- a/components/compatido/compatidocomponent.cpp +++ b/components/compatido/compatidocomponent.cpp @@ -38,13 +38,13 @@ const int CompatIdoComponent::DefaultReconnectInterval = 15; String CompatIdoComponent::GetSocketAddress(void) const { Value address = GetConfig()->Get("socket_address"); - if(address.IsEmpty()) + + if (address.IsEmpty()) return DefaultSocketAddress; else return address; } - /** * Reads the socket port from the config * @returns port The config option, or static default @@ -52,7 +52,8 @@ String CompatIdoComponent::GetSocketAddress(void) const String CompatIdoComponent::GetSocketPort(void) const { Value port = GetConfig()->Get("socket_port"); - if(port.IsEmpty()) + + if (port.IsEmpty()) return DefaultSocketPort; else return port; @@ -65,7 +66,8 @@ String CompatIdoComponent::GetSocketPort(void) const String CompatIdoComponent::GetInstanceName(void) const { Value instance = GetConfig()->Get("instance_name"); - if(instance.IsEmpty()) + + if (instance.IsEmpty()) return DefaultInstanceName; else return instance; @@ -75,34 +77,16 @@ String CompatIdoComponent::GetInstanceName(void) const * Reads the reconnect interval from the config * @returns reconnect_interval The config option, or static default */ -int CompatIdoComponent::GetReconnectInterval(void) const +double CompatIdoComponent::GetReconnectInterval(void) const { - Value interval = GetConfig()->Get("reconnect_interval"); - if(interval.IsEmpty()) - return DefaultReconnectInterval; - else - return interval; -} + Value interval = GetConfig()->Get("reconnect_interval"); -/** - * Sets config dump in progress state - */ -void CompatIdoComponent::SetConfigDumpInProgress(bool state) -{ - m_ConfigDumpInProgress = state; + if (interval.IsEmpty()) + return DefaultReconnectInterval; + else + return interval; } -/** - * Get state of config in progress - * - * @returns state bis config dump in progress. - */ -bool CompatIdoComponent::GetConfigDumpInProgress(void) -{ - return m_ConfigDumpInProgress; -} - - /** * Starts the component. */ @@ -111,7 +95,7 @@ void CompatIdoComponent::Start(void) const int StatusTimerInterval = 60; const int ConfigTimerInterval = 3600; const int ProgramStatusTimerInterval = 15; - const int ReconnectTimerInterval = GetReconnectInterval(); + const double ReconnectTimerInterval = GetReconnectInterval(); /* FIXME - make this a config option when unix sockets are realdy */ @@ -125,12 +109,7 @@ void CompatIdoComponent::Start(void) /* * open ido socket once */ - OpenIdoSocket(IdoSocketType); - - /* - * tell ido2db that we just started - */ - SendStartProcess(); + OpenIdoSocket(); /* * ddump the config later (can't do that within start of the component) @@ -171,7 +150,6 @@ void CompatIdoComponent::Start(void) m_ReconnectTimer->Start(); } - /** * Stops the component. */ @@ -180,24 +158,58 @@ void CompatIdoComponent::Stop(void) CloseIdoSocket(); } - /** * Opens the ido socket, and sends hello to ido2db */ -void CompatIdoComponent::OpenIdoSocket(bool sockettype) +void CompatIdoComponent::OpenIdoSocket(void) { - OpenSink(GetSocketAddress(), GetSocketPort()); - SendHello(GetInstanceName(), sockettype); + TcpSocket::Ptr socket = boost::make_shared(); + socket->Connect(GetSocketAddress(), GetSocketPort()); + socket->Start(); - m_IdoSocket->SetSocketType(sockettype); - /* - * if we're connected, do not reconnecte - */ - if(m_IdoSocket->IsConnected()) { - m_IdoSocket->SetReconnect(false); - } else { - m_IdoSocket->SetReconnect(true); - } + m_IdoConnection = boost::make_shared(socket); + m_IdoConnection->OnClosed.connect(boost::bind(&CompatIdoComponent::SocketDisconnectHandler, this)); + + /* FIXME */ +#define COMPATIDO_PROTOCOL 2 +#define COMPATIDO_NAME "ICINGA2 COMPATIDO" +#define COMPATIDO_RELEASE_VERSION "2.0" + + /* connection is always TCP */ + /* connecttype is always initial */ + stringstream msgHello; + msgHello << "\n\n" + << "HELLO" << "\n" + << "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n" + << "AGENT" << ": " << COMPATIDO_NAME << "\n" + << "AGENTVERSION" << ": " << VERSION << "\n" + << "STARTTIME" << ": " << static_cast(Utility::GetTime()) << "\n" + << "DISPOSITION" << ": " << "REALTIME" << "\n" + << "CONNECTION" << ": " << "TCPSOCKET" << "\n" + << "INSTANCENAME" << ": " << GetInstanceName() << "\n" + << "STARTDATADUMP" + << "\n\n"; + + m_IdoConnection->SendMessage(msgHello.str()); + +/* TODO */ +#define PROGRAM_MODIFICATION_DATE "10-17-2012" +#define PROGRAM_RELEASE_VERSION "2.0" + + stringstream msgProcessData; + msgProcessData << "\n" + << 200 << "\n" /* processdata */ + << 1 << "=" << 104 << "\n" /* type = pprocess prelaunch */ + << 2 << "=" << "" << "\n" /* flags */ + << 3 << "=" << "" << "\n" /* attributes */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 105 << "=" << "Icinga2" << "\n" /* progranname */ + << 107 << "=" << PROGRAM_RELEASE_VERSION << "\n" /* programversion */ + << 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n" /* programdata */ + << 102 << "=" << Utility::GetPid() << "\n" /* process id */ + << 999 << "\n\n"; /* enddata */ + + m_IdoConnection->SendMessage(msgProcessData.str()); } /* @@ -205,10 +217,20 @@ void CompatIdoComponent::OpenIdoSocket(bool sockettype) */ void CompatIdoComponent::CloseIdoSocket(void) { - GoodByeSink(); - CloseSink(); + stringstream message; + message << "\n" + << 1000 << "\n" /* enddatadump */ + << "ENDTIME" << ": " << static_cast(Utility::GetTime()) << "\n" /* endtime */ + << "GOODBYE" /* goodbye */ + << "\n\n"; + + m_IdoConnection->SendMessage(message.str()); } +void CompatIdoComponent::SocketDisconnectHandler(void) +{ + m_IdoConnection.reset(); +} /* TODO * subscribe to all status updates and checkresults and dump them @@ -225,7 +247,7 @@ void CompatIdoComponent::StatusTimerHandler(void) Logger::Write(LogInformation, "compatido", "Writing compat ido status information"); DumpStatusData(); - DumpProgramStatusData(); + DumpProgramStatusData(); } /** @@ -235,10 +257,7 @@ void CompatIdoComponent::ConfigTimerHandler(void) { Logger::Write(LogInformation, "compatido", "Writing compat ido config information"); - /* protect the dump of status update messages */ - SetConfigDumpInProgress(true); DumpConfigObjects(); - SetConfigDumpInProgress(false); } /** @@ -246,13 +265,9 @@ void CompatIdoComponent::ConfigTimerHandler(void) */ void CompatIdoComponent::ProgramStatusTimerHandler(void) { - /* do not dump any data if config dump is still in progress */ - if(GetConfigDumpInProgress()) - return; - - Logger::Write(LogInformation, "compatido", "Writing compat ido program status information"); + Logger::Write(LogInformation, "compatido", "Writing compat ido program status information"); - DumpProgramStatusData(); + DumpProgramStatusData(); } /** @@ -260,153 +275,11 @@ void CompatIdoComponent::ProgramStatusTimerHandler(void) */ void CompatIdoComponent::ReconnectTimerHandler(void) { - Logger::Write(LogDebug, "compatido", "Checking if ido socket requires reconnect"); - - if(m_IdoSocket->GetReconnect()) { - - /* check if we aren't already connected */ - if(m_IdoSocket->IsConnected()) { - Logger::Write(LogDebug, "compatido", "Already connected to ido socket ... no reconnect necessary"); - return; - } + Logger::Write(LogDebug, "compatido", "Checking if ido socket requires reconnect"); + if (!m_IdoConnection) /* socket was disconnected, recconnect */ - OpenIdoSocket(m_IdoSocket->GetSocketType()); - - if(m_IdoSocket->IsConnected()) { - Logger::Write(LogInformation, "compatido", "Successfully reconnected to ido socket"); - } else { - stringstream message; - message << "Unable to reconnect to ido socket. Trying again in " << GetReconnectInterval() << " sec"; - Logger::Write(LogWarning, "compatido", message.str()); - } - } -} - - -/** - * opens a tcp connection to the ido socket - */ -void CompatIdoComponent::OpenSink(String node, String service) -{ - m_IdoSocket = boost::make_shared(RoleOutbound); - m_IdoSocket->Connect(node, service); - m_IdoSocket->Start(); -} - -/** - * sends hello msg to ido2b - */ -void CompatIdoComponent::SendHello(String instancename, bool sockettype) -{ - /* FIXME */ -#define COMPATIDO_PROTOCOL 2 -#define COMPATIDO_NAME "ICINGA2 COMPATIDO" -#define COMPATIDO_RELEASE_VERSION "2.0" - - String connection; - if(sockettype) - connection = "TCPSOCKET"; - else - connection = "UNIXSOCKET"; - - /* connection is always TCP */ - /* connecttype is always initial */ - stringstream message; - message << "\n\n" - << "HELLO" << "\n" - << "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n" - << "AGENT" << ": " << COMPATIDO_NAME << "\n" - << "AGENTVERSION" << ": " << VERSION << "\n" - << "STARTTIME" << ": " << static_cast(Utility::GetTime()) << "\n" - << "DISPOSITION" << ": " << "REALTIME" << "\n" - << "CONNECTION" << ": " << connection << "\n" - << "INSTANCENAME" << ": " << instancename << "\n" - << "STARTDATADUMP" - << "\n\n"; - - m_IdoSocket->SendMessage(message.str()); -} - -/** - * sends goodbye msg to ido - */ -void CompatIdoComponent::GoodByeSink(void) -{ - stringstream message; - message << "\n" - << 1000 << "\n" /* enddatadump */ - << "ENDTIME" << ": " << static_cast(Utility::GetTime()) << "\n" /* endtime */ - << "GOODBYE" /* goodbye */ - << "\n\n"; - - m_IdoSocket->SendMessage(message.str()); -} - -/** - * closes ido socket - */ -void CompatIdoComponent::CloseSink(void) -{ - m_IdoSocket->Close(); -} - -/** - * tell ido2db that we are starting up (must be called before config dump) - */ -void CompatIdoComponent::SendStartProcess(void) -{ -/* TODO */ -#define PROGRAM_MODIFICATION_DATE "10-17-2012" -#define PROGRAM_RELEASE_VERSION "2.0" - - stringstream message; - message << "\n" - << 200 << "\n" /* processdata */ - << 1 << "=" << 104 << "\n" /* type = pprocess prelaunch */ - << 2 << "=" << "" << "\n" /* flags */ - << 3 << "=" << "" << "\n" /* attributes */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 105 << "=" << "Icinga2" << "\n" /* progranname */ - << 107 << "=" << PROGRAM_RELEASE_VERSION << "\n" /* programversion */ - << 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n" /* programdata */ - << 102 << "=" << Utility::GetPid() << "\n" /* process id */ - << 999 << "\n\n"; /* enddata */ - - m_IdoSocket->SendMessage(message.str()); - -} - -/** - * sends config dump start signal to ido - */ -void CompatIdoComponent::StartConfigDump(void) -{ - /* configtype =1 (original), =2 (retained == default) */ - stringstream message; - message << "\n\n" - << 900 << ":" << "\n" /* startconfigdump */ - << 245 << "=" << "RETAINED" << "\n" /* configdumptype */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 999 /* enddata */ - << "\n\n"; - - m_IdoSocket->SendMessage(message.str()); -} - -/** - * sends config dump end signal to ido - */ -void CompatIdoComponent::EndConfigDump(void) -{ - stringstream message; - message << "\n\n" - << 901 << ":" << "\n" /* endconfigdump */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 999 /* enddata */ - << "\n\n"; - - m_IdoSocket->SendMessage(message.str()); + OpenIdoSocket(); } /** @@ -419,9 +292,9 @@ void CompatIdoComponent::EnableHostObject(const Host::Ptr& host) << 500 << ":" << "\n" /* enableobject */ << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ << 53 << "=" << host->GetName() << "\n" /* host */ - << 999 << "\n\n"; /* enddata */ + << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } /** @@ -429,15 +302,15 @@ void CompatIdoComponent::EnableHostObject(const Host::Ptr& host) */ void CompatIdoComponent::EnableServiceObject(const Service::Ptr& service) { - stringstream message; - message << "\n" - << 500 << ":" << "\n" /* enableobject */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 53 << "=" << service->GetHost()->GetName() << "\n" /* host */ - << 114 << "=" << service->GetAlias() << "\n" /* service */ - << 999 << "\n\n"; /* enddata */ - - m_IdoSocket->SendMessage(message.str()); + stringstream message; + message << "\n" + << 500 << ":" << "\n" /* enableobject */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 53 << "=" << service->GetHost()->GetName() << "\n" /* host */ + << 114 << "=" << service->GetAlias() << "\n" /* service */ + << 999 << "\n\n"; /* enddata */ + + m_IdoConnection->SendMessage(message.str()); } /** @@ -445,14 +318,14 @@ void CompatIdoComponent::EnableServiceObject(const Service::Ptr& service) */ void CompatIdoComponent::DisableHostObject(const Host::Ptr& host) { - stringstream message; - message << "\n" - << 501 << ":" << "\n" /* disableobject */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 53 << "=" << host->GetName() << "\n" /* host */ - << 999 << "\n\n"; /* enddata */ - - m_IdoSocket->SendMessage(message.str()); + stringstream message; + message << "\n" + << 501 << ":" << "\n" /* disableobject */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 53 << "=" << host->GetName() << "\n" /* host */ + << 999 << "\n\n"; /* enddata */ + + m_IdoConnection->SendMessage(message.str()); } /** @@ -460,20 +333,17 @@ void CompatIdoComponent::DisableHostObject(const Host::Ptr& host) */ void CompatIdoComponent::DisableServiceObject(const Service::Ptr& service) { - stringstream message; - message << "\n" - << 501 << ":" << "\n" /* disableobject */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 53 << "=" << service->GetHost()->GetName() << "\n" /* host */ - << 114 << "=" << service->GetAlias() << "\n" /* service */ - << 999 << "\n\n"; /* enddata */ + stringstream message; + message << "\n" + << 501 << ":" << "\n" /* disableobject */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 53 << "=" << service->GetHost()->GetName() << "\n" /* host */ + << 114 << "=" << service->GetAlias() << "\n" /* service */ + << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } - - - /** * dump host config to ido * @@ -483,7 +353,7 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host) { stringstream log; log << "Dumping Host Config: " << host->GetName(); - Logger::Write(LogDebug, "compatido", log.str()); + Logger::Write(LogDebug, "compatido", log.str()); stringstream message; message << "\n" @@ -550,10 +420,10 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host) << 200 << "=" << "i2_parent" << "\n" /* parenthost */ << 130 << "=" << "i2_contactgroup" << "\n" /* contactgroup */ << 264 << "=" << "i2_contact" << "\n" /* contact */ - << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n" /* customvariable */ - << 999 << "\n\n"; /* enddata */ + << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n" /* customvariable */ + << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } /** @@ -563,21 +433,20 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host) */ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host) { - stringstream log; log << "Dumping Host Status: " << host->GetName(); - Logger::Write(LogDebug, "compatido", log.str()); - - int state; - if (!host->IsReachable()) - state = 2; /* unreachable */ - else if (!host->IsUp()) - state = 1; /* down */ - else - state = 0; /* up */ - - stringstream message; - message << "\n" + Logger::Write(LogDebug, "compatido", log.str()); + + int state; + if (!host->IsReachable()) + state = 2; /* unreachable */ + else if (!host->IsUp()) + state = 1; /* down */ + else + state = 0; /* up */ + + stringstream message; + message << "\n" << 212 << ":" << "\n" /* hoststatusdata */ << 1 << "=" << "" << "\n" /* type */ << 2 << "=" << "" << "\n" /* flags */ @@ -631,7 +500,7 @@ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host) << 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n" /* customvariable */ << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } /** @@ -643,7 +512,7 @@ void CompatIdoComponent::DumpServiceObject(const Service::Ptr& service) { stringstream log; log << "Dumping Service Config: " << service->GetHost()->GetName() << "->" << service->GetAlias(); - Logger::Write(LogDebug, "compatido", log.str()); + Logger::Write(LogDebug, "compatido", log.str()); stringstream message; message << "\n" @@ -705,7 +574,7 @@ void CompatIdoComponent::DumpServiceObject(const Service::Ptr& service) << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n" /* customvariable */ << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } /** @@ -717,35 +586,35 @@ void CompatIdoComponent::DumpServiceStatus(const Service::Ptr& service) { stringstream log; log << "Dumping Service Status: " << service->GetHost()->GetName() << "->" << service->GetAlias(); - Logger::Write(LogDebug, "compatido", log.str()); - - String output; - String perfdata; - double schedule_start = -1, schedule_end = -1; - double execution_start = -1, execution_end = -1; - - Dictionary::Ptr cr = service->GetLastCheckResult(); - - if (cr) { - output = cr->Get("output"); - schedule_start = cr->Get("schedule_start"); - schedule_end = cr->Get("schedule_end"); - execution_start = cr->Get("execution_start"); - execution_end = cr->Get("execution_end"); - perfdata = cr->Get("performance_data_raw"); - } + Logger::Write(LogDebug, "compatido", log.str()); + + String output; + String perfdata; + double schedule_start = -1, schedule_end = -1; + double execution_start = -1, execution_end = -1; + + Dictionary::Ptr cr = service->GetLastCheckResult(); + + if (cr) { + output = cr->Get("output"); + schedule_start = cr->Get("schedule_start"); + schedule_end = cr->Get("schedule_end"); + execution_start = cr->Get("execution_start"); + execution_end = cr->Get("execution_end"); + perfdata = cr->Get("performance_data_raw"); + } - double execution_time = (execution_end - execution_start); - double latency = (schedule_end - schedule_start) - execution_time; + double execution_time = (execution_end - execution_start); + double latency = (schedule_end - schedule_start) - execution_time; - int state = service->GetState(); + int state = service->GetState(); - if (state > StateUnknown) - state = StateUnknown; + if (state > StateUnknown) + state = StateUnknown; - stringstream message; - message << "\n" - << 213 << ":" << "\n" /* servicestatusdata */ + stringstream message; + message << "\n" + << 213 << ":" << "\n" /* servicestatusdata */ << 1 << "=" << "" << "\n" /* type */ << 2 << "=" << "" << "\n" /* flags */ << 3 << "=" << "" << "\n" /* attributes */ @@ -800,7 +669,7 @@ void CompatIdoComponent::DumpServiceStatus(const Service::Ptr& service) << 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n" /* customvariable */ << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } @@ -811,9 +680,9 @@ void CompatIdoComponent::DumpProgramStatusData(void) { double start_time = IcingaApplication::GetInstance()->GetStartTime(); - stringstream message; - message << "\n" - << 211 << ":" << "\n" /* programstatusdata */ + stringstream message; + message << "\n" + << 211 << ":" << "\n" /* programstatusdata */ << 1 << "=" << "" << "\n" /* type */ << 2 << "=" << "" << "\n" /* flags */ << 3 << "=" << "" << "\n" /* attributes */ @@ -839,9 +708,9 @@ void CompatIdoComponent::DumpProgramStatusData(void) << 49 << "=" << "" << "\n" /* globalhosteventhandler */ << 50 << "=" << "" << "\n" /* globalserviceeventhandler */ << 270 << "=" << static_cast(Utility::GetTime()) << "\n" /* disablednotificationsexpiretime - supported in 1.8 XXX */ - << 999 << "\n\n"; /* enddata */ + << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } /** @@ -856,7 +725,16 @@ void CompatIdoComponent::DumpConfigObjects(void) */ /* tell ido2db that we start now */ - StartConfigDump(); + /* configtype =1 (original), =2 (retained == default) */ + stringstream msgStartConfigDump; + msgStartConfigDump << "\n\n" + << 900 << ":" << "\n" /* startconfigdump */ + << 245 << "=" << "RETAINED" << "\n" /* configdumptype */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 999 /* enddata */ + << "\n\n"; + + m_IdoConnection->SendMessage(msgStartConfigDump.str()); /* hosts and hostgroups */ map > hostgroups; @@ -885,7 +763,7 @@ void CompatIdoComponent::DumpConfigObjects(void) const String& name = hgt.first; const vector& hosts = hgt.second; - if(HostGroup::Exists(name)) { + if (HostGroup::Exists(name)) { HostGroup::Ptr hg = HostGroup::GetByName(name); /* dump the hostgroup and its attributes/members to ido */ @@ -896,11 +774,11 @@ void CompatIdoComponent::DumpConfigObjects(void) << 172 << "=" << name << "\n" /* hostgroupname */ << 170 << "=" << hg->GetAlias() << "\n"; /* hostgroupalias */ - CreateMessageList(message, hosts, 171); /* hostgroupmember */ + SendMessageList(message, hosts, 171); /* hostgroupmember */ message << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } } @@ -935,12 +813,12 @@ void CompatIdoComponent::DumpConfigObjects(void) ServiceGroup::Ptr sg = ServiceGroup::GetByName(name); /* dump the servicegroup and its attributes/members to ido */ - stringstream message; - message << "\n" - << 403 << ":" << "\n" /* servicegroupdefinition */ - << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ - << 220 << "=" << name << "\n" /* servicegroupname */ - << 218 << "=" << sg->GetAlias() << "\n"; /* servicegroupalias */ + stringstream message; + message << "\n" + << 403 << ":" << "\n" /* servicegroupdefinition */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 220 << "=" << name << "\n" /* servicegroupname */ + << 218 << "=" << sg->GetAlias() << "\n"; /* servicegroupalias */ vector sglist; vector::iterator vt; @@ -950,16 +828,23 @@ void CompatIdoComponent::DumpConfigObjects(void) sglist.push_back(service->GetAlias()); } - CreateMessageList(message, services, 219); /* servicegroupmember */ + SendMessageList(message, services, 219); /* servicegroupmember */ - message << 999 << "\n\n"; /* enddata */ + message << 999 << "\n\n"; /* enddata */ - m_IdoSocket->SendMessage(message.str()); + m_IdoConnection->SendMessage(message.str()); } } /* tell ido2db that we ended dumping the config */ - EndConfigDump(); + stringstream msgEndConfigDump; + msgEndConfigDump << "\n\n" + << 901 << ":" << "\n" /* endconfigdump */ + << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */ + << 999 /* enddata */ + << "\n\n"; + + m_IdoConnection->SendMessage(msgEndConfigDump.str()); } /** @@ -967,21 +852,21 @@ void CompatIdoComponent::DumpConfigObjects(void) */ void CompatIdoComponent::DumpStatusData(void) { - /* hosts */ - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Host")) { - const Host::Ptr& host = static_pointer_cast(object); + /* hosts */ + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Host")) { + const Host::Ptr& host = static_pointer_cast(object); - DumpHostStatus(host); - } + DumpHostStatus(host); + } - /* services */ - BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) { - Service::Ptr service = static_pointer_cast(object); + /* services */ + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) { + Service::Ptr service = static_pointer_cast(object); - DumpServiceStatus(service); - } + DumpServiceStatus(service); + } } diff --git a/components/compatido/compatidocomponent.h b/components/compatido/compatidocomponent.h index 682c6f2c7..4422b178b 100644 --- a/components/compatido/compatidocomponent.h +++ b/components/compatido/compatidocomponent.h @@ -38,33 +38,24 @@ private: Timer::Ptr m_ProgramStatusTimer; Timer::Ptr m_ReconnectTimer; - IdoSocket::Ptr m_IdoSocket; - - bool m_ConfigDumpInProgress; + IdoConnection::Ptr m_IdoConnection; String GetSocketAddress(void) const; String GetSocketPort(void) const; String GetInstanceName(void) const; - int GetReconnectInterval(void) const; + double GetReconnectInterval(void) const; - void SetConfigDumpInProgress(bool state); - bool GetConfigDumpInProgress(void); + void SocketDisconnectHandler(void); void ConfigTimerHandler(void); void StatusTimerHandler(void); void ProgramStatusTimerHandler(void); void ReconnectTimerHandler(void); - void OpenIdoSocket(bool sockettype); + void OpenIdoSocket(void); void CloseIdoSocket(void); - void OpenSink(String node, String service); - void SendHello(String instancename, bool sockettype); - void GoodByeSink(void); - void CloseSink(void); void SendStartProcess(void); - void StartConfigDump(void); - void EndConfigDump(void); void EnableHostObject(const Host::Ptr& host); void EnableServiceObject(const Service::Ptr& service); @@ -80,7 +71,7 @@ private: void DumpProgramStatusData(void); template - void CreateMessageList(stringstream& msg, const T& list, int type) + void SendMessageList(stringstream& msg, const T& list, int type) { typename T::const_iterator it; for (it = list.begin(); it != list.end(); it++) { diff --git a/components/compatido/i2-compatido.h b/components/compatido/i2-compatido.h index 0b9219ad4..400712b32 100644 --- a/components/compatido/i2-compatido.h +++ b/components/compatido/i2-compatido.h @@ -33,7 +33,7 @@ using std::stringstream; -#include "idosocket.h" +#include "idoconnection.h" #include "compatidocomponent.h" diff --git a/components/compatido/idoconnection.cpp b/components/compatido/idoconnection.cpp new file mode 100644 index 000000000..af4670c80 --- /dev/null +++ b/components/compatido/idoconnection.cpp @@ -0,0 +1,59 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "i2-compatido.h" + +using namespace icinga; + +/** + * Constructor for the IdoSocket class. + * + * @param role The role of the underlying TCP client. + */ +IdoConnection::IdoConnection(const Stream::Ptr& stream) + : Connection(stream) +{ } + +/** + * Sends a message to the ido socket + * + * @param message The message. + */ +void IdoConnection::SendMessage(const String& message) +{ + /* + * write our message to the send queue + * as we inherit all the functionality + * of the tcpclient class + */ + GetStream()->Write(message.CStr(), message.GetLength()); +} + + +/** + * Processes inbound data. + * Currently not used, as we do not receive data from ido sockets + */ +void IdoConnection::ProcessData(void) +{ + // Just ignore whatever data the other side is sending + GetStream()->Read(NULL, GetStream()->GetAvailableBytes()); + + return; +} diff --git a/lib/remoting/jsonrpcserver.cpp b/components/compatido/idoconnection.h similarity index 78% rename from lib/remoting/jsonrpcserver.cpp rename to components/compatido/idoconnection.h index c645de639..f0956ef6c 100644 --- a/lib/remoting/jsonrpcserver.cpp +++ b/components/compatido/idoconnection.h @@ -17,16 +17,31 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#include "i2-remoting.h" +#ifndef IDOCONNECTION_H +#define IDOCONNECTION_H -using namespace icinga; +namespace icinga +{ /** - * Constructor for the JsonRpcServer class. + * An IDO socket client. * - * @param sslContext SSL context that should be used for client connections. + * @ingroup compatido */ -JsonRpcServer::JsonRpcServer(shared_ptr sslContext) +class IdoConnection : public Connection { - SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext)); +public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + IdoConnection(const Stream::Ptr& stream); + + void SendMessage(const String& message); + +protected: + virtual void ProcessData(void); +}; + } + +#endif /* IDOCONNECTION_H */ diff --git a/components/compatido/idosocket.cpp b/components/compatido/idosocket.cpp deleted file mode 100644 index fa07791b7..000000000 --- a/components/compatido/idosocket.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-compatido.h" - -using namespace icinga; - -/** - * Constructor for the IdoSocket class. - * - * @param role The role of the underlying TCP client. - */ -IdoSocket::IdoSocket(TcpClientRole role) - : TcpClient(role) -{ - /* - * we currently do not receive any data from the ido socket, - * this is just data output - so we do not need to bind - * a local instance of our datahandler in case of a new - * signal telling about new data - */ - OnDataAvailable.connect(boost::bind(&IdoSocket::DataAvailableHandler, this)); - - /* - * what to do on disconnect - */ - OnClosed.connect(boost::bind(&IdoSocket::ClientClosedHandler, this)); - -} - - -/** - * * Set the ido socket type - * * - * * @param type true=tcp, false=unix - * */ -void IdoSocket::SetSocketType(bool type) -{ - m_SocketType = type; -} - -/* - * * Get the ido socket type - * * - * * @returns type true=tcp, false=unix - * */ -bool IdoSocket::GetSocketType(void) -{ - return m_SocketType; -} - -/** - * Sends a message to the ido socket - * - * @param message The message. - */ -void IdoSocket::SendMessage(const String& message) -{ - /* - * write our message to the send queue - * as we inherit all the functionality - * of the tcpclient class - */ - Write(message.CStr(), message.GetLength()); -} - - -/** - * Handles closed client connect - */ -void IdoSocket::ClientClosedHandler(void) -{ - try { - CheckException(); - } catch (const exception& ex) { - stringstream message; - message << "Error occured for ido socket: " << ex.what(); - - Logger::Write(LogWarning, "compatido", message.str()); - } - - Logger::Write(LogWarning, "compatido", "Lost connection to ido socket"); - - SetReconnect(true); - - OnDisconnected(GetSelf()); -} - - -/** - * Set reconnect vstate - * - * @aparam enable Enables the reconnect. - */ -void IdoSocket::SetReconnect(bool reconnect) -{ - m_Reconnect = reconnect; -} - -/** - * Get reconnect state - * - * @returns reconnect The reconnect variable - */ -bool IdoSocket::GetReconnect(void) -{ - return m_Reconnect; -} - -/** - * Processes inbound data. - * Currently not used, as we do not receive data from ido sockets - */ -void IdoSocket::DataAvailableHandler(void) -{ - return; -} - -/** - * Factory function for ido socket clients. - * - * @param fd The file descriptor. - * @param role The role of the underlying TCP client. - * @returns A new ido socket client. - */ -IdoSocket::Ptr icinga::IdoSocketFactory(SOCKET fd, TcpClientRole role) -{ - IdoSocket::Ptr client = boost::make_shared(role); - client->SetFD(fd); - return client; -} diff --git a/components/compatido/idosocket.h b/components/compatido/idosocket.h deleted file mode 100644 index 6ac356185..000000000 --- a/components/compatido/idosocket.h +++ /dev/null @@ -1,68 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef IDOSOCKET_H -#define IDOSOCKET_H - -#include "i2-compatido.h" - -namespace icinga -{ - -/** - * An IDO socket client. - * - * @ingroup compatido - */ -class IdoSocket : public TcpClient -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - IdoSocket(TcpClientRole role); - - void SetSocketType(bool); - bool GetSocketType(void); - - void SendMessage(const String& message); - - void SetReconnect(bool reconnect); - bool GetReconnect(void); - - boost::signal OnNewMessage; - - boost::signal OnConnected; - boost::signal OnDisconnected; - -private: - void DataAvailableHandler(void); - void ClientClosedHandler(void); - - bool m_Reconnect; - bool m_SocketType; - - friend IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role); -}; - -IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role); - -} - -#endif /* JSONRPCCLIENT_H */ diff --git a/components/demo/democomponent.cpp b/components/demo/democomponent.cpp index ebc8dd841..17c3d4dbf 100644 --- a/components/demo/democomponent.cpp +++ b/components/demo/democomponent.cpp @@ -68,8 +68,7 @@ void DemoComponent::DemoTimerHandler(void) void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) { - Logger::Write(LogInformation, "demo", "Got 'hello world' from" - " address=" + sender->GetAddress() + ", identity=" + + Logger::Write(LogInformation, "demo", "Got 'hello world' from identity=" + sender->GetName()); } diff --git a/lib/base/Makefile.am b/lib/base/Makefile.am index f7fed226c..8fc1cc23d 100644 --- a/lib/base/Makefile.am +++ b/lib/base/Makefile.am @@ -10,6 +10,8 @@ libbase_la_SOURCES = \ asynctask.h \ component.cpp \ component.h \ + connection.cpp \ + connection.h \ dictionary.cpp \ dictionary.h \ dynamicobject.cpp \ @@ -21,7 +23,6 @@ libbase_la_SOURCES = \ fifo.cpp \ fifo.h \ i2-base.h \ - ioqueue.h \ logger.cpp \ logger.h \ netstring.cpp \ @@ -40,20 +41,20 @@ libbase_la_SOURCES = \ scripttask.h \ socket.cpp \ socket.h \ + stream.cpp \ + stream.h \ + stream_bio.cpp \ + stream_bio.h \ streamlogger.cpp \ streamlogger.h \ sysloglogger.cpp \ sysloglogger.h \ - tcpclient.cpp \ - tcpclient.h \ - tcpserver.cpp \ - tcpserver.h \ tcpsocket.cpp \ tcpsocket.h \ timer.cpp \ timer.h \ - tlsclient.cpp \ - tlsclient.h \ + tlsstream.cpp \ + tlsstream.h \ unix.h \ utility.cpp \ utility.h \ diff --git a/lib/base/base.vcxproj b/lib/base/base.vcxproj index f3620d621..c877fbfa9 100644 --- a/lib/base/base.vcxproj +++ b/lib/base/base.vcxproj @@ -21,6 +21,7 @@ + @@ -41,13 +42,13 @@ + + - - - + @@ -55,11 +56,12 @@ + - + @@ -72,12 +74,11 @@ + - - - + @@ -240,4 +241,4 @@ - + \ No newline at end of file diff --git a/lib/base/base.vcxproj.filters b/lib/base/base.vcxproj.filters index a7b9317e4..0845ca949 100644 --- a/lib/base/base.vcxproj.filters +++ b/lib/base/base.vcxproj.filters @@ -25,30 +25,15 @@ Quelldateien - - Quelldateien - - - Quelldateien - Quelldateien Quelldateien - - Quelldateien - - - Quelldateien - Quelldateien - - Quelldateien - Quelldateien @@ -85,6 +70,18 @@ Quelldateien + + Quelldateien + + + Quelldateien + + + Quelldateien + + + Quelldateien + @@ -117,21 +114,12 @@ Headerdateien - - Headerdateien - - - Headerdateien - Headerdateien Headerdateien - - Headerdateien - Headerdateien @@ -159,9 +147,6 @@ Headerdateien - - Headerdateien - Headerdateien @@ -177,6 +162,18 @@ Headerdateien + + Headerdateien + + + Headerdateien + + + Headerdateien + + + Headerdateien + diff --git a/lib/base/connection.cpp b/lib/base/connection.cpp new file mode 100644 index 000000000..cedd4ef6f --- /dev/null +++ b/lib/base/connection.cpp @@ -0,0 +1,44 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "i2-base.h" + +using namespace icinga; + +Connection::Connection(const Stream::Ptr& stream) + : m_Stream(stream) +{ + m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this)); + m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this)); +} + +Stream::Ptr Connection::GetStream(void) const +{ + return m_Stream; +} + +void Connection::ClosedHandler(void) +{ + OnClosed(GetSelf()); +} + +void Connection::Close(void) +{ + m_Stream->Close(); +} diff --git a/lib/base/tcpserver.h b/lib/base/connection.h similarity index 68% rename from lib/base/tcpserver.h rename to lib/base/connection.h index 27aa3e34c..4e48d4081 100644 --- a/lib/base/tcpserver.h +++ b/lib/base/connection.h @@ -17,44 +17,35 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef TCPSERVER_H -#define TCPSERVER_H +#ifndef CONNECTION_H +#define CONNECTION_H namespace icinga { -/** - * A TCP server that listens on a TCP port and accepts incoming - * client connections. - * - * @ingroup base - */ -class I2_BASE_API TcpServer : public TcpSocket +class I2_BASE_API Connection : public Object { public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - typedef function ClientFactory; + Connection(const Stream::Ptr& stream); - TcpServer(void); + Stream::Ptr GetStream(void) const; - void SetClientFactory(const ClientFactory& clientFactory); - ClientFactory GetFactoryFunction(void) const; + void Close(void); - void Listen(void); - - boost::signal OnNewClient; + boost::signal OnClosed; protected: - virtual bool WantsToRead(void) const; - - virtual void HandleReadable(void); + virtual void ProcessData(void) = 0; private: - ClientFactory m_ClientFactory; + Stream::Ptr m_Stream; + + void ClosedHandler(void); }; } -#endif /* TCPSERVER_H */ +#endif /* CONNECTION_H */ \ No newline at end of file diff --git a/lib/base/dynamicobject.cpp b/lib/base/dynamicobject.cpp index 7e983cdb8..734b9e93c 100644 --- a/lib/base/dynamicobject.cpp +++ b/lib/base/dynamicobject.cpp @@ -368,6 +368,7 @@ void DynamicObject::DumpObjects(const String& filename) throw_exception(runtime_error("Could not open '" + filename + "' file")); FIFO::Ptr fifo = boost::make_shared(); + fifo->Start(); DynamicObject::TypeMap::iterator tt; for (tt = GetAllObjects().begin(); tt != GetAllObjects().end(); tt++) { @@ -401,7 +402,7 @@ void DynamicObject::DumpObjects(const String& filename) String json = value.Serialize(); /* This is quite ugly, unfortunatelly NetString requires an IOQueue object */ - NetString::WriteStringToIOQueue(fifo.get(), json); + NetString::WriteStringToStream(fifo, json); size_t count; while ((count = fifo->GetAvailableBytes()) > 0) { @@ -416,6 +417,8 @@ void DynamicObject::DumpObjects(const String& filename) } } + fifo->Close(); + fp.close(); #ifdef _WIN32 @@ -436,6 +439,8 @@ void DynamicObject::RestoreObjects(const String& filename) /* TODO: Fix this horrible mess by implementing a class that provides * IOQueue functionality for files. */ FIFO::Ptr fifo = boost::make_shared(); + fifo->Start(); + while (fp) { char buffer[1024]; @@ -444,7 +449,7 @@ void DynamicObject::RestoreObjects(const String& filename) } String message; - while (NetString::ReadStringFromIOQueue(fifo.get(), &message)) { + while (NetString::ReadStringFromStream(fifo, &message)) { Dictionary::Ptr persistentObject = Value::Deserialize(message); String type = persistentObject->Get("type"); @@ -462,6 +467,8 @@ void DynamicObject::RestoreObjects(const String& filename) object->ApplyUpdate(update, Attribute_All); } } + + fifo->Close(); } void DynamicObject::DeactivateObjects(void) diff --git a/lib/base/fifo.cpp b/lib/base/fifo.cpp index 5f9e59bfd..6af65b8f2 100644 --- a/lib/base/fifo.cpp +++ b/lib/base/fifo.cpp @@ -38,6 +38,13 @@ FIFO::~FIFO(void) free(m_Buffer); } +void FIFO::Start(void) +{ + SetConnected(true); + + Stream::Start(); +} + /** * Resizes the FIFO's buffer so that it is at least newSize bytes long. * @@ -109,25 +116,32 @@ size_t FIFO::GetAvailableBytes(void) const /** * Implements IOQueue::Peek. */ -void FIFO::Peek(void *buffer, size_t count) +size_t FIFO::Peek(void *buffer, size_t count) { - assert(m_DataSize >= count); + assert(IsConnected()); + + if (count > m_DataSize) + count = m_DataSize; if (buffer != NULL) memcpy(buffer, m_Buffer + m_Offset, count); + + return count; } /** * Implements IOQueue::Read. */ -void FIFO::Read(void *buffer, size_t count) +size_t FIFO::Read(void *buffer, size_t count) { - Peek(buffer, count); + count = Peek(buffer, count); m_DataSize -= count; m_Offset += count; Optimize(); + + return count; } /** @@ -135,6 +149,8 @@ void FIFO::Read(void *buffer, size_t count) */ void FIFO::Write(const void *buffer, size_t count) { + assert(IsConnected()); + ResizeBuffer(m_Offset + m_DataSize + count); memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count); m_DataSize += count; diff --git a/lib/base/fifo.h b/lib/base/fifo.h index f472833e9..bc31578c5 100644 --- a/lib/base/fifo.h +++ b/lib/base/fifo.h @@ -28,7 +28,7 @@ namespace icinga * * @ingroup base */ -class I2_BASE_API FIFO : public Object, public IOQueue +class I2_BASE_API FIFO : public Stream { public: static const size_t BlockSize = 16 * 1024; @@ -39,13 +39,15 @@ public: FIFO(void); ~FIFO(void); + void Start(void); + /*const void *GetReadBuffer(void) const; void *GetWriteBuffer(size_t *count);*/ - virtual size_t GetAvailableBytes(void) const; - virtual void Peek(void *buffer, size_t count); - virtual void Read(void *buffer, size_t count); - virtual void Write(const void *buffer, size_t count); + size_t GetAvailableBytes(void) const; + size_t Peek(void *buffer, size_t count); + size_t Read(void *buffer, size_t count); + void Write(const void *buffer, size_t count); private: char *m_Buffer; diff --git a/lib/base/i2-base.h b/lib/base/i2-base.h index f4668b601..25940b0b7 100644 --- a/lib/base/i2-base.h +++ b/lib/base/i2-base.h @@ -175,14 +175,14 @@ namespace tuples = boost::tuples; #include "dictionary.h" #include "ringbuffer.h" #include "timer.h" -#include "ioqueue.h" +#include "stream.h" +#include "stream_bio.h" +#include "connection.h" #include "netstring.h" #include "fifo.h" #include "socket.h" #include "tcpsocket.h" -#include "tcpclient.h" -#include "tcpserver.h" -#include "tlsclient.h" +#include "tlsstream.h" #include "asynctask.h" #include "process.h" #include "scriptfunction.h" diff --git a/lib/base/netstring.cpp b/lib/base/netstring.cpp index 3519580ca..546aa79c3 100644 --- a/lib/base/netstring.cpp +++ b/lib/base/netstring.cpp @@ -22,17 +22,17 @@ using namespace icinga; /** - * Reads data from an IOQueue in netString format. + * Reads data from a stream in netString format. * - * @param queue The IOQueue to read from. + * @param stream The stream to read from. * @param[out] str The String that has been read from the IOQueue. * @returns true if a complete String was read from the IOQueue, false otherwise. * @exception invalid_argument The input stream is invalid. * @see https://github.com/PeterScott/netString-c/blob/master/netString.c */ -bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str) +bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) { - size_t buffer_length = queue->GetAvailableBytes(); + size_t buffer_length = stream->GetAvailableBytes(); /* minimum netString length is 3 */ if (buffer_length < 3) @@ -47,7 +47,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str) if (buffer == NULL && buffer_length > 0) throw_exception(bad_alloc()); - queue->Peek(buffer, buffer_length); + stream->Peek(buffer, buffer_length); /* no leading zeros allowed */ if (buffer[0] == '0' && isdigit(buffer[1])) { @@ -68,7 +68,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str) len = len * 10 + (buffer[i] - '0'); } - buffer_length = queue->GetAvailableBytes(); + buffer_length = stream->GetAvailableBytes(); /* make sure the buffer is large enough */ if (i + len + 1 >= buffer_length) @@ -86,7 +86,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str) buffer = new_buffer; - queue->Peek(buffer, buffer_length); + stream->Peek(buffer, buffer_length); /* check for the colon delimiter */ if (buffer[i] != ':') { @@ -104,25 +104,25 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str) free(buffer); - /* remove the data from the IOQueue */ - queue->Read(NULL, buffer_length); + /* remove the data from the stream */ + stream->Read(NULL, buffer_length); return true; } /** - * Writes data into an IOQueue using the netString format. + * Writes data into a stream using the netString format. * - * @param queue The IOQueue. + * @param stream The stream. * @param str The String that is to be written. */ -void NetString::WriteStringToIOQueue(IOQueue *queue, const String& str) +void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str) { stringstream prefixbuf; prefixbuf << str.GetLength() << ":"; String prefix = prefixbuf.str(); - queue->Write(prefix.CStr(), prefix.GetLength()); - queue->Write(str.CStr(), str.GetLength()); - queue->Write(",", 1); + stream->Write(prefix.CStr(), prefix.GetLength()); + stream->Write(str.CStr(), str.GetLength()); + stream->Write(",", 1); } diff --git a/lib/base/netstring.h b/lib/base/netstring.h index d32616e4c..9b4efc872 100644 --- a/lib/base/netstring.h +++ b/lib/base/netstring.h @@ -33,8 +33,8 @@ namespace icinga class I2_BASE_API NetString { public: - static bool ReadStringFromIOQueue(IOQueue *queue, String *message); - static void WriteStringToIOQueue(IOQueue *queue, const String& message); + static bool ReadStringFromStream(const Stream::Ptr& stream, String *message); + static void WriteStringToStream(const Stream::Ptr& stream, const String& message); private: NetString(void); diff --git a/lib/base/socket.cpp b/lib/base/socket.cpp index dd6240e39..3f5973eda 100644 --- a/lib/base/socket.cpp +++ b/lib/base/socket.cpp @@ -25,16 +25,19 @@ using namespace icinga; * Constructor for the Socket class. */ Socket::Socket(void) - : m_FD(INVALID_SOCKET), m_Connected(false) -{ } + : m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false), + m_SendQueue(boost::make_shared()), m_RecvQueue(boost::make_shared()) +{ + m_SendQueue->Start(); + m_RecvQueue->Start(); +} /** * Destructor for the Socket class. */ Socket::~Socket(void) { - boost::mutex::scoped_lock lock(m_SocketMutex); - CloseInternal(true); + Close(); } /** @@ -50,6 +53,8 @@ void Socket::Start(void) m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast(GetSelf()))); m_WriteThread.detach(); + + Stream::Start(); } /** @@ -88,35 +93,24 @@ SOCKET Socket::GetFD(void) const return m_FD; } -/** - * Closes the socket. - */ -void Socket::Close(void) +void Socket::CloseUnlocked(void) { - boost::mutex::scoped_lock lock(m_SocketMutex); + if (m_FD == INVALID_SOCKET) + return; + + closesocket(m_FD); + m_FD = INVALID_SOCKET; - CloseInternal(false); + Stream::Close(); } /** * Closes the socket. - * - * @param from_dtor Whether this method was called from the destructor. */ -void Socket::CloseInternal(bool from_dtor) +void Socket::Close(void) { - if (m_FD == INVALID_SOCKET) - return; - - SetConnected(false); - - closesocket(m_FD); - m_FD = INVALID_SOCKET; - - /* nobody can possibly have a valid event subscription when the - destructor has been called */ - if (!from_dtor) - Event::Post(boost::bind(boost::ref(OnClosed), GetSelf())); + boost::mutex::scoped_lock lock(m_SocketMutex); + CloseUnlocked(); } /** @@ -159,32 +153,6 @@ void Socket::HandleException(void) throw_exception(SocketException("select() returned fd in except fdset", GetError())); } -/** - * Checks whether data should be read for this socket object. - * - * @returns true if the socket should be registered for reading, false otherwise. - */ -bool Socket::WantsToRead(void) const -{ - return false; -} - -void Socket::HandleReadable(void) -{ } - -/** - * Checks whether data should be written for this socket object. - * - * @returns true if the socket should be registered for writing, false otherwise. - */ -bool Socket::WantsToWrite(void) const -{ - return false; -} - -void Socket::HandleWritable(void) -{ } - /** * Formats a sockaddr in a human-readable way. * @@ -305,9 +273,9 @@ void Socket::ReadThreadProc(void) if (FD_ISSET(fd, &exceptfds)) HandleException(); } catch (...) { - m_Exception = boost::current_exception(); + SetException(boost::current_exception()); - CloseInternal(false); + CloseUnlocked(); break; } @@ -360,9 +328,9 @@ void Socket::WriteThreadProc(void) if (FD_ISSET(fd, &writefds)) HandleWritable(); } catch (...) { - m_Exception = boost::current_exception(); + SetException(boost::current_exception()); - CloseInternal(false); + CloseUnlocked(); break; } @@ -390,11 +358,281 @@ bool Socket::IsConnected(void) const } /** - * Checks whether an exception is available for this socket. Should be called - * by user-supplied handlers for the OnClosed signal. + * Returns how much data is available for reading. + * + * @returns The number of bytes available. + */ +size_t Socket::GetAvailableBytes(void) const +{ + if (m_Listening) + throw new logic_error("Socket does not support GetAvailableBytes()."); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + return m_RecvQueue->GetAvailableBytes(); + } +} + +/** + * Reads data from the socket. + * + * @param buffer The buffer where the data should be stored. + * @param size The size of the buffer. + * @returns The number of bytes read. + */ +size_t Socket::Read(void *buffer, size_t size) +{ + if (m_Listening) + throw new logic_error("Socket does not support Read()."); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + if (m_RecvQueue->GetAvailableBytes() == 0) + CheckException(); + + return m_RecvQueue->Read(buffer, size); + } +} + +/** + * Peeks at data for the socket. + * + * @param buffer The buffer where the data should be stored. + * @param size The size of the buffer. + * @returns The number of bytes read. + */ +size_t Socket::Peek(void *buffer, size_t size) +{ + if (m_Listening) + throw new logic_error("Socket does not support Peek()."); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + if (m_RecvQueue->GetAvailableBytes() == 0) + CheckException(); + + return m_RecvQueue->Peek(buffer, size); + } +} + +/** + * Writes data to the socket. + * + * @param buffer The buffer that should be sent. + * @param size The size of the buffer. + */ +void Socket::Write(const void *buffer, size_t size) +{ + if (m_Listening) + throw new logic_error("Socket does not support Write()."); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + m_SendQueue->Write(buffer, size); + } +} + +/** + * Starts listening for incoming client connections. + */ +void Socket::Listen(void) +{ + if (listen(GetFD(), SOMAXCONN) < 0) + throw_exception(SocketException("listen() failed", GetError())); + + m_Listening = true; +} + +void Socket::HandleWritable(void) +{ + if (m_Listening) + HandleWritableServer(); + else + HandleWritableClient(); +} + +void Socket::HandleReadable(void) +{ + if (m_Listening) + HandleReadableServer(); + else + HandleReadableClient(); +} + +/** + * Processes data that is available for this socket. + */ +void Socket::HandleWritableClient(void) +{ + int rc; + char data[1024]; + size_t count; + + if (!IsConnected()) + SetConnected(true); + + for (;;) { + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + count = m_SendQueue->GetAvailableBytes(); + + if (count == 0) + break; + + if (count > sizeof(data)) + count = sizeof(data); + + m_SendQueue->Peek(data, count); + } + + rc = send(GetFD(), data, count, 0); + + if (rc <= 0) + throw_exception(SocketException("send() failed", GetError())); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + m_SendQueue->Read(NULL, rc); + } + } +} + +/** + * Processes data that can be written for this socket. + */ +void Socket::HandleReadableClient(void) +{ + if (!IsConnected()) + SetConnected(true); + + bool new_data = false; + + for (;;) { + char data[1024]; + int rc = recv(GetFD(), data, sizeof(data), 0); + +#ifdef _WIN32 + if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) +#else /* _WIN32 */ + if (rc < 0 && errno == EAGAIN) +#endif /* _WIN32 */ + break; + + if (rc <= 0) + throw_exception(SocketException("recv() failed", GetError())); + + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + m_RecvQueue->Write(data, rc); + } + + new_data = true; + } + + if (new_data) + Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); +} + +void Socket::HandleWritableServer(void) +{ + throw logic_error("This should never happen."); +} + +/** + * Accepts a new client and creates a new client object for it + * using the client factory function. + */ +void Socket::HandleReadableServer(void) +{ + int fd; + sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + + fd = accept(GetFD(), (sockaddr *)&addr, &addrlen); + + if (fd < 0) + throw_exception(SocketException("accept() failed", GetError())); + + TcpSocket::Ptr client = boost::make_shared(); + client->SetFD(fd); + Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client)); +} + +/** + * Checks whether data should be written for this socket object. + * + * @returns true if the socket should be registered for writing, false otherwise. + */ +bool Socket::WantsToWrite(void) const +{ + if (m_Listening) + return WantsToWriteServer(); + else + return WantsToWriteClient(); +} + +/** + * Checks whether data should be read for this socket object. + * + * @returns true if the socket should be registered for reading, false otherwise. + */ +bool Socket::WantsToRead(void) const +{ + if (m_Listening) + return WantsToReadServer(); + else + return WantsToReadClient(); +} + +/** + * Checks whether data should be read for this socket. + * + * @returns true + */ +bool Socket::WantsToReadClient(void) const +{ + return true; +} + +/** + * Checks whether data should be written for this socket. + * + * @returns true if data should be written, false otherwise. + */ +bool Socket::WantsToWriteClient(void) const +{ + { + boost::mutex::scoped_lock lock(m_QueueMutex); + + if (m_SendQueue->GetAvailableBytes() > 0) + return true; + } + + return (!IsConnected()); +} + +/** + * Checks whether the TCP server wants to write. + * + * @returns false + */ +bool Socket::WantsToWriteServer(void) const +{ + return false; +} + +/** + * Checks whether the TCP server wants to read (i.e. accept new clients). + * + * @returns true */ -void Socket::CheckException(void) +bool Socket::WantsToReadServer(void) const { - if (m_Exception) - rethrow_exception(m_Exception); + return true; } diff --git a/lib/base/socket.h b/lib/base/socket.h index 1777f2f15..0dad5d507 100644 --- a/lib/base/socket.h +++ b/lib/base/socket.h @@ -23,11 +23,11 @@ namespace icinga { /** - * Base class for sockets. + * Base class for connection-oriented sockets. * * @ingroup base */ -class I2_BASE_API Socket : public Object +class I2_BASE_API Socket : public Stream { public: typedef shared_ptr Ptr; @@ -35,18 +35,23 @@ public: ~Socket(void); - boost::signal OnClosed; - virtual void Start(void); - void Close(void); + virtual void Close(void); String GetClientAddress(void); String GetPeerAddress(void); bool IsConnected(void) const; - void CheckException(void); + size_t GetAvailableBytes(void) const; + size_t Read(void *buffer, size_t size); + size_t Peek(void *buffer, size_t size); + void Write(const void *buffer, size_t size); + + void Listen(void); + + boost::signal OnNewClient; protected: Socket(void); @@ -59,34 +64,49 @@ protected: int GetError(void) const; static int GetLastSocketError(void); - virtual bool WantsToRead(void) const; - virtual bool WantsToWrite(void) const; - - virtual void HandleReadable(void); - virtual void HandleWritable(void); - virtual void HandleException(void); - - virtual void CloseInternal(bool from_dtor); - mutable boost::mutex m_SocketMutex; private: SOCKET m_FD; /**< The socket descriptor. */ bool m_Connected; + bool m_Listening; thread m_ReadThread; thread m_WriteThread; condition_variable m_WriteCV; - boost::exception_ptr m_Exception; - void ReadThreadProc(void); void WriteThreadProc(void); void ExceptionEventHandler(void); static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); + + mutable boost::mutex m_QueueMutex; + FIFO::Ptr m_SendQueue; + FIFO::Ptr m_RecvQueue; + + void HandleWritableClient(void); + void HandleReadableClient(void); + + void HandleWritableServer(void); + void HandleReadableServer(void); + + void HandleReadable(void); + void HandleWritable(void); + void HandleException(void); + + bool WantsToWriteClient(void) const; + bool WantsToReadClient(void) const; + + bool WantsToWriteServer(void) const; + bool WantsToReadServer(void) const; + + bool WantsToWrite(void) const; + bool WantsToRead(void) const; + + void CloseUnlocked(void); }; /** diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp new file mode 100644 index 000000000..fa26d6823 --- /dev/null +++ b/lib/base/stream.cpp @@ -0,0 +1,79 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "i2-base.h" + +using namespace icinga; + +Stream::Stream(void) + : m_Connected(false) +{ } + +Stream::~Stream(void) +{ + assert(!m_Running); +} + +bool Stream::IsConnected(void) const +{ + return m_Connected; +} + +void Stream::SetConnected(bool connected) +{ + m_Connected = connected; + + if (m_Connected) + Event::Post(boost::bind(boost::ref(OnConnected), GetSelf())); + else + Event::Post(boost::bind(boost::ref(OnClosed), GetSelf())); +} + +/** + * Checks whether an exception is available for this socket and re-throws + * the exception if there is one. + */ +void Stream::CheckException(void) +{ + if (m_Exception) + rethrow_exception(m_Exception); +} + +void Stream::SetException(boost::exception_ptr exception) +{ + m_Exception = exception; +} + +boost::exception_ptr Stream::GetException(void) +{ + return m_Exception; +} + +void Stream::Start(void) +{ + m_Running = true; +} + +void Stream::Close(void) +{ + assert(m_Running); + m_Running = false; + + SetConnected(false); +} diff --git a/lib/base/ioqueue.h b/lib/base/stream.h similarity index 66% rename from lib/base/ioqueue.h rename to lib/base/stream.h index 842e9461f..cdb01f314 100644 --- a/lib/base/ioqueue.h +++ b/lib/base/stream.h @@ -17,20 +17,28 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef IOQUEUE_H -#define IOQUEUE_H +#ifndef STREAM_H +#define STREAM_H namespace icinga { /** - * An I/O queue. + * A stream. * * @ingroup base */ -class IOQueue +class I2_BASE_API Stream : public Object { public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + Stream(void); + ~Stream(void); + + virtual void Start(void); + /** * Retrieves the number of bytes available for reading. * @@ -39,37 +47,61 @@ public: virtual size_t GetAvailableBytes(void) const = 0; /** - * Reads data from the queue without advancing the read pointer. Trying - * to read more data than is available in the queue is a programming error. - * Use GetBytesAvailable() to check how much data is available. + * Reads data from the stream without advancing the read pointer. * * @param buffer The buffer where data should be stored. May be NULL if * you're not actually interested in the data. * @param count The number of bytes to read from the queue. + * @returns The number of bytes actually read. */ - virtual void Peek(void *buffer, size_t count) = 0; + virtual size_t Peek(void *buffer, size_t count) = 0; /** - * Reads data from the queue. Trying to read more data than is - * available in the queue is a programming error. Use GetBytesAvailable() - * to check how much data is available. + * Reads data from the stream. * * @param buffer The buffer where data should be stored. May be NULL if you're * not actually interested in the data. * @param count The number of bytes to read from the queue. + * @returns The number of bytes actually read. */ - virtual void Read(void *buffer, size_t count) = 0; + virtual size_t Read(void *buffer, size_t count) = 0; /** - * Writes data to the queue. + * Writes data to the stream. * * @param buffer The data that is to be written. * @param count The number of bytes to write. * @returns The number of bytes written */ virtual void Write(const void *buffer, size_t count) = 0; + + /** + * Closes the stream and releases resources. + */ + virtual void Close(void); + + bool IsConnected(void) const; + + boost::exception_ptr GetException(void); + void CheckException(void); + + boost::signal OnConnected; + boost::signal OnDataAvailable; + boost::signal OnClosed; + +protected: + void SetConnected(bool connected); + + void SetException(boost::exception_ptr exception); + +private: + bool m_Running; + bool m_Connected; + boost::exception_ptr m_Exception; }; +BIO *BIO_Stream_new(const Stream::Ptr& stream); + } -#endif /* IOQUEUE_H */ +#endif /* STREAM_H */ diff --git a/lib/base/stream_bio.cpp b/lib/base/stream_bio.cpp new file mode 100644 index 000000000..ccdda1b83 --- /dev/null +++ b/lib/base/stream_bio.cpp @@ -0,0 +1,121 @@ +#include "i2-base.h" + +using namespace icinga; + +int I2Stream_new(BIO *bi); +int I2Stream_free(BIO *bi); +int I2Stream_read(BIO *bi, char *out, int outl); +int I2Stream_write(BIO *bi, const char *in, int inl); +long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr); +int I2Stream_gets(BIO *bi, char *buf, int size); +int I2Stream_puts(BIO *bi, const char *str); + +#define BIO_TYPE_I2STREAM (99|0x0400|0x0100) + +static BIO_METHOD I2Stream_method = +{ + BIO_TYPE_I2STREAM, + "Icinga Stream", + I2Stream_write, + I2Stream_read, + NULL, + NULL, + I2Stream_ctrl, + I2Stream_new, + I2Stream_free, + NULL, +}; + +typedef struct I2Stream_bio_s +{ + Stream::Ptr Stream; + boost::exception_ptr Exception; +} I2Stream_bio_t; + +BIO_METHOD *BIO_s_I2Stream(void) +{ + return &I2Stream_method; +} + +BIO *icinga::BIO_new_I2Stream(const Stream::Ptr& stream) +{ + BIO *bi = BIO_new(BIO_s_I2Stream()); + + if (bi == NULL) + return NULL; + + I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; + + bp->Stream = stream; + + return bi; +} + +void icinga::I2Stream_check_exception(BIO *bi) { + I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; + + if (bp->Exception) { + boost::exception_ptr ptr = bp->Exception; + bp->Exception = boost::exception_ptr(); + rethrow_exception(ptr); + } +} + +static int I2Stream_new(BIO *bi) +{ + bi->shutdown = 0; + bi->init = 1; + bi->num = -1; + bi->ptr = new I2Stream_bio_t; + + return 1; +} + +static int I2Stream_free(BIO *bi) +{ + I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; + delete bp; + + return 1; +} + +static int I2Stream_read(BIO *bi, char *out, int outl) +{ + I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; + + size_t data_read; + + BIO_clear_retry_flags(bi); + + try { + data_read = bp->Stream->Read(out, outl); + } catch (...) { + bp->Exception = boost::current_exception(); + return -1; + } + + if (data_read == 0) { + BIO_set_retry_read(bi); + return -1; + } + + return data_read; +} + +static int I2Stream_write(BIO *bi, const char *in, int inl) +{ + I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; + bp->Stream->Write(in, inl); + + return inl; +} + +static long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr) +{ + switch (cmd) { + case BIO_CTRL_FLUSH: + return 1; + default: + return 0; + } +} diff --git a/lib/remoting/jsonrpcserver.h b/lib/base/stream_bio.h similarity index 81% rename from lib/remoting/jsonrpcserver.h rename to lib/base/stream_bio.h index 4cf9a5841..3f40f9195 100644 --- a/lib/remoting/jsonrpcserver.h +++ b/lib/base/stream_bio.h @@ -17,26 +17,15 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef JSONRPCSERVER_H -#define JSONRPCSERVER_H +#ifndef STREAMBIO_H +#define STREAMBIO_H namespace icinga { -/** - * A JSON-RPC server. - * - * @ingroup remoting - */ -class I2_REMOTING_API JsonRpcServer : public TcpServer -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - JsonRpcServer(shared_ptr sslContext); -}; +BIO *BIO_new_I2Stream(const Stream::Ptr& stream); +void I2Stream_check_exception(BIO *bi); } -#endif /* JSONRPCSERVER_H */ +#endif /* STREAMBIO_H */ \ No newline at end of file diff --git a/lib/base/tcpclient.cpp b/lib/base/tcpclient.cpp deleted file mode 100644 index c21a438cd..000000000 --- a/lib/base/tcpclient.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-base.h" - -using namespace icinga; - -/** - * Constructor for the TcpClient class. - * - * @param role The role of the TCP client socket. - */ -TcpClient::TcpClient(TcpClientRole role) - : m_SendQueue(boost::make_shared()), - m_RecvQueue(boost::make_shared()), - m_Role(role) -{ } - -/** - * Retrieves the role of the socket. - * - * @returns The role. - */ -TcpClientRole TcpClient::GetRole(void) const -{ - return m_Role; -} - -/** - * Creates a socket and connects to the specified node and service. - * - * @param node The node. - * @param service The service. - */ -void TcpClient::Connect(const String& node, const String& service) -{ - m_Role = RoleOutbound; - - addrinfo hints; - addrinfo *result; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); - - if (rc < 0) - throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError())); - - int fd = INVALID_SOCKET; - - for (addrinfo *info = result; info != NULL; info = info->ai_next) { - fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); - - if (fd == INVALID_SOCKET) - continue; - - SetFD(fd); - - rc = connect(fd, info->ai_addr, info->ai_addrlen); - -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) { -#else /* _WIN32 */ - if (rc < 0 && errno != EINPROGRESS) { -#endif /* _WIN32 */ - closesocket(fd); - SetFD(INVALID_SOCKET); - - continue; - } - - if (rc >= 0) { - SetConnected(true); - OnConnected(GetSelf()); - } - - break; - } - - freeaddrinfo(result); - - if (fd == INVALID_SOCKET) - throw_exception(runtime_error("Could not create a suitable socket.")); -} - -/** - * Processes data that is available for this socket. - */ -void TcpClient::HandleWritable(void) -{ - int rc; - char data[1024]; - size_t count; - - if (!IsConnected()) { - SetConnected(true); - Event::Post(boost::bind(boost::cref(OnConnected), GetSelf())); - } - - for (;;) { - { - boost::mutex::scoped_lock lock(m_QueueMutex); - - count = m_SendQueue->GetAvailableBytes(); - - if (count == 0) - break; - - if (count > sizeof(data)) - count = sizeof(data); - - m_SendQueue->Peek(data, count); - } - - rc = send(GetFD(), data, count, 0); - - if (rc <= 0) - throw_exception(SocketException("send() failed", GetError())); - - { - boost::mutex::scoped_lock lock(m_QueueMutex); - m_SendQueue->Read(NULL, rc); - } - } -} - -/** - * Implements IOQueue::GetAvailableBytes. - */ -size_t TcpClient::GetAvailableBytes(void) const -{ - boost::mutex::scoped_lock lock(m_QueueMutex); - - return m_RecvQueue->GetAvailableBytes(); -} - -/** - * Implements IOQueue::Peek. - */ -void TcpClient::Peek(void *buffer, size_t count) -{ - boost::mutex::scoped_lock lock(m_QueueMutex); - - m_RecvQueue->Peek(buffer, count); -} - -/** - * Implements IOQueue::Read. - */ -void TcpClient::Read(void *buffer, size_t count) -{ - boost::mutex::scoped_lock lock(m_QueueMutex); - - m_RecvQueue->Read(buffer, count); -} - -/** - * Implements IOQueue::Write. - */ -void TcpClient::Write(const void *buffer, size_t count) -{ - boost::mutex::scoped_lock lock(m_QueueMutex); - - m_SendQueue->Write(buffer, count); -} - -/** - * Processes data that can be written for this socket. - */ -void TcpClient::HandleReadable(void) -{ - if (!IsConnected()) { - SetConnected(true); - Event::Post(boost::bind(boost::cref(OnConnected), GetSelf())); - } - - for (;;) { - char data[1024]; - int rc = recv(GetFD(), data, sizeof(data), 0); - - #ifdef _WIN32 - if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) - #else /* _WIN32 */ - if (rc < 0 && errno == EAGAIN) - #endif /* _WIN32 */ - return; - - if (rc <= 0) - throw_exception(SocketException("recv() failed", GetError())); - - { - boost::mutex::scoped_lock lock(m_QueueMutex); - - m_RecvQueue->Write(data, rc); - } - } - - Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); -} - -/** - * Checks whether data should be read for this socket. - * - * @returns true - */ -bool TcpClient::WantsToRead(void) const -{ - return true; -} - -/** - * Checks whether data should be written for this socket. - * - * @returns true if data should be written, false otherwise. - */ -bool TcpClient::WantsToWrite(void) const -{ - { - boost::mutex::scoped_lock lock(m_QueueMutex); - - if (m_SendQueue->GetAvailableBytes() > 0) - return true; - } - - return (!IsConnected()); -} - -/** - * Default factory function for TCP clients. - * - * @param role The role of the new client. - * @returns The new client. - */ -TcpClient::Ptr icinga::TcpClientFactory(TcpClientRole role) -{ - return boost::make_shared(role); -} diff --git a/lib/base/tcpclient.h b/lib/base/tcpclient.h deleted file mode 100644 index 64d33fef9..000000000 --- a/lib/base/tcpclient.h +++ /dev/null @@ -1,90 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef TCPCLIENT_H -#define TCPCLIENT_H - -namespace icinga -{ - -/** - * The role of a TCP client object. - * - * @ingroup base - */ -enum TcpClientRole -{ - RoleInbound, /**< Inbound socket, i.e. one that was returned - from accept(). */ - RoleOutbound /**< Outbound socket, i.e. one that is connect()'d to a - remote socket. */ -}; - -/** - * A TCP client connection. - * - * @ingroup base - */ -class I2_BASE_API TcpClient : public TcpSocket, public IOQueue -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - TcpClient(TcpClientRole role); - - TcpClientRole GetRole(void) const; - - void Connect(const String& node, const String& service); - - boost::signal OnConnected; - boost::signal OnDataAvailable; - - virtual size_t GetAvailableBytes(void) const; - virtual void Peek(void *buffer, size_t count); - virtual void Read(void *buffer, size_t count); - virtual void Write(const void *buffer, size_t count); - -protected: - virtual bool WantsToRead(void) const; - virtual bool WantsToWrite(void) const; - - virtual void HandleReadable(void); - virtual void HandleWritable(void); - - mutable boost::mutex m_QueueMutex; - FIFO::Ptr m_SendQueue; - FIFO::Ptr m_RecvQueue; - -private: - TcpClientRole m_Role; -}; - -/** - * Returns a new unconnected TcpClient object that has the specified - * connection role. - * - * @param role The role of the new object. - * @returns A new TcpClient object. - */ -TcpClient::Ptr TcpClientFactory(TcpClientRole role); - -} - -#endif /* TCPCLIENT_H */ diff --git a/lib/base/tcpserver.cpp b/lib/base/tcpserver.cpp deleted file mode 100644 index 4bfb3db81..000000000 --- a/lib/base/tcpserver.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-base.h" - -using namespace icinga; - -/** - * Constructor for the TcpServer class. - */ -TcpServer::TcpServer(void) - : m_ClientFactory(boost::bind(&TcpClientFactory, RoleInbound)) -{ } - -/** - * Sets the client factory. - * - * @param clientFactory The client factory function. - */ -void TcpServer::SetClientFactory(const TcpServer::ClientFactory& clientFactory) -{ - m_ClientFactory = clientFactory; -} - -/** - * Retrieves the client factory. - * - * @returns The client factory function. - */ -TcpServer::ClientFactory TcpServer::GetFactoryFunction(void) const -{ - return m_ClientFactory; -} - -/** - * Starts listening for incoming client connections. - */ -void TcpServer::Listen(void) -{ - if (listen(GetFD(), SOMAXCONN) < 0) - throw_exception(SocketException("listen() failed", GetError())); -} - -/** - * Checks whether the TCP server wants to read (i.e. accept new clients). - * - * @returns true - */ -bool TcpServer::WantsToRead(void) const -{ - return true; -} - -/** - * Accepts a new client and creates a new client object for it - * using the client factory function. - */ -void TcpServer::HandleReadable(void) -{ - int fd; - sockaddr_storage addr; - socklen_t addrlen = sizeof(addr); - - fd = accept(GetFD(), (sockaddr *)&addr, &addrlen); - - if (fd < 0) - throw_exception(SocketException("accept() failed", GetError())); - - TcpClient::Ptr client = m_ClientFactory(fd); - - Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client)); -} - diff --git a/lib/base/tcpsocket.cpp b/lib/base/tcpsocket.cpp index 2e7bf3d2a..c42499211 100644 --- a/lib/base/tcpsocket.cpp +++ b/lib/base/tcpsocket.cpp @@ -21,23 +21,6 @@ using namespace icinga; -/** - * Creates a socket. - * - * @param family The socket family for the new socket. - */ -void TcpSocket::MakeSocket(int family) -{ - assert(GetFD() == INVALID_SOCKET); - - int fd = socket(family, SOCK_STREAM, 0); - - if (fd == INVALID_SOCKET) - throw_exception(SocketException("socket() failed", GetLastSocketError())); - - SetFD(fd); -} - /** * Creates a socket and binds it to the specified service. * @@ -110,3 +93,61 @@ void TcpSocket::Bind(String node, String service, int family) if (fd == INVALID_SOCKET) throw_exception(runtime_error("Could not create a suitable socket.")); } + +/** + * Creates a socket and connects to the specified node and service. + * + * @param node The node. + * @param service The service. + */ +void TcpSocket::Connect(const String& node, const String& service) +{ + addrinfo hints; + addrinfo *result; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result); + + if (rc < 0) + throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError())); + + int fd = INVALID_SOCKET; + + for (addrinfo *info = result; info != NULL; info = info->ai_next) { + fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + + if (fd == INVALID_SOCKET) + continue; + + SetFD(fd); + + rc = connect(fd, info->ai_addr, info->ai_addrlen); + +#ifdef _WIN32 + if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) { +#else /* _WIN32 */ + if (rc < 0 && errno != EINPROGRESS) { +#endif /* _WIN32 */ + closesocket(fd); + SetFD(INVALID_SOCKET); + + continue; + } + + if (rc >= 0) { + SetConnected(true); + OnConnected(GetSelf()); + } + + break; + } + + freeaddrinfo(result); + + if (fd == INVALID_SOCKET) + throw_exception(runtime_error("Could not create a suitable socket.")); +} diff --git a/lib/base/tcpsocket.h b/lib/base/tcpsocket.h index e1027e108..2d8f6b4c3 100644 --- a/lib/base/tcpsocket.h +++ b/lib/base/tcpsocket.h @@ -37,8 +37,7 @@ public: void Bind(String service, int family); void Bind(String node, String service, int family); -private: - void MakeSocket(int family); + void Connect(const String& node, const String& service); }; } diff --git a/lib/base/tlsclient.cpp b/lib/base/tlsstream.cpp similarity index 50% rename from lib/base/tlsclient.cpp rename to lib/base/tlsstream.cpp index 1f4f2a270..947ed9194 100644 --- a/lib/base/tlsclient.cpp +++ b/lib/base/tlsstream.cpp @@ -21,21 +21,26 @@ using namespace icinga; -int I2_EXPORT TlsClient::m_SSLIndex; -bool I2_EXPORT TlsClient::m_SSLIndexInitialized = false; +int I2_EXPORT TlsStream::m_SSLIndex; +bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false; /** - * Constructor for the TlsClient class. + * Constructor for the TlsStream class. * * @param role The role of the client. * @param sslContext The SSL context for the client. */ -TlsClient::TlsClient(TcpClientRole role, shared_ptr sslContext) - : TcpClient(role), m_SSLContext(sslContext), - m_BlockRead(false), m_BlockWrite(false) -{ } +TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr sslContext) + : m_InnerStream(innerStream), m_Role(role), m_SSLContext(sslContext), + m_SendQueue(boost::make_shared()), m_RecvQueue(boost::make_shared()) +{ + m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this)); + m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this)); + m_SendQueue->Start(); + m_RecvQueue->Start(); +} -void TlsClient::Start(void) +void TlsStream::Start(void) { m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); @@ -48,7 +53,7 @@ void TlsClient::Start(void) throw_exception(logic_error("No X509 client certificate was specified.")); if (!m_SSLIndexInitialized) { - m_SSLIndex = SSL_get_ex_new_index(0, const_cast("TlsClient"), NULL, NULL, NULL); + m_SSLIndex = SSL_get_ex_new_index(0, const_cast("TlsStream"), NULL, NULL, NULL); m_SSLIndexInitialized = true; } @@ -56,21 +61,24 @@ void TlsClient::Start(void) SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); - SSL_set_fd(m_SSL.get(), GetFD()); + m_BIO = BIO_new_I2Stream(m_InnerStream); + SSL_set_bio(m_SSL.get(), m_BIO, m_BIO); - if (GetRole() == RoleInbound) + if (m_Role == TlsRoleServer) SSL_set_accept_state(m_SSL.get()); else SSL_set_connect_state(m_SSL.get()); - int rc = SSL_do_handshake(m_SSL.get()); + /*int rc = SSL_do_handshake(m_SSL.get()); if (rc == 1) { SetConnected(true); OnConnected(GetSelf()); - } + }*/ - Socket::Start(); + Stream::Start(); + + HandleIO(); } /** @@ -78,10 +86,8 @@ void TlsClient::Start(void) * * @returns The X509 certificate. */ -shared_ptr TlsClient::GetClientCertificate(void) const +shared_ptr TlsStream::GetClientCertificate(void) const { - boost::mutex::scoped_lock lock(m_SocketMutex); - return shared_ptr(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter); } @@ -90,180 +96,146 @@ shared_ptr TlsClient::GetClientCertificate(void) const * * @returns The X509 certificate. */ -shared_ptr TlsClient::GetPeerCertificate(void) const +shared_ptr TlsStream::GetPeerCertificate(void) const { - boost::mutex::scoped_lock lock(m_SocketMutex); - return shared_ptr(SSL_get_peer_certificate(m_SSL.get()), X509_free); } +void TlsStream::DataAvailableHandler(void) +{ + try { + HandleIO(); + } catch (...) { + SetException(boost::current_exception()); + + Close(); + } +} + +void TlsStream::ClosedHandler(void) +{ + SetException(m_InnerStream->GetException()); + Close(); +} + /** - * Processes data that is available for this socket. + * Processes data for the stream. */ -void TlsClient::HandleReadable(void) +void TlsStream::HandleIO(void) { - m_BlockRead = false; - m_BlockWrite = false; + char data[16 * 1024]; + int rc; - for (;;) { - char data[1024]; - int rc; + if (!IsConnected()) { + rc = SSL_do_handshake(m_SSL.get()); - if (IsConnected()) { - rc = SSL_read(m_SSL.get(), data, sizeof(data)); + if (rc == 1) { + SetConnected(true); } else { - rc = SSL_do_handshake(m_SSL.get()); - - if (rc == 1) { - SetConnected(true); - Event::Post(boost::bind(boost::cref(OnConnected), GetSelf())); - return; - } - } - - if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_WRITE: - m_BlockRead = true; /* fall through */ case SSL_ERROR_WANT_READ: - goto post_event; + return; case SSL_ERROR_ZERO_RETURN: - CloseInternal(false); - goto post_event; + Close(); + return; default: - throw_exception(OpenSSLException("SSL_read failed", ERR_get_error())); + I2Stream_check_exception(m_BIO); + throw_exception(OpenSSLException("SSL_do_handshake failed", ERR_get_error())); } } + } - if (IsConnected()) { - boost::mutex::scoped_lock lock(m_QueueMutex); + bool new_data = false, read_ok = true; + while (read_ok) { + rc = SSL_read(m_SSL.get(), data, sizeof(data)); + + if (rc > 0) { m_RecvQueue->Write(data, rc); + new_data = true; + } else { + switch (SSL_get_error(m_SSL.get(), rc)) { + case SSL_ERROR_WANT_WRITE: + /* fall through */ + case SSL_ERROR_WANT_READ: + read_ok = false; + break; + case SSL_ERROR_ZERO_RETURN: + Close(); + return; + default: + I2Stream_check_exception(m_BIO); + throw_exception(OpenSSLException("SSL_read failed", ERR_get_error())); + } } } -post_event: - Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); -} + if (new_data) + OnDataAvailable(GetSelf()); -/** - * Processes data that can be written for this socket. - */ -void TlsClient::HandleWritable(void) -{ - m_BlockRead = false; - m_BlockWrite = false; - - char data[1024]; - size_t count; + while (m_SendQueue->GetAvailableBytes() > 0) { + size_t count = m_SendQueue->GetAvailableBytes(); - for (;;) { - int rc; + if (count == 0) + break; - if (IsConnected()) { - { - boost::mutex::scoped_lock lock(m_QueueMutex); + if (count > sizeof(data)) + count = sizeof(data); - count = m_SendQueue->GetAvailableBytes(); + m_SendQueue->Peek(data, count); - if (count == 0) - break; + rc = SSL_write(m_SSL.get(), (const char *)data, count); - if (count > sizeof(data)) - count = sizeof(data); - - m_SendQueue->Peek(data, count); - } - - rc = SSL_write(m_SSL.get(), (const char *)data, count); + if (rc > 0) { + m_SendQueue->Read(NULL, rc); } else { - rc = SSL_do_handshake(m_SSL.get()); - - if (rc == 1) { - SetConnected(true); - Event::Post(boost::bind(boost::cref(OnConnected), GetSelf())); - return; - } - } - - if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { case SSL_ERROR_WANT_READ: - m_BlockWrite = true; /* fall through */ case SSL_ERROR_WANT_WRITE: return; case SSL_ERROR_ZERO_RETURN: - CloseInternal(false); + Close(); return; default: + I2Stream_check_exception(m_BIO); throw_exception(OpenSSLException("SSL_write failed", ERR_get_error())); } } - - if (IsConnected()) { - boost::mutex::scoped_lock lock(m_QueueMutex); - - m_SendQueue->Read(NULL, rc); - } } } /** - * Checks whether data should be read for this socket. - * - * @returns true if data should be read, false otherwise. + * Closes the stream. */ -bool TlsClient::WantsToRead(void) const +void TlsStream::Close(void) { - if (SSL_want_read(m_SSL.get())) - return true; - - if (m_BlockRead) - return false; + if (m_SSL) + SSL_shutdown(m_SSL.get()); - return TcpClient::WantsToRead(); + Stream::Close(); } -/** - * Checks whether data should be written for this socket. - * - * @returns true if data should be written, false otherwise. - */ -bool TlsClient::WantsToWrite(void) const +size_t TlsStream::GetAvailableBytes(void) const { - if (SSL_want_write(m_SSL.get())) - return true; - - if (m_BlockWrite) - return false; - - return TcpClient::WantsToWrite(); + return m_RecvQueue->GetAvailableBytes(); } -/** - * Closes the socket. - * - * @param from_dtor Whether this method was invoked from the destructor. - */ -void TlsClient::CloseInternal(bool from_dtor) +size_t TlsStream::Peek(void *buffer, size_t count) { - if (m_SSL) - SSL_shutdown(m_SSL.get()); - - TcpClient::CloseInternal(from_dtor); + return m_RecvQueue->Peek(buffer, count); } -/** - * Factory function for the TlsClient class. - * - * @param role The role of the TLS socket. - * @param sslContext The SSL context for the socket. - * @returns A new TLS socket. - */ -TcpClient::Ptr icinga::TlsClientFactory(TcpClientRole role, shared_ptr sslContext) +size_t TlsStream::Read(void *buffer, size_t count) { - return boost::make_shared(role, sslContext); + return m_RecvQueue->Read(buffer, count); } +void TlsStream::Write(const void *buffer, size_t count) +{ + m_SendQueue->Write(buffer, count); + + HandleIO(); +} diff --git a/lib/base/tlsclient.h b/lib/base/tlsstream.h similarity index 69% rename from lib/base/tlsclient.h rename to lib/base/tlsstream.h index 61d8d3496..f0d278355 100644 --- a/lib/base/tlsclient.h +++ b/lib/base/tlsstream.h @@ -17,53 +17,66 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef TLSCLIENT_H -#define TLSCLIENT_H +#ifndef TLSSTREAM_H +#define TLSSTREAM_H namespace icinga { +typedef enum +{ + TlsRoleClient, + TlsRoleServer +} TlsRole; + /** - * A TLS client connection. + * A TLS stream. * * @ingroup base */ -class I2_BASE_API TlsClient : public TcpClient +class I2_BASE_API TlsStream : public Stream { public: - TlsClient(TcpClientRole role, shared_ptr sslContext); + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - virtual void Start(void); + TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr sslContext); shared_ptr GetClientCertificate(void) const; shared_ptr GetPeerCertificate(void) const; -protected: - void HandleSSLError(void); + void Start(void); + virtual void Close(void); - virtual bool WantsToRead(void) const; - virtual bool WantsToWrite(void) const; + virtual size_t GetAvailableBytes(void) const; + virtual size_t Peek(void *buffer, size_t count); + virtual size_t Read(void *buffer, size_t count); + virtual void Write(const void *buffer, size_t count); + +protected: + void DataAvailableHandler(void); + void ClosedHandler(void); - virtual void HandleReadable(void); - virtual void HandleWritable(void); + void HandleIO(void); private: shared_ptr m_SSLContext; shared_ptr m_SSL; + BIO *m_BIO; - bool m_BlockRead; - bool m_BlockWrite; + FIFO::Ptr m_SendQueue; + FIFO::Ptr m_RecvQueue; + + Stream::Ptr m_InnerStream; + TlsRole m_Role; static int m_SSLIndex; static bool m_SSLIndexInitialized; - virtual void CloseInternal(bool from_dtor); - static void NullCertificateDeleter(X509 *certificate); -}; -TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr sslContext); +}; } -#endif /* TLSCLIENT_H */ +#endif /* TLSSTREAM_H */ diff --git a/lib/remoting/Makefile.am b/lib/remoting/Makefile.am index 4f5417bb9..eee7801c7 100644 --- a/lib/remoting/Makefile.am +++ b/lib/remoting/Makefile.am @@ -10,10 +10,8 @@ libremoting_la_SOURCES = \ endpointmanager.cpp \ endpointmanager.h \ i2-remoting.h \ - jsonrpcclient.cpp \ - jsonrpcclient.h \ - jsonrpcserver.cpp \ - jsonrpcserver.h \ + jsonrpcconnection.cpp \ + jsonrpcconnection.h \ messagepart.cpp \ messagepart.h \ requestmessage.cpp \ diff --git a/lib/remoting/endpoint.cpp b/lib/remoting/endpoint.cpp index c2dcb775d..1746189e7 100644 --- a/lib/remoting/endpoint.cpp +++ b/lib/remoting/endpoint.cpp @@ -111,37 +111,18 @@ bool Endpoint::IsConnected(void) const if (IsLocalEndpoint()) { return true; } else { - JsonRpcClient::Ptr client = GetClient(); + JsonRpcConnection::Ptr client = GetClient(); - return (client && client->IsConnected()); + return (client && client->GetStream()->IsConnected()); } } -/** - * Retrieves the address for the endpoint. - * - * @returns The endpoint's address. - */ -String Endpoint::GetAddress(void) const -{ - if (IsLocalEndpoint()) { - return "local:" + GetName(); - } else { - JsonRpcClient::Ptr client = GetClient(); - - if (!client || !client->IsConnected()) - return ""; - - return client->GetPeerAddress(); - } -} - -JsonRpcClient::Ptr Endpoint::GetClient(void) const +JsonRpcConnection::Ptr Endpoint::GetClient(void) const { return Get("client"); } -void Endpoint::SetClient(const JsonRpcClient::Ptr& client) +void Endpoint::SetClient(const JsonRpcConnection::Ptr& client) { Set("client", client); client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2)); @@ -337,14 +318,14 @@ void Endpoint::NewMessageHandler(const MessagePart& message) void Endpoint::ClientClosedHandler(void) { - try { + /*try { GetClient()->CheckException(); } catch (const exception& ex) { stringstream message; message << "Error occured for JSON-RPC socket: Message=" << ex.what(); Logger::Write(LogWarning, "jsonrpc", message.str()); - } + }*/ Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName()); diff --git a/lib/remoting/endpoint.h b/lib/remoting/endpoint.h index bfa6a9236..744469723 100644 --- a/lib/remoting/endpoint.h +++ b/lib/remoting/endpoint.h @@ -43,10 +43,8 @@ public: static bool Exists(const String& name); static Endpoint::Ptr GetByName(const String& name); - String GetAddress(void) const; - - JsonRpcClient::Ptr GetClient(void) const; - void SetClient(const JsonRpcClient::Ptr& client); + JsonRpcConnection::Ptr GetClient(void) const; + void SetClient(const JsonRpcConnection::Ptr& client); void RegisterSubscription(const String& topic); void UnregisterSubscription(const String& topic); diff --git a/lib/remoting/endpointmanager.cpp b/lib/remoting/endpointmanager.cpp index a0cb240e8..c25801add 100644 --- a/lib/remoting/endpointmanager.cpp +++ b/lib/remoting/endpointmanager.cpp @@ -110,11 +110,11 @@ void EndpointManager::AddListener(const String& service) s << "Adding new listener: port " << service; Logger::Write(LogInformation, "icinga", s.str()); - JsonRpcServer::Ptr server = boost::make_shared(sslContext); + TcpSocket::Ptr server = boost::make_shared(); m_Servers.insert(server); server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, - this, _2)); + this, _2, TlsRoleServer)); server->Bind(service, AF_INET6); server->Listen(); @@ -133,9 +133,9 @@ void EndpointManager::AddConnection(const String& node, const String& service) { if (!sslContext) throw_exception(logic_error("SSL context is required for AddConnection()")); - JsonRpcClient::Ptr client = boost::make_shared(RoleOutbound, sslContext); + TcpSocket::Ptr client = boost::make_shared(); client->Connect(node, service); - NewClientHandler(client); + NewClientHandler(client, TlsRoleClient); } /** @@ -143,27 +143,30 @@ void EndpointManager::AddConnection(const String& node, const String& service) { * * @param client The new client. */ -void EndpointManager::NewClientHandler(const TcpClient::Ptr& client) +void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) { - JsonRpcClient::Ptr jclient = static_pointer_cast(client); + String peerAddress = client->GetPeerAddress(); + TlsStream::Ptr tlsStream = boost::make_shared(client, role, GetSSLContext()); + tlsStream->Start(); - m_PendingClients.insert(jclient); - jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1)); - jclient->Start(); + m_PendingClients.insert(tlsStream); + tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1, peerAddress)); + + client->Start(); } -void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client) +void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress) { - JsonRpcClient::Ptr jclient = static_pointer_cast(client); - - Logger::Write(LogInformation, "icinga", "New client connection for " + jclient->GetPeerAddress()); + TlsStream::Ptr tlsStream = static_pointer_cast(client); + JsonRpcConnection::Ptr jclient = boost::make_shared(tlsStream); - m_PendingClients.erase(jclient); - - shared_ptr cert = jclient->GetPeerCertificate(); + m_PendingClients.erase(tlsStream); + shared_ptr cert = tlsStream->GetPeerCertificate(); String identity = Utility::GetCertificateCN(cert); + Logger::Write(LogInformation, "icinga", "New client connection at " + peerAddress + " for identity '" + identity + "'"); + Endpoint::Ptr endpoint; if (Endpoint::Exists(identity)) diff --git a/lib/remoting/endpointmanager.h b/lib/remoting/endpointmanager.h index 72c0ba910..9ae73ce43 100644 --- a/lib/remoting/endpointmanager.h +++ b/lib/remoting/endpointmanager.h @@ -70,8 +70,8 @@ private: Timer::Ptr m_ReconnectTimer; - set m_Servers; - set m_PendingClients; + set m_Servers; + set m_PendingClients; /** * Information about a pending API request. @@ -101,8 +101,8 @@ private: void ReconnectTimerHandler(void); - void NewClientHandler(const TcpClient::Ptr& client); - void ClientConnectedHandler(const TcpClient::Ptr& client); + void NewClientHandler(const Socket::Ptr& client, TlsRole rol); + void ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress); }; } diff --git a/lib/remoting/i2-remoting.h b/lib/remoting/i2-remoting.h index 132a2314b..cb18bf81e 100644 --- a/lib/remoting/i2-remoting.h +++ b/lib/remoting/i2-remoting.h @@ -39,8 +39,7 @@ #include "messagepart.h" #include "requestmessage.h" #include "responsemessage.h" -#include "jsonrpcclient.h" -#include "jsonrpcserver.h" +#include "jsonrpcconnection.h" #include "endpoint.h" #include "endpointmanager.h" diff --git a/lib/remoting/jsonrpcclient.cpp b/lib/remoting/jsonrpcconnection.cpp similarity index 68% rename from lib/remoting/jsonrpcclient.cpp rename to lib/remoting/jsonrpcconnection.cpp index 4fd349746..22ec9744f 100644 --- a/lib/remoting/jsonrpcclient.cpp +++ b/lib/remoting/jsonrpcconnection.cpp @@ -22,39 +22,35 @@ using namespace icinga; /** - * Constructor for the JsonRpcClient class. + * Constructor for the JsonRpcConnection class. * - * @param role The role of the underlying TCP client. - * @param sslContext SSL context for the TLS connection. + * @param stream The stream. */ -JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr sslContext) - : TlsClient(role, sslContext) -{ - OnDataAvailable.connect(boost::bind(&JsonRpcClient::DataAvailableHandler, - this)); -} +JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream) + : Connection(stream) +{ } /** * Sends a message to the connected peer. * * @param message The message. */ -void JsonRpcClient::SendMessage(const MessagePart& message) +void JsonRpcConnection::SendMessage(const MessagePart& message) { Value value = message.GetDictionary(); String json = value.Serialize(); //std::cerr << ">> " << json << std::endl; - NetString::WriteStringToIOQueue(this, json); + NetString::WriteStringToStream(GetStream(), json); } /** * Processes inbound data. */ -void JsonRpcClient::DataAvailableHandler(void) +void JsonRpcConnection::ProcessData(void) { String jsonString; - while (NetString::ReadStringFromIOQueue(this, &jsonString)) { + while (NetString::ReadStringFromStream(GetStream(), &jsonString)) { //std::cerr << "<< " << jsonString << std::endl; try { @@ -73,20 +69,3 @@ void JsonRpcClient::DataAvailableHandler(void) } } } - -/** - * Factory function for JSON-RPC clients. - * - * @param fd The file descriptor. - * @param role The role of the underlying TCP client. - * @param sslContext SSL context for the TLS connection. - * @returns A new JSON-RPC client. - */ -JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role, - shared_ptr sslContext) -{ - JsonRpcClient::Ptr client = boost::make_shared(role, - sslContext); - client->SetFD(fd); - return client; -} diff --git a/lib/remoting/jsonrpcclient.h b/lib/remoting/jsonrpcconnection.h similarity index 70% rename from lib/remoting/jsonrpcclient.h rename to lib/remoting/jsonrpcconnection.h index c1e719ab4..84589e3bc 100644 --- a/lib/remoting/jsonrpcclient.h +++ b/lib/remoting/jsonrpcconnection.h @@ -17,37 +17,33 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#ifndef JSONRPCCLIENT_H -#define JSONRPCCLIENT_H +#ifndef JSONRPCCONNECTION_H +#define JSONRPCCONNECTION_H namespace icinga { /** - * A JSON-RPC client. + * A JSON-RPC connection. * * @ingroup remoting */ -class I2_REMOTING_API JsonRpcClient : public TlsClient +class I2_REMOTING_API JsonRpcConnection : public Connection { public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - JsonRpcClient(TcpClientRole role, shared_ptr sslContext); + JsonRpcConnection(const Stream::Ptr& stream); void SendMessage(const MessagePart& message); - boost::signal OnNewMessage; + boost::signal OnNewMessage; -private: - void DataAvailableHandler(void); - - friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr sslContext); +protected: + virtual void ProcessData(void); }; -JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr sslContext); - } -#endif /* JSONRPCCLIENT_H */ +#endif /* JSONRPCCONNECTION_H */ diff --git a/lib/remoting/remoting.vcxproj b/lib/remoting/remoting.vcxproj index 10c03c96a..2d79775ce 100644 --- a/lib/remoting/remoting.vcxproj +++ b/lib/remoting/remoting.vcxproj @@ -22,10 +22,9 @@ - + - @@ -37,10 +36,9 @@ Create Create - + - diff --git a/lib/remoting/remoting.vcxproj.filters b/lib/remoting/remoting.vcxproj.filters index 1e5766784..268ba65c3 100644 --- a/lib/remoting/remoting.vcxproj.filters +++ b/lib/remoting/remoting.vcxproj.filters @@ -1,15 +1,9 @@  - - Quelldateien - Quelldateien - - Quelldateien - Quelldateien @@ -25,14 +19,11 @@ Quelldateien + + Quelldateien + - - Headerdateien - - - Headerdateien - Headerdateien @@ -51,6 +42,9 @@ Headerdateien + + Headerdateien + -- 2.40.0