]> granicus.if.org Git - icinga2/commitdiff
Refactored the socket classes.
authorGunnar Beutner <gunnar@beutner.name>
Thu, 22 Nov 2012 11:04:32 +0000 (12:04 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Thu, 22 Nov 2012 11:04:32 +0000 (12:04 +0100)
45 files changed:
components/compatido/Makefile.am
components/compatido/compatido.vcxproj
components/compatido/compatido.vcxproj.filters
components/compatido/compatidocomponent.cpp
components/compatido/compatidocomponent.h
components/compatido/i2-compatido.h
components/compatido/idoconnection.cpp [new file with mode: 0644]
components/compatido/idoconnection.h [moved from lib/remoting/jsonrpcserver.cpp with 78% similarity]
components/compatido/idosocket.cpp [deleted file]
components/compatido/idosocket.h [deleted file]
components/demo/democomponent.cpp
lib/base/Makefile.am
lib/base/base.vcxproj
lib/base/base.vcxproj.filters
lib/base/connection.cpp [new file with mode: 0644]
lib/base/connection.h [moved from lib/base/tcpserver.h with 68% similarity]
lib/base/dynamicobject.cpp
lib/base/fifo.cpp
lib/base/fifo.h
lib/base/i2-base.h
lib/base/netstring.cpp
lib/base/netstring.h
lib/base/socket.cpp
lib/base/socket.h
lib/base/stream.cpp [new file with mode: 0644]
lib/base/stream.h [moved from lib/base/ioqueue.h with 66% similarity]
lib/base/stream_bio.cpp [new file with mode: 0644]
lib/base/stream_bio.h [moved from lib/remoting/jsonrpcserver.h with 81% similarity]
lib/base/tcpclient.cpp [deleted file]
lib/base/tcpclient.h [deleted file]
lib/base/tcpserver.cpp [deleted file]
lib/base/tcpsocket.cpp
lib/base/tcpsocket.h
lib/base/tlsstream.cpp [moved from lib/base/tlsclient.cpp with 50% similarity]
lib/base/tlsstream.h [moved from lib/base/tlsclient.h with 69% similarity]
lib/remoting/Makefile.am
lib/remoting/endpoint.cpp
lib/remoting/endpoint.h
lib/remoting/endpointmanager.cpp
lib/remoting/endpointmanager.h
lib/remoting/i2-remoting.h
lib/remoting/jsonrpcconnection.cpp [moved from lib/remoting/jsonrpcclient.cpp with 68% similarity]
lib/remoting/jsonrpcconnection.h [moved from lib/remoting/jsonrpcclient.h with 70% similarity]
lib/remoting/remoting.vcxproj
lib/remoting/remoting.vcxproj.filters

index becebf3d0b8687a769680fb35c829b5c07a30261..c81e0cb6f73b92d3894ec6c6943a0b76b37a65f7 100644 (file)
@@ -4,11 +4,11 @@ pkglib_LTLIBRARIES = \
        compatido.la
 
 compatido_la_SOURCES = \
-       i2-compatido.h \
-       idosocket.cpp \
-       idosocket.h \
        compatidocomponent.cpp \
-       compatidocomponent.h
+       compatidocomponent.h \
+       i2-compatido.h \
+       idoconnection.cpp \
+       idoconnection.h
 
 compatido_la_CPPFLAGS = \
         $(BOOST_CPPFLAGS) \
index 5e00eb389d14677817566346615597caa5eb17d3..f2d2e3f3d2fd8d0b549f09c1f0efdf65c68c0042 100644 (file)
   </ItemDefinitionGroup>
   <ItemGroup>
     <ClCompile Include="compatidocomponent.cpp" />
-    <ClCompile Include="idosocket.cpp" />
+    <ClCompile Include="idoconnection.cpp" />
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="compatidocomponent.h" />
     <ClInclude Include="i2-compatido.h" />
-    <ClInclude Include="idosocket.h" />
+    <ClInclude Include="idoconnection.h" />
   </ItemGroup>
   <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
   <ImportGroup Label="ExtensionTargets">
index 838aa7ef793c77bfa7f728400f4902ab76731535..7f00fa2504d0aeb337bb5811f4ae500b1ec821d2 100644 (file)
@@ -14,7 +14,7 @@
     <ClCompile Include="compatidocomponent.cpp">
       <Filter>Source Files</Filter>
     </ClCompile>
-    <ClCompile Include="idosocket.cpp">
+    <ClCompile Include="idoconnection.cpp">
       <Filter>Source Files</Filter>
     </ClCompile>
   </ItemGroup>
@@ -25,8 +25,8 @@
     <ClInclude Include="i2-compatido.h">
       <Filter>Header Files</Filter>
     </ClInclude>
-    <ClInclude Include="idosocket.h">
+    <ClInclude Include="idoconnection.h">
       <Filter>Header Files</Filter>
     </ClInclude>
   </ItemGroup>
-</Project>
+</Project>
\ No newline at end of file
index f61b0cbe13f7e15a21ad3e581c7e885644d184c4..4c2152f81b155b66a14169f322473f84417eb5b8 100644 (file)
@@ -38,13 +38,13 @@ const int CompatIdoComponent::DefaultReconnectInterval = 15;
 String CompatIdoComponent::GetSocketAddress(void) const
 {
        Value address = GetConfig()->Get("socket_address");
-       if(address.IsEmpty())
+
+       if (address.IsEmpty())
                return DefaultSocketAddress;
        else
                return address;
 }
 
-
 /**
  * Reads the socket port from the config
  * @returns port The config option, or static default
@@ -52,7 +52,8 @@ String CompatIdoComponent::GetSocketAddress(void) const
 String CompatIdoComponent::GetSocketPort(void) const
 {
        Value port = GetConfig()->Get("socket_port");
-       if(port.IsEmpty())
+
+       if (port.IsEmpty())
                return DefaultSocketPort;
        else
                return port;
@@ -65,7 +66,8 @@ String CompatIdoComponent::GetSocketPort(void) const
 String CompatIdoComponent::GetInstanceName(void) const
 {
        Value instance = GetConfig()->Get("instance_name");
-       if(instance.IsEmpty())
+
+       if (instance.IsEmpty())
                return DefaultInstanceName;
        else
                return instance;
@@ -75,34 +77,16 @@ String CompatIdoComponent::GetInstanceName(void) const
  * Reads the reconnect interval from the config
  * @returns reconnect_interval The config option, or static default
  */
-int CompatIdoComponent::GetReconnectInterval(void) const
+double CompatIdoComponent::GetReconnectInterval(void) const
 {
-        Value interval = GetConfig()->Get("reconnect_interval");
-        if(interval.IsEmpty())
-                return DefaultReconnectInterval;
-        else   
-                return interval;
-}
+       Value interval = GetConfig()->Get("reconnect_interval");
 
-/**
- * Sets config dump in progress state
- */
-void CompatIdoComponent::SetConfigDumpInProgress(bool state)
-{
-       m_ConfigDumpInProgress = state;
+       if (interval.IsEmpty())
+               return DefaultReconnectInterval;
+       else   
+               return interval;
 }
 
-/**
- * Get state of config in progress
- *
- * @returns state bis config dump in progress.
- */
-bool CompatIdoComponent::GetConfigDumpInProgress(void)
-{
-       return m_ConfigDumpInProgress;
-}
-
-
 /**
  * Starts the component.
  */
@@ -111,7 +95,7 @@ void CompatIdoComponent::Start(void)
        const int StatusTimerInterval = 60;
        const int ConfigTimerInterval = 3600;
        const int ProgramStatusTimerInterval = 15;
-       const int ReconnectTimerInterval = GetReconnectInterval();
+       const double ReconnectTimerInterval = GetReconnectInterval();
 
        /* FIXME - make this a config option when unix sockets are realdy */
 
@@ -125,12 +109,7 @@ void CompatIdoComponent::Start(void)
        /*
         * open ido socket once
         */
-       OpenIdoSocket(IdoSocketType);
-
-       /* 
-        * tell ido2db that we just started
-        */
-       SendStartProcess();
+       OpenIdoSocket();
 
        /*
         * ddump the config later (can't do that within start of the component)
@@ -171,7 +150,6 @@ void CompatIdoComponent::Start(void)
        m_ReconnectTimer->Start();
 }
 
-
 /**
  * Stops the component.
  */
@@ -180,24 +158,58 @@ void CompatIdoComponent::Stop(void)
        CloseIdoSocket();
 }
 
-
 /**
  * Opens the ido socket, and sends hello to ido2db
  */
-void CompatIdoComponent::OpenIdoSocket(bool sockettype)
+void CompatIdoComponent::OpenIdoSocket(void)
 {
-       OpenSink(GetSocketAddress(), GetSocketPort());
-       SendHello(GetInstanceName(), sockettype);
+       TcpSocket::Ptr socket = boost::make_shared<TcpSocket>();
+       socket->Connect(GetSocketAddress(), GetSocketPort());
+       socket->Start();
 
-       m_IdoSocket->SetSocketType(sockettype);
-       /* 
-        * if we're connected, do not reconnecte
-        */
-       if(m_IdoSocket->IsConnected()) {
-               m_IdoSocket->SetReconnect(false);
-       } else {
-               m_IdoSocket->SetReconnect(true);
-       }
+       m_IdoConnection = boost::make_shared<IdoConnection>(socket);
+       m_IdoConnection->OnClosed.connect(boost::bind(&CompatIdoComponent::SocketDisconnectHandler, this));
+
+       /* FIXME */
+#define COMPATIDO_PROTOCOL 2
+#define COMPATIDO_NAME "ICINGA2 COMPATIDO"
+#define COMPATIDO_RELEASE_VERSION "2.0"
+       
+       /* connection is always TCP */
+       /* connecttype is always initial */
+       stringstream msgHello;
+       msgHello << "\n\n"
+               << "HELLO" << "\n"
+               << "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n"
+               << "AGENT" << ": " << COMPATIDO_NAME << "\n"
+               << "AGENTVERSION" << ": " << VERSION << "\n"
+               << "STARTTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"
+               << "DISPOSITION" << ": " << "REALTIME" << "\n"
+               << "CONNECTION" << ": " << "TCPSOCKET" << "\n"
+               << "INSTANCENAME" << ": " << GetInstanceName() << "\n"
+               << "STARTDATADUMP"
+               << "\n\n";
+
+       m_IdoConnection->SendMessage(msgHello.str());
+
+/* TODO */
+#define PROGRAM_MODIFICATION_DATE "10-17-2012"
+#define PROGRAM_RELEASE_VERSION "2.0"
+
+       stringstream msgProcessData;
+       msgProcessData << "\n"
+               << 200 << "\n"                                          /* processdata */
+               << 1 << "=" << 104 << "\n"                              /* type = pprocess prelaunch */
+               << 2 << "=" << "" << "\n"                               /* flags */
+               << 3 << "=" << "" << "\n"                               /* attributes */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 105 << "=" << "Icinga2" << "\n"                      /* progranname */
+               << 107 << "=" << PROGRAM_RELEASE_VERSION << "\n"                        /*  programversion */
+               << 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n"                      /* programdata */
+               << 102 << "=" << Utility::GetPid() << "\n"                      /* process id */
+               << 999 << "\n\n";                                       /* enddata */
+
+       m_IdoConnection->SendMessage(msgProcessData.str());
 }
 
 /*
@@ -205,10 +217,20 @@ void CompatIdoComponent::OpenIdoSocket(bool sockettype)
  */
 void CompatIdoComponent::CloseIdoSocket(void)
 {
-       GoodByeSink();
-       CloseSink();
+       stringstream message;
+       message << "\n"
+               << 1000 << "\n"                                 /* enddatadump */
+               << "ENDTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"            /* endtime */
+               << "GOODBYE"                                    /* goodbye */
+               << "\n\n";
+
+       m_IdoConnection->SendMessage(message.str());
 }
 
+void CompatIdoComponent::SocketDisconnectHandler(void)
+{
+       m_IdoConnection.reset();
+}
 
 /* TODO
  * subscribe to all status updates and checkresults and dump them
@@ -225,7 +247,7 @@ void CompatIdoComponent::StatusTimerHandler(void)
        Logger::Write(LogInformation, "compatido", "Writing compat ido status information");
 
        DumpStatusData();
-        DumpProgramStatusData();
+       DumpProgramStatusData();
 }
 
 /**
@@ -235,10 +257,7 @@ void CompatIdoComponent::ConfigTimerHandler(void)
 {
        Logger::Write(LogInformation, "compatido", "Writing compat ido config information");
 
-       /* protect the dump of status update messages */
-       SetConfigDumpInProgress(true);
        DumpConfigObjects();
-       SetConfigDumpInProgress(false);
 }
 
 /**
@@ -246,13 +265,9 @@ void CompatIdoComponent::ConfigTimerHandler(void)
  */
 void CompatIdoComponent::ProgramStatusTimerHandler(void)
 {
-       /* do not dump any data if config dump is still in progress */
-       if(GetConfigDumpInProgress())
-               return;
-
-        Logger::Write(LogInformation, "compatido", "Writing compat ido program status information");
+       Logger::Write(LogInformation, "compatido", "Writing compat ido program status information");
 
-        DumpProgramStatusData();
+       DumpProgramStatusData();
 }
 
 /**
@@ -260,153 +275,11 @@ void CompatIdoComponent::ProgramStatusTimerHandler(void)
  */
 void CompatIdoComponent::ReconnectTimerHandler(void)
 {
-        Logger::Write(LogDebug, "compatido", "Checking if ido socket requires reconnect");
-
-       if(m_IdoSocket->GetReconnect()) {
-
-               /* check if we aren't already connected */
-               if(m_IdoSocket->IsConnected()) {
-                       Logger::Write(LogDebug, "compatido", "Already connected to ido socket ... no reconnect necessary");
-                       return;
-               }
+       Logger::Write(LogDebug, "compatido", "Checking if ido socket requires reconnect");
 
+       if (!m_IdoConnection)
                /* socket was disconnected, recconnect */
-               OpenIdoSocket(m_IdoSocket->GetSocketType());
-
-               if(m_IdoSocket->IsConnected()) {
-                       Logger::Write(LogInformation, "compatido", "Successfully reconnected to ido socket");
-               } else {
-                       stringstream message;
-                       message << "Unable to reconnect to ido socket. Trying again in " << GetReconnectInterval() << " sec";
-                       Logger::Write(LogWarning, "compatido", message.str());
-               }
-       }
-}
-
-
-/**
- * opens a tcp connection to the ido socket
- */
-void CompatIdoComponent::OpenSink(String node, String service)
-{
-       m_IdoSocket = boost::make_shared<IdoSocket>(RoleOutbound);
-       m_IdoSocket->Connect(node, service);
-       m_IdoSocket->Start();
-}
-
-/**
- * sends hello msg to ido2b
- */
-void CompatIdoComponent::SendHello(String instancename, bool sockettype)
-{
-       /* FIXME */
-#define COMPATIDO_PROTOCOL 2
-#define COMPATIDO_NAME "ICINGA2 COMPATIDO"
-#define COMPATIDO_RELEASE_VERSION "2.0"
-
-       String connection;
-       if(sockettype)
-               connection = "TCPSOCKET";
-       else
-               connection = "UNIXSOCKET";
-       
-       /* connection is always TCP */
-       /* connecttype is always initial */
-       stringstream message;
-       message << "\n\n"
-               << "HELLO" << "\n"
-               << "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n"
-               << "AGENT" << ": " << COMPATIDO_NAME << "\n"
-               << "AGENTVERSION" << ": " << VERSION << "\n"
-               << "STARTTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"
-               << "DISPOSITION" << ": " << "REALTIME" << "\n"
-               << "CONNECTION" << ": " << connection << "\n"
-               << "INSTANCENAME" << ": " << instancename << "\n"
-               << "STARTDATADUMP"
-               << "\n\n";
-
-       m_IdoSocket->SendMessage(message.str());
-}
-
-/**
- * sends goodbye msg to ido
- */
-void CompatIdoComponent::GoodByeSink(void)
-{
-        stringstream message;
-        message << "\n"
-                << 1000 << "\n"                                /* enddatadump */
-                << "ENDTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"           /* endtime */
-               << "GOODBYE"                                    /* goodbye */
-                << "\n\n";
-
-        m_IdoSocket->SendMessage(message.str());
-}
-
-/**
- * closes ido socket
- */
-void CompatIdoComponent::CloseSink(void)
-{
-       m_IdoSocket->Close();
-}
-
-/**
- * tell ido2db that we are starting up (must be called before config dump)
- */
-void CompatIdoComponent::SendStartProcess(void)
-{
-/* TODO */
-#define PROGRAM_MODIFICATION_DATE "10-17-2012"
-#define PROGRAM_RELEASE_VERSION "2.0"
-
-        stringstream message;
-        message << "\n"
-                << 200 << "\n"                                                 /* processdata */
-               << 1 << "=" << 104 << "\n"                              /* type = pprocess prelaunch */
-               << 2 << "=" << "" << "\n"                               /* flags */
-               << 3 << "=" << "" << "\n"                               /* attributes */
-               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
-               << 105 << "=" << "Icinga2" << "\n"                      /* progranname */
-               << 107 << "=" << PROGRAM_RELEASE_VERSION << "\n"                        /*  programversion */
-               << 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n"                      /* programdata */
-               << 102 << "=" << Utility::GetPid() << "\n"                      /* process id */
-               << 999 << "\n\n";                                       /* enddata */
-
-        m_IdoSocket->SendMessage(message.str());
-
-}
-
-/**
- * sends config dump start signal to ido
- */
-void CompatIdoComponent::StartConfigDump(void)
-{
-       /* configtype =1 (original), =2 (retained == default) */
-       stringstream message;
-       message << "\n\n"
-               << 900 << ":" << "\n"                                   /* startconfigdump */
-               << 245 << "=" << "RETAINED" << "\n"                     /* configdumptype */
-               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
-               << 999                                                  /* enddata */
-               << "\n\n";
-
-       m_IdoSocket->SendMessage(message.str());
-}
-
-/**
- * sends config dump end signal to ido
- */
-void CompatIdoComponent::EndConfigDump(void)
-{
-        stringstream message;
-        message << "\n\n"
-                << 901 << ":" << "\n"                                  /* endconfigdump */
-                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"             /* timestamp */
-                << 999                                                 /* enddata */
-                << "\n\n";
-
-        m_IdoSocket->SendMessage(message.str());
+               OpenIdoSocket();
 }
 
 /**
@@ -419,9 +292,9 @@ void CompatIdoComponent::EnableHostObject(const Host::Ptr& host)
                << 500 << ":" << "\n"                                   /* enableobject */
                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
                << 53 << "=" << host->GetName() << "\n"                 /* host */
-                << 999 << "\n\n";                                      /* enddata */
+               << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -429,15 +302,15 @@ void CompatIdoComponent::EnableHostObject(const Host::Ptr& host)
  */
 void CompatIdoComponent::EnableServiceObject(const Service::Ptr& service)
 {
-        stringstream message;
-        message << "\n"
-                << 500 << ":" << "\n"                                   /* enableobject */
-                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
-                << 53 << "=" << service->GetHost()->GetName() << "\n"   /* host */
-                << 114 << "=" << service->GetAlias() << "\n"            /* service */
-                << 999 << "\n\n";                                       /* enddata */
-
-        m_IdoSocket->SendMessage(message.str());
+       stringstream message;
+       message << "\n"
+               << 500 << ":" << "\n"                                   /* enableobject */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 53 << "=" << service->GetHost()->GetName() << "\n"   /* host */
+               << 114 << "=" << service->GetAlias() << "\n"            /* service */
+               << 999 << "\n\n";                                       /* enddata */
+
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -445,14 +318,14 @@ void CompatIdoComponent::EnableServiceObject(const Service::Ptr& service)
  */
 void CompatIdoComponent::DisableHostObject(const Host::Ptr& host)
 {
-        stringstream message;
-        message << "\n"
-                << 501 << ":" << "\n"                                   /* disableobject */
-                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
-                << 53 << "=" << host->GetName() << "\n"                 /* host */
-                << 999 << "\n\n";                                       /* enddata */
-
-        m_IdoSocket->SendMessage(message.str());
+       stringstream message;
+       message << "\n"
+               << 501 << ":" << "\n"                                   /* disableobject */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 53 << "=" << host->GetName() << "\n"                 /* host */
+               << 999 << "\n\n";                                       /* enddata */
+
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -460,20 +333,17 @@ void CompatIdoComponent::DisableHostObject(const Host::Ptr& host)
  */
 void CompatIdoComponent::DisableServiceObject(const Service::Ptr& service)
 {
-        stringstream message;
-        message << "\n"
-                << 501 << ":" << "\n"                                   /* disableobject */
-                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
-                << 53 << "=" << service->GetHost()->GetName() << "\n"   /* host */
-                << 114 << "=" << service->GetAlias() << "\n"            /* service */
-                << 999 << "\n\n";                                       /* enddata */
+       stringstream message;
+       message << "\n"
+               << 501 << ":" << "\n"                                   /* disableobject */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 53 << "=" << service->GetHost()->GetName() << "\n"   /* host */
+               << 114 << "=" << service->GetAlias() << "\n"            /* service */
+               << 999 << "\n\n";                                       /* enddata */
  
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
-
-
-
 /**
  * dump host config to ido
  *
@@ -483,7 +353,7 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host)
 {
        stringstream log;
        log << "Dumping Host Config: " << host->GetName();
-        Logger::Write(LogDebug, "compatido", log.str());
+       Logger::Write(LogDebug, "compatido", log.str());
 
        stringstream message;
        message << "\n"
@@ -550,10 +420,10 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host)
                << 200 << "=" << "i2_parent" << "\n"                    /* parenthost */
                << 130 << "=" << "i2_contactgroup" << "\n"              /* contactgroup */
                << 264 << "=" << "i2_contact" << "\n"                   /* contact */
-                << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n"      /* customvariable */
-                << 999 << "\n\n";                                      /* enddata */
+               << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n"       /* customvariable */
+               << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -563,21 +433,20 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host)
  */
 void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host)
 {
-
        stringstream log;
        log << "Dumping Host Status: " << host->GetName();
-        Logger::Write(LogDebug, "compatido", log.str());
-
-        int state;
-        if (!host->IsReachable())
-                state = 2; /* unreachable */
-        else if (!host->IsUp())
-                state = 1; /* down */
-        else   
-                state = 0; /* up */
-
-        stringstream message;
-        message << "\n"
+       Logger::Write(LogDebug, "compatido", log.str());
+
+    int state;
+    if (!host->IsReachable())
+               state = 2; /* unreachable */
+    else if (!host->IsUp())
+               state = 1; /* down */
+    else   
+               state = 0; /* up */
+
+    stringstream message;
+    message << "\n"
                << 212 << ":" << "\n"                                   /* hoststatusdata */
                << 1 << "=" << "" << "\n"                               /* type */
                << 2 << "=" << "" << "\n"                               /* flags */
@@ -631,7 +500,7 @@ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host)
                << 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n"       /* customvariable */
                << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+    m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -643,7 +512,7 @@ void CompatIdoComponent::DumpServiceObject(const Service::Ptr& service)
 {
        stringstream log;
        log << "Dumping Service Config: " << service->GetHost()->GetName() << "->" << service->GetAlias();
-        Logger::Write(LogDebug, "compatido", log.str());
+       Logger::Write(LogDebug, "compatido", log.str());
 
        stringstream message;
        message << "\n"
@@ -705,7 +574,7 @@ void CompatIdoComponent::DumpServiceObject(const Service::Ptr& service)
                << 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n"       /* customvariable */
                << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -717,35 +586,35 @@ void CompatIdoComponent::DumpServiceStatus(const Service::Ptr& service)
 {
        stringstream log;
        log << "Dumping Service Status: " << service->GetHost()->GetName() << "->" << service->GetAlias();
-        Logger::Write(LogDebug, "compatido", log.str());
-
-        String output;
-        String perfdata;
-        double schedule_start = -1, schedule_end = -1;
-        double execution_start = -1, execution_end = -1;
-
-        Dictionary::Ptr cr = service->GetLastCheckResult();
-
-        if (cr) {
-                output = cr->Get("output");
-                schedule_start = cr->Get("schedule_start");
-                schedule_end = cr->Get("schedule_end");
-                execution_start = cr->Get("execution_start");
-                execution_end = cr->Get("execution_end");
-                perfdata = cr->Get("performance_data_raw");
-        }
+       Logger::Write(LogDebug, "compatido", log.str());
+
+       String output;
+       String perfdata;
+       double schedule_start = -1, schedule_end = -1;
+       double execution_start = -1, execution_end = -1;
+
+       Dictionary::Ptr cr = service->GetLastCheckResult();
+
+       if (cr) {
+                       output = cr->Get("output");
+                       schedule_start = cr->Get("schedule_start");
+                       schedule_end = cr->Get("schedule_end");
+                       execution_start = cr->Get("execution_start");
+                       execution_end = cr->Get("execution_end");
+                       perfdata = cr->Get("performance_data_raw");
+       }
 
-        double execution_time = (execution_end - execution_start);
-        double latency = (schedule_end - schedule_start) - execution_time;
+       double execution_time = (execution_end - execution_start);
+       double latency = (schedule_end - schedule_start) - execution_time;
 
-        int state = service->GetState();
+       int state = service->GetState();
 
-        if (state > StateUnknown)
-                state = StateUnknown;
+       if (state > StateUnknown)
+                       state = StateUnknown;
 
-        stringstream message;
-        message << "\n"
-                << 213 << ":" << "\n"                                  /* servicestatusdata */
+       stringstream message;
+       message << "\n"
+               << 213 << ":" << "\n"                                   /* servicestatusdata */
                << 1 << "=" << "" << "\n"                               /* type */
                << 2 << "=" << "" << "\n"                               /* flags */
                << 3 << "=" << "" << "\n"                               /* attributes */
@@ -800,7 +669,7 @@ void CompatIdoComponent::DumpServiceStatus(const Service::Ptr& service)
                << 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n"       /* customvariable */
                << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
 
@@ -811,9 +680,9 @@ void CompatIdoComponent::DumpProgramStatusData(void)
 {
        double start_time = IcingaApplication::GetInstance()->GetStartTime();
 
-        stringstream message;
-        message << "\n"
-                << 211 << ":" << "\n"                                  /* programstatusdata */
+       stringstream message;
+       message << "\n"
+               << 211 << ":" << "\n"                                   /* programstatusdata */
                << 1 << "=" << "" << "\n"                               /* type */
                << 2 << "=" << "" << "\n"                               /* flags */
                << 3 << "=" << "" << "\n"                               /* attributes */
@@ -839,9 +708,9 @@ void CompatIdoComponent::DumpProgramStatusData(void)
                << 49 << "=" << "" << "\n"                              /* globalhosteventhandler */
                << 50 << "=" << "" << "\n"                              /* globalserviceeventhandler */
                << 270 << "=" << static_cast<int>(Utility::GetTime()) << "\n"                           /* disablednotificationsexpiretime - supported in 1.8 XXX */
-                << 999 << "\n\n";                                      /* enddata */
+               << 999 << "\n\n";                                       /* enddata */
 
-        m_IdoSocket->SendMessage(message.str());
+       m_IdoConnection->SendMessage(message.str());
 }
 
 /**
@@ -856,7 +725,16 @@ void CompatIdoComponent::DumpConfigObjects(void)
         */
 
        /* tell ido2db that we start now */
-       StartConfigDump();
+       /* configtype =1 (original), =2 (retained == default) */
+       stringstream msgStartConfigDump;
+       msgStartConfigDump << "\n\n"
+               << 900 << ":" << "\n"                                   /* startconfigdump */
+               << 245 << "=" << "RETAINED" << "\n"                     /* configdumptype */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 999                                                  /* enddata */
+               << "\n\n";
+
+       m_IdoConnection->SendMessage(msgStartConfigDump.str());
 
        /* hosts and hostgroups */
        map<String, vector<String> > hostgroups;
@@ -885,7 +763,7 @@ void CompatIdoComponent::DumpConfigObjects(void)
                const String& name = hgt.first;
                const vector<String>& hosts = hgt.second;
 
-               if(HostGroup::Exists(name)) {
+               if (HostGroup::Exists(name)) {
                        HostGroup::Ptr hg = HostGroup::GetByName(name);
 
                        /* dump the hostgroup and its attributes/members to ido */
@@ -896,11 +774,11 @@ void CompatIdoComponent::DumpConfigObjects(void)
                                << 172 << "=" << name << "\n"                   /* hostgroupname */
                                << 170 << "=" << hg->GetAlias() << "\n";        /* hostgroupalias */
 
-                       CreateMessageList(message, hosts, 171);                 /* hostgroupmember */
+                       SendMessageList(message, hosts, 171);                   /* hostgroupmember */
                                
                        message << 999 << "\n\n";                               /* enddata */
 
-                       m_IdoSocket->SendMessage(message.str());
+                       m_IdoConnection->SendMessage(message.str());
                }
        }
 
@@ -935,12 +813,12 @@ void CompatIdoComponent::DumpConfigObjects(void)
                        ServiceGroup::Ptr sg = ServiceGroup::GetByName(name);
 
                        /* dump the servicegroup and its attributes/members to ido */
-                        stringstream message;
-                        message << "\n"
-                                << 403 << ":" << "\n"                          /* servicegroupdefinition */
-                                << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"     /* timestamp */
-                                << 220 << "=" << name << "\n"                  /* servicegroupname */
-                                << 218 << "=" << sg->GetAlias() << "\n";       /* servicegroupalias */
+                       stringstream message;
+                       message << "\n"
+                               << 403 << ":" << "\n"                           /* servicegroupdefinition */
+                               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"      /* timestamp */
+                               << 220 << "=" << name << "\n"                   /* servicegroupname */
+                               << 218 << "=" << sg->GetAlias() << "\n";        /* servicegroupalias */
 
                        vector<String> sglist;
                        vector<Service::Ptr>::iterator vt;
@@ -950,16 +828,23 @@ void CompatIdoComponent::DumpConfigObjects(void)
                                sglist.push_back(service->GetAlias());
                        }
        
-                       CreateMessageList(message, services, 219);              /* servicegroupmember */
+                       SendMessageList(message, services, 219);                /* servicegroupmember */
 
-                        message << 999 << "\n\n";                              /* enddata */
+                       message << 999 << "\n\n";                               /* enddata */
 
-                       m_IdoSocket->SendMessage(message.str());
+                       m_IdoConnection->SendMessage(message.str());
                }
        }
 
        /* tell ido2db that we ended dumping the config */
-       EndConfigDump();
+       stringstream msgEndConfigDump;
+       msgEndConfigDump << "\n\n"
+               << 901 << ":" << "\n"                                   /* endconfigdump */
+               << 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n"              /* timestamp */
+               << 999                                                  /* enddata */
+               << "\n\n";
+
+       m_IdoConnection->SendMessage(msgEndConfigDump.str());
 }
 
 /**
@@ -967,21 +852,21 @@ void CompatIdoComponent::DumpConfigObjects(void)
  */
 void CompatIdoComponent::DumpStatusData(void)
 {
-        /* hosts */
-        DynamicObject::Ptr object;
-        BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Host")) {
-                const Host::Ptr& host = static_pointer_cast<Host>(object);
+       /* hosts */
+       DynamicObject::Ptr object;
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Host")) {
+               const Host::Ptr& host = static_pointer_cast<Host>(object);
 
-                DumpHostStatus(host);
-        }
+               DumpHostStatus(host);
+       }
 
 
-        /* services */
-        BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
-                Service::Ptr service = static_pointer_cast<Service>(object);
+       /* services */
+       BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
+               Service::Ptr service = static_pointer_cast<Service>(object);
 
-                DumpServiceStatus(service);
-        }
+               DumpServiceStatus(service);
+       }
 }
 
 
index 682c6f2c767a99c2a4eec725a82fe822f4a01421..4422b178b847d8a7e6d7ec37b52bc27ada679165 100644 (file)
@@ -38,33 +38,24 @@ private:
        Timer::Ptr m_ProgramStatusTimer;
        Timer::Ptr m_ReconnectTimer;
 
-       IdoSocket::Ptr m_IdoSocket;
-
-       bool m_ConfigDumpInProgress;
+       IdoConnection::Ptr m_IdoConnection;
 
        String GetSocketAddress(void) const;
        String GetSocketPort(void) const;
        String GetInstanceName(void) const;
-       int GetReconnectInterval(void) const;
+       double GetReconnectInterval(void) const;
 
-       void SetConfigDumpInProgress(bool state);
-       bool GetConfigDumpInProgress(void);
+       void SocketDisconnectHandler(void);
 
        void ConfigTimerHandler(void);
        void StatusTimerHandler(void);
        void ProgramStatusTimerHandler(void);
        void ReconnectTimerHandler(void);
 
-       void OpenIdoSocket(bool sockettype);
+       void OpenIdoSocket(void);
        void CloseIdoSocket(void);
 
-       void OpenSink(String node, String service);
-       void SendHello(String instancename, bool sockettype);
-       void GoodByeSink(void);
-       void CloseSink(void);
        void SendStartProcess(void);
-       void StartConfigDump(void);
-       void EndConfigDump(void);
 
        void EnableHostObject(const Host::Ptr& host);
        void EnableServiceObject(const Service::Ptr& service);
@@ -80,7 +71,7 @@ private:
        void DumpProgramStatusData(void);
 
        template<typename T>
-       void CreateMessageList(stringstream& msg, const T& list, int type)
+       void SendMessageList(stringstream& msg, const T& list, int type)
        {
                typename T::const_iterator it;
                for (it = list.begin(); it != list.end(); it++) {
index 0b9219ad49333d330ce8600c9d47094764389b07..400712b321bcdd2224558fcbc8d35fd136262d64 100644 (file)
@@ -33,7 +33,7 @@
 
 using std::stringstream;
 
-#include "idosocket.h"
+#include "idoconnection.h"
 #include "compatidocomponent.h"
 
 
diff --git a/components/compatido/idoconnection.cpp b/components/compatido/idoconnection.cpp
new file mode 100644 (file)
index 0000000..af4670c
--- /dev/null
@@ -0,0 +1,59 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "i2-compatido.h"
+
+using namespace icinga;
+
+/**
+ * Constructor for the IdoSocket class.
+ *
+ * @param role The role of the underlying TCP client.
+ */
+IdoConnection::IdoConnection(const Stream::Ptr& stream)
+       : Connection(stream)
+{ }
+
+/**
+ * Sends a message to the ido socket
+ *
+ * @param message The message.
+ */
+void IdoConnection::SendMessage(const String& message)
+{
+       /* 
+        * write our message to the send queue
+        * as we inherit all the functionality
+        * of the tcpclient class
+        */
+       GetStream()->Write(message.CStr(), message.GetLength());
+}
+
+
+/**
+ * Processes inbound data.
+ * Currently not used, as we do not receive data from ido sockets
+ */
+void IdoConnection::ProcessData(void)
+{
+       // Just ignore whatever data the other side is sending
+       GetStream()->Read(NULL, GetStream()->GetAvailableBytes());
+
+       return;
+}
similarity index 78%
rename from lib/remoting/jsonrpcserver.cpp
rename to components/compatido/idoconnection.h
index c645de63989fb1c8c6647d87d5d04134163ce5c6..f0956ef6c782e5c6935e7d6a20d07157f6e06145 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#include "i2-remoting.h"
+#ifndef IDOCONNECTION_H
+#define IDOCONNECTION_H
 
-using namespace icinga;
+namespace icinga
+{
 
 /**
- * Constructor for the JsonRpcServer class.
+ * An IDO socket client.
  *
- * @param sslContext SSL context that should be used for client connections.
+ * @ingroup compatido
  */
-JsonRpcServer::JsonRpcServer(shared_ptr<SSL_CTX> sslContext)
+class IdoConnection : public Connection
 {
-       SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext));
+public:
+       typedef shared_ptr<IdoConnection> Ptr;
+       typedef weak_ptr<IdoConnection> WeakPtr;
+
+       IdoConnection(const Stream::Ptr& stream);
+
+       void SendMessage(const String& message);
+
+protected:
+       virtual void ProcessData(void);
+};
+
 }
+
+#endif /* IDOCONNECTION_H */
diff --git a/components/compatido/idosocket.cpp b/components/compatido/idosocket.cpp
deleted file mode 100644 (file)
index fa07791..0000000
+++ /dev/null
@@ -1,147 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-compatido.h"
-
-using namespace icinga;
-
-/**
- * Constructor for the IdoSocket class.
- *
- * @param role The role of the underlying TCP client.
- */
-IdoSocket::IdoSocket(TcpClientRole role)
-       : TcpClient(role)
-{
-       /* 
-        * we currently do not receive any data from the ido socket,
-        * this is just data output - so we do not need to bind
-        * a local instance of our datahandler in case of a new 
-        * signal telling about new data
-        */
-       OnDataAvailable.connect(boost::bind(&IdoSocket::DataAvailableHandler, this));
-
-       /* 
-        * what to do on disconnect
-        */
-       OnClosed.connect(boost::bind(&IdoSocket::ClientClosedHandler, this));
-
-}
-
-
-/**
- *  * Set the ido socket type
- *   *
- *    * @param type true=tcp, false=unix
- *     */
-void IdoSocket::SetSocketType(bool type)
-{
-        m_SocketType = type;
-}
-
-/*
- *  * Get the ido socket type
- *   *
- *    * @returns type true=tcp, false=unix
- *     */
-bool IdoSocket::GetSocketType(void)
-{
-        return m_SocketType;
-}
-
-/**
- * Sends a message to the ido socket
- *
- * @param message The message.
- */
-void IdoSocket::SendMessage(const String& message)
-{
-       /* 
-        * write our message to the send queue
-        * as we inherit all the functionality
-        * of the tcpclient class
-        */
-       Write(message.CStr(), message.GetLength());
-}
-
-
-/**
- *  Handles closed client connect
- */
-void IdoSocket::ClientClosedHandler(void)
-{
-        try {  
-                CheckException();
-        } catch (const exception& ex) {
-                stringstream message;
-                message << "Error occured for ido socket: " << ex.what();
-
-                Logger::Write(LogWarning, "compatido", message.str());
-        }
-
-        Logger::Write(LogWarning, "compatido", "Lost connection to ido socket");
-
-       SetReconnect(true);
-
-        OnDisconnected(GetSelf());
-}
-
-
-/**
- * Set reconnect vstate
- *
- * @aparam enable Enables the reconnect.
- */
-void IdoSocket::SetReconnect(bool reconnect)
-{
-       m_Reconnect = reconnect;
-}
-
-/**
- * Get reconnect state
- *
- * @returns reconnect  The reconnect variable
- */
-bool IdoSocket::GetReconnect(void)
-{
-       return m_Reconnect;
-}
-
-/**
- * Processes inbound data.
- * Currently not used, as we do not receive data from ido sockets
- */
-void IdoSocket::DataAvailableHandler(void)
-{
-       return;
-}
-
-/**
- * Factory function for ido socket clients.
- *
- * @param fd The file descriptor.
- * @param role The role of the underlying TCP client.
- * @returns A new ido socket client.
- */
-IdoSocket::Ptr icinga::IdoSocketFactory(SOCKET fd, TcpClientRole role)
-{
-       IdoSocket::Ptr client = boost::make_shared<IdoSocket>(role);
-       client->SetFD(fd);
-       return client;
-}
diff --git a/components/compatido/idosocket.h b/components/compatido/idosocket.h
deleted file mode 100644 (file)
index 6ac3561..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef IDOSOCKET_H
-#define IDOSOCKET_H
-
-#include "i2-compatido.h"
-
-namespace icinga
-{
-
-/**
- * An IDO socket client.
- *
- * @ingroup compatido
- */
-class IdoSocket : public TcpClient
-{
-public:
-       typedef shared_ptr<IdoSocket> Ptr;
-       typedef weak_ptr<IdoSocket> WeakPtr;
-
-       IdoSocket(TcpClientRole role);
-
-       void SetSocketType(bool);
-       bool GetSocketType(void);
-
-       void SendMessage(const String& message);
-
-       void SetReconnect(bool reconnect);
-       bool GetReconnect(void);
-
-       boost::signal<void (const IdoSocket::Ptr&, const stringstream&)> OnNewMessage;
-
-        boost::signal<void (const IdoSocket::Ptr&)> OnConnected;
-       boost::signal<void (const IdoSocket::Ptr&)> OnDisconnected;
-
-private:
-       void DataAvailableHandler(void);
-       void ClientClosedHandler(void);
-
-       bool m_Reconnect;
-       bool m_SocketType;
-
-       friend IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role);
-};
-
-IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role);
-
-}
-
-#endif /* JSONRPCCLIENT_H */
index ebc8dd841f1bb824f56d642ac060260133bd1503..17c3d4dbf88d0375c5345b192edf3b11031cf97a 100644 (file)
@@ -68,8 +68,7 @@ void DemoComponent::DemoTimerHandler(void)
 void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender,
     const RequestMessage& request)
 {
-       Logger::Write(LogInformation, "demo", "Got 'hello world' from"
-           " address=" + sender->GetAddress() + ", identity=" +
+       Logger::Write(LogInformation, "demo", "Got 'hello world' from identity=" +
            sender->GetName());
 }
 
index f7fed226c464b5efba61546bb095fe1efb8c13d5..8fc1cc23d731968e3d9ee64d1eaa44374bc0359f 100644 (file)
@@ -10,6 +10,8 @@ libbase_la_SOURCES =  \
        asynctask.h \
        component.cpp \
        component.h \
+       connection.cpp \
+       connection.h \
        dictionary.cpp \
        dictionary.h \
        dynamicobject.cpp \
@@ -21,7 +23,6 @@ libbase_la_SOURCES =  \
        fifo.cpp \
        fifo.h \
        i2-base.h \
-       ioqueue.h \
        logger.cpp \
        logger.h \
        netstring.cpp \
@@ -40,20 +41,20 @@ libbase_la_SOURCES =  \
        scripttask.h \
        socket.cpp \
        socket.h \
+       stream.cpp \
+       stream.h \
+       stream_bio.cpp \
+       stream_bio.h \
        streamlogger.cpp \
        streamlogger.h \
        sysloglogger.cpp \
        sysloglogger.h \
-       tcpclient.cpp \
-       tcpclient.h \
-       tcpserver.cpp \
-       tcpserver.h \
        tcpsocket.cpp \
        tcpsocket.h \
        timer.cpp \
        timer.h \
-       tlsclient.cpp \
-       tlsclient.h \
+       tlsstream.cpp \
+       tlsstream.h \
        unix.h \
        utility.cpp \
        utility.h \
index f3620d62101fb1289050d2eeea8ddcfeea5dceb3..c877fbfa91f49889bdf3cb25d0445fb101b1cc60 100644 (file)
@@ -21,6 +21,7 @@
   <ItemGroup>
     <ClCompile Include="application.cpp" />
     <ClCompile Include="component.cpp" />
+    <ClCompile Include="connection.cpp" />
     <ClCompile Include="dynamicobject.cpp" />
     <ClCompile Include="dictionary.cpp" />
     <ClCompile Include="event.cpp" />
     <ClCompile Include="scriptfunction.cpp" />
     <ClCompile Include="scripttask.cpp" />
     <ClCompile Include="socket.cpp" />
+    <ClCompile Include="stream.cpp" />
     <ClCompile Include="streamlogger.cpp" />
+    <ClCompile Include="stream_bio.cpp" />
     <ClCompile Include="sysloglogger.cpp" />
-    <ClCompile Include="tcpclient.cpp" />
-    <ClCompile Include="tcpserver.cpp" />
     <ClCompile Include="tcpsocket.cpp" />
     <ClCompile Include="timer.cpp" />
-    <ClCompile Include="tlsclient.cpp" />
+    <ClCompile Include="tlsstream.cpp" />
     <ClCompile Include="utility.cpp" />
     <ClCompile Include="value.cpp" />
   </ItemGroup>
     <ClInclude Include="application.h" />
     <ClInclude Include="asynctask.h" />
     <ClInclude Include="component.h" />
+    <ClInclude Include="connection.h" />
     <ClInclude Include="dynamicobject.h" />
     <ClInclude Include="dictionary.h" />
     <ClInclude Include="event.h" />
     <ClInclude Include="fifo.h" />
-    <ClInclude Include="ioqueue.h" />
+    <ClInclude Include="stream.h" />
     <ClInclude Include="netstring.h" />
     <ClInclude Include="qstring.h" />
     <ClInclude Include="scriptfunction.h" />
     <ClInclude Include="ringbuffer.h" />
     <ClInclude Include="socket.h" />
     <ClInclude Include="streamlogger.h" />
+    <ClInclude Include="stream_bio.h" />
     <ClInclude Include="sysloglogger.h" />
-    <ClInclude Include="tcpclient.h" />
-    <ClInclude Include="tcpserver.h" />
     <ClInclude Include="tcpsocket.h" />
     <ClInclude Include="timer.h" />
-    <ClInclude Include="tlsclient.h" />
+    <ClInclude Include="tlsstream.h" />
     <ClInclude Include="unix.h" />
     <ClInclude Include="utility.h" />
     <ClInclude Include="value.h" />
   <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
   <ImportGroup Label="ExtensionTargets">
   </ImportGroup>
-</Project>
+</Project>
\ No newline at end of file
index a7b9317e4a13989212f0917ab3681361d53794db..0845ca949ddd0676e25fdf75b6fc04f05ff5d7de 100644 (file)
     <ClCompile Include="socket.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
-    <ClCompile Include="tcpclient.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
-    <ClCompile Include="tcpserver.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
     <ClCompile Include="tcpsocket.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
     <ClCompile Include="timer.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
-    <ClCompile Include="tlsclient.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
-    <ClCompile Include="unix.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
     <ClCompile Include="utility.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
-    <ClCompile Include="win32.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
     <ClCompile Include="logger.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
     <ClCompile Include="qstring.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
+    <ClCompile Include="tlsstream.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
+    <ClCompile Include="stream.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
+    <ClCompile Include="stream_bio.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
+    <ClCompile Include="connection.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="application.h">
     <ClInclude Include="socket.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
-    <ClInclude Include="tcpclient.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
-    <ClInclude Include="tcpserver.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
     <ClInclude Include="tcpsocket.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
     <ClInclude Include="timer.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
-    <ClInclude Include="tlsclient.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
     <ClInclude Include="unix.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
     <ClInclude Include="scriptfunction.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
-    <ClInclude Include="ioqueue.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
     <ClInclude Include="fifo.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
     <ClInclude Include="qstring.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
+    <ClInclude Include="stream.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
+    <ClInclude Include="tlsstream.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
+    <ClInclude Include="stream_bio.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
+    <ClInclude Include="connection.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Quelldateien">
diff --git a/lib/base/connection.cpp b/lib/base/connection.cpp
new file mode 100644 (file)
index 0000000..cedd4ef
--- /dev/null
@@ -0,0 +1,44 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "i2-base.h"
+
+using namespace icinga;
+
+Connection::Connection(const Stream::Ptr& stream)
+       : m_Stream(stream)
+{
+       m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this));
+       m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this));
+}
+
+Stream::Ptr Connection::GetStream(void) const
+{
+       return m_Stream;
+}
+
+void Connection::ClosedHandler(void)
+{
+       OnClosed(GetSelf());
+}
+
+void Connection::Close(void)
+{
+       m_Stream->Close();
+}
similarity index 68%
rename from lib/base/tcpserver.h
rename to lib/base/connection.h
index 27aa3e34cdf2f5e569d764cf529def8dc91e891d..4e48d408153ddbc65626c8e2b5b3aa9f23ab772d 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef TCPSERVER_H
-#define TCPSERVER_H
+#ifndef CONNECTION_H
+#define CONNECTION_H
 
 namespace icinga
 {
 
-/**
- * A TCP server that listens on a TCP port and accepts incoming
- * client connections.
- *
- * @ingroup base
- */
-class I2_BASE_API TcpServer : public TcpSocket
+class I2_BASE_API Connection : public Object
 {
 public:
-       typedef shared_ptr<TcpServer> Ptr;
-       typedef weak_ptr<TcpServer> WeakPtr;
+       typedef shared_ptr<Connection> Ptr;
+       typedef weak_ptr<Connection> WeakPtr;
 
-       typedef function<TcpClient::Ptr(SOCKET)> ClientFactory;
+       Connection(const Stream::Ptr& stream);
 
-       TcpServer(void);
+       Stream::Ptr GetStream(void) const;
 
-       void SetClientFactory(const ClientFactory& clientFactory);
-       ClientFactory GetFactoryFunction(void) const;
+       void Close(void);
 
-       void Listen(void);
-
-       boost::signal<void (const TcpServer::Ptr&, const TcpClient::Ptr&)> OnNewClient;
+       boost::signal<void (const Connection::Ptr&)> OnClosed;
 
 protected:
-       virtual bool WantsToRead(void) const;
-
-       virtual void HandleReadable(void);
+       virtual void ProcessData(void) = 0;
 
 private:
-       ClientFactory m_ClientFactory;
+       Stream::Ptr m_Stream;
+
+       void ClosedHandler(void);
 };
 
 }
 
-#endif /* TCPSERVER_H */
+#endif /* CONNECTION_H */
\ No newline at end of file
index 7e983cdb87353f8c77cd7d1b98a497596d5975ce..734b9e93cd9a3abd71dbdaa37fc601c7574735cc 100644 (file)
@@ -368,6 +368,7 @@ void DynamicObject::DumpObjects(const String& filename)
                throw_exception(runtime_error("Could not open '" + filename + "' file"));
 
        FIFO::Ptr fifo = boost::make_shared<FIFO>();
+       fifo->Start();
 
        DynamicObject::TypeMap::iterator tt;
        for (tt = GetAllObjects().begin(); tt != GetAllObjects().end(); tt++) {
@@ -401,7 +402,7 @@ void DynamicObject::DumpObjects(const String& filename)
                        String json = value.Serialize();
 
                        /* This is quite ugly, unfortunatelly NetString requires an IOQueue object */
-                       NetString::WriteStringToIOQueue(fifo.get(), json);
+                       NetString::WriteStringToStream(fifo, json);
 
                        size_t count;
                        while ((count = fifo->GetAvailableBytes()) > 0) {
@@ -416,6 +417,8 @@ void DynamicObject::DumpObjects(const String& filename)
                }
        }
 
+       fifo->Close();
+
        fp.close();
 
 #ifdef _WIN32
@@ -436,6 +439,8 @@ void DynamicObject::RestoreObjects(const String& filename)
        /* TODO: Fix this horrible mess by implementing a class that provides
         * IOQueue functionality for files. */
        FIFO::Ptr fifo = boost::make_shared<FIFO>();
+       fifo->Start();
+
        while (fp) {
                char buffer[1024];
 
@@ -444,7 +449,7 @@ void DynamicObject::RestoreObjects(const String& filename)
        }
 
        String message;
-       while (NetString::ReadStringFromIOQueue(fifo.get(), &message)) {
+       while (NetString::ReadStringFromStream(fifo, &message)) {
                Dictionary::Ptr persistentObject = Value::Deserialize(message);
 
                String type = persistentObject->Get("type");
@@ -462,6 +467,8 @@ void DynamicObject::RestoreObjects(const String& filename)
                        object->ApplyUpdate(update, Attribute_All);
                }
        }
+
+       fifo->Close();
 }
 
 void DynamicObject::DeactivateObjects(void)
index 5f9e59bfde233d13aec3fd19d53d6705177e558c..6af65b8f25fc87830dc529c60037e321fa13d165 100644 (file)
@@ -38,6 +38,13 @@ FIFO::~FIFO(void)
        free(m_Buffer);
 }
 
+void FIFO::Start(void)
+{
+       SetConnected(true);
+
+       Stream::Start();
+}
+
 /**
  * Resizes the FIFO's buffer so that it is at least newSize bytes long.
  *
@@ -109,25 +116,32 @@ size_t FIFO::GetAvailableBytes(void) const
 /**
  * Implements IOQueue::Peek.
  */
-void FIFO::Peek(void *buffer, size_t count)
+size_t FIFO::Peek(void *buffer, size_t count)
 {
-       assert(m_DataSize >= count);
+       assert(IsConnected());
+
+       if (count > m_DataSize)
+               count = m_DataSize;
 
        if (buffer != NULL)
                memcpy(buffer, m_Buffer + m_Offset, count);
+
+       return count;
 }
 
 /**
  * Implements IOQueue::Read.
  */
-void FIFO::Read(void *buffer, size_t count)
+size_t FIFO::Read(void *buffer, size_t count)
 {
-       Peek(buffer, count);
+       count = Peek(buffer, count);
 
        m_DataSize -= count;
        m_Offset += count;
 
        Optimize();
+
+       return count;
 }
 
 /**
@@ -135,6 +149,8 @@ void FIFO::Read(void *buffer, size_t count)
  */
 void FIFO::Write(const void *buffer, size_t count)
 {
+       assert(IsConnected());
+
        ResizeBuffer(m_Offset + m_DataSize + count);
        memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
        m_DataSize += count;
index f472833e962d2cc279accf518867db037bed7355..bc31578c5657d8c7568124ca3cfa9b37dce874e2 100644 (file)
@@ -28,7 +28,7 @@ namespace icinga
  *
  * @ingroup base
  */
-class I2_BASE_API FIFO : public Object, public IOQueue
+class I2_BASE_API FIFO : public Stream
 {
 public:
        static const size_t BlockSize = 16 * 1024;
@@ -39,13 +39,15 @@ public:
        FIFO(void);
        ~FIFO(void);
 
+       void Start(void);
+
        /*const void *GetReadBuffer(void) const;
        void *GetWriteBuffer(size_t *count);*/
 
-       virtual size_t GetAvailableBytes(void) const;
-       virtual void Peek(void *buffer, size_t count);
-       virtual void Read(void *buffer, size_t count);
-       virtual void Write(const void *buffer, size_t count);
+       size_t GetAvailableBytes(void) const;
+       size_t Peek(void *buffer, size_t count);
+       size_t Read(void *buffer, size_t count);
+       void Write(const void *buffer, size_t count);
 
 private:
        char *m_Buffer;
index f4668b6017903e4c683ac5c1af8ea7ec72bfa7db..25940b0b71ad514332ef00e343fca6a91ad4144a 100644 (file)
@@ -175,14 +175,14 @@ namespace tuples = boost::tuples;
 #include "dictionary.h"
 #include "ringbuffer.h"
 #include "timer.h"
-#include "ioqueue.h"
+#include "stream.h"
+#include "stream_bio.h"
+#include "connection.h"
 #include "netstring.h"
 #include "fifo.h"
 #include "socket.h"
 #include "tcpsocket.h"
-#include "tcpclient.h"
-#include "tcpserver.h"
-#include "tlsclient.h"
+#include "tlsstream.h"
 #include "asynctask.h"
 #include "process.h"
 #include "scriptfunction.h"
index 3519580cae19954779b5ab5ed55411c55cb1cc3d..546aa79c3f1abd1b69dd0adeaba10d269aa9d5bc 100644 (file)
 using namespace icinga;
 
 /**
- * Reads data from an IOQueue in netString format.
+ * Reads data from a stream in netString format.
  *
- * @param queue The IOQueue to read from.
+ * @param stream The stream to read from.
  * @param[out] str The String that has been read from the IOQueue.
  * @returns true if a complete String was read from the IOQueue, false otherwise.
  * @exception invalid_argument The input stream is invalid.
  * @see https://github.com/PeterScott/netString-c/blob/master/netString.c
  */
-bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
+bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
 {
-       size_t buffer_length = queue->GetAvailableBytes();
+       size_t buffer_length = stream->GetAvailableBytes();
 
        /* minimum netString length is 3 */
        if (buffer_length < 3)
@@ -47,7 +47,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
        if (buffer == NULL && buffer_length > 0)
                throw_exception(bad_alloc());
 
-       queue->Peek(buffer, buffer_length);
+       stream->Peek(buffer, buffer_length);
 
        /* no leading zeros allowed */
        if (buffer[0] == '0' && isdigit(buffer[1])) {
@@ -68,7 +68,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
                len = len * 10 + (buffer[i] - '0');
        }
 
-       buffer_length = queue->GetAvailableBytes();
+       buffer_length = stream->GetAvailableBytes();
 
        /* make sure the buffer is large enough */
        if (i + len + 1 >= buffer_length)
@@ -86,7 +86,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
 
        buffer = new_buffer;
 
-       queue->Peek(buffer, buffer_length);
+       stream->Peek(buffer, buffer_length);
 
        /* check for the colon delimiter */
        if (buffer[i] != ':') {
@@ -104,25 +104,25 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
 
        free(buffer);
 
-       /* remove the data from the IOQueue */
-       queue->Read(NULL, buffer_length);
+       /* remove the data from the stream */
+       stream->Read(NULL, buffer_length);
 
        return true;
 }
 
 /**
- * Writes data into an IOQueue using the netString format.
+ * Writes data into a stream using the netString format.
  *
- * @param queue The IOQueue.
+ * @param stream The stream.
  * @param str The String that is to be written.
  */
-void NetString::WriteStringToIOQueue(IOQueue *queue, const String& str)
+void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
 {
        stringstream prefixbuf;
        prefixbuf << str.GetLength() << ":";
 
        String prefix = prefixbuf.str();
-       queue->Write(prefix.CStr(), prefix.GetLength());
-       queue->Write(str.CStr(), str.GetLength());
-       queue->Write(",", 1);
+       stream->Write(prefix.CStr(), prefix.GetLength());
+       stream->Write(str.CStr(), str.GetLength());
+       stream->Write(",", 1);
 }
index d32616e4c9cffc1663bc958cb899bfff4651c791..9b4efc872143f5f38c73401180f754fd1391efef 100644 (file)
@@ -33,8 +33,8 @@ namespace icinga
 class I2_BASE_API NetString
 {
 public:
-       static bool ReadStringFromIOQueue(IOQueue *queue, String *message);
-       static void WriteStringToIOQueue(IOQueue *queue, const String& message);
+       static bool ReadStringFromStream(const Stream::Ptr& stream, String *message);
+       static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
 
 private:
        NetString(void);
index dd6240e39ba1b798726f63737b1c8bd6298554ea..3f5973eda01196aee4079000fb6109adb1222c00 100644 (file)
@@ -25,16 +25,19 @@ using namespace icinga;
  * Constructor for the Socket class.
  */
 Socket::Socket(void)
-       : m_FD(INVALID_SOCKET), m_Connected(false)
-{ }
+       : m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false),
+        m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
+{
+       m_SendQueue->Start();
+       m_RecvQueue->Start();
+}
 
 /**
  * Destructor for the Socket class.
  */
 Socket::~Socket(void)
 {
-       boost::mutex::scoped_lock lock(m_SocketMutex);
-       CloseInternal(true);
+       Close();
 }
 
 /**
@@ -50,6 +53,8 @@ void Socket::Start(void)
 
        m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
        m_WriteThread.detach();
+
+       Stream::Start();
 }
 
 /**
@@ -88,35 +93,24 @@ SOCKET Socket::GetFD(void) const
        return m_FD;
 }
 
-/**
- * Closes the socket.
- */
-void Socket::Close(void)
+void Socket::CloseUnlocked(void)
 {
-       boost::mutex::scoped_lock lock(m_SocketMutex);
+       if (m_FD == INVALID_SOCKET)
+               return;
+
+       closesocket(m_FD);
+       m_FD = INVALID_SOCKET;
 
-       CloseInternal(false);
+       Stream::Close();
 }
 
 /**
  * Closes the socket.
- *
- * @param from_dtor Whether this method was called from the destructor.
  */
-void Socket::CloseInternal(bool from_dtor)
+void Socket::Close(void)
 {
-       if (m_FD == INVALID_SOCKET)
-               return;
-
-       SetConnected(false);
-
-       closesocket(m_FD);
-       m_FD = INVALID_SOCKET;
-
-       /* nobody can possibly have a valid event subscription when the
-               destructor has been called */
-       if (!from_dtor)
-               Event::Post(boost::bind(boost::ref(OnClosed), GetSelf()));
+       boost::mutex::scoped_lock lock(m_SocketMutex);
+       CloseUnlocked();
 }
 
 /**
@@ -159,32 +153,6 @@ void Socket::HandleException(void)
        throw_exception(SocketException("select() returned fd in except fdset", GetError()));
 }
 
-/**
- * Checks whether data should be read for this socket object.
- *
- * @returns true if the socket should be registered for reading, false otherwise.
- */
-bool Socket::WantsToRead(void) const
-{
-       return false;
-}
-
-void Socket::HandleReadable(void)
-{ }
-
-/**
- * Checks whether data should be written for this socket object.
- *
- * @returns true if the socket should be registered for writing, false otherwise.
- */
-bool Socket::WantsToWrite(void) const
-{
-       return false;
-}
-
-void Socket::HandleWritable(void)
-{ }
-
 /**
  * Formats a sockaddr in a human-readable way.
  *
@@ -305,9 +273,9 @@ void Socket::ReadThreadProc(void)
                        if (FD_ISSET(fd, &exceptfds))
                                HandleException();
                } catch (...) {
-                       m_Exception = boost::current_exception();
+                       SetException(boost::current_exception());
 
-                       CloseInternal(false);
+                       CloseUnlocked();
 
                        break;
                }
@@ -360,9 +328,9 @@ void Socket::WriteThreadProc(void)
                        if (FD_ISSET(fd, &writefds))
                                HandleWritable();
                } catch (...) {
-                       m_Exception = boost::current_exception();
+                       SetException(boost::current_exception());
 
-                       CloseInternal(false);
+                       CloseUnlocked();
 
                        break;
                }
@@ -390,11 +358,281 @@ bool Socket::IsConnected(void) const
 }
 
 /**
- * Checks whether an exception is available for this socket. Should be called
- * by user-supplied handlers for the OnClosed signal.
+ * Returns how much data is available for reading.
+ *
+ * @returns The number of bytes available.
+ */
+size_t Socket::GetAvailableBytes(void) const
+{
+       if (m_Listening)
+               throw new logic_error("Socket does not support GetAvailableBytes().");
+
+       {
+               boost::mutex::scoped_lock lock(m_QueueMutex);
+
+               return m_RecvQueue->GetAvailableBytes();
+       }
+}
+
+/**
+ * Reads data from the socket.
+ *
+ * @param buffer The buffer where the data should be stored.
+ * @param size The size of the buffer.
+ * @returns The number of bytes read.
+ */
+size_t Socket::Read(void *buffer, size_t size)
+{
+       if (m_Listening)
+               throw new logic_error("Socket does not support Read().");
+
+       {
+               boost::mutex::scoped_lock lock(m_QueueMutex);
+
+               if (m_RecvQueue->GetAvailableBytes() == 0)
+                       CheckException();
+
+               return m_RecvQueue->Read(buffer, size);
+       }
+}
+
+/**
+ * Peeks at data for the socket.
+ *
+ * @param buffer The buffer where the data should be stored.
+ * @param size The size of the buffer.
+ * @returns The number of bytes read.
+ */
+size_t Socket::Peek(void *buffer, size_t size)
+{
+       if (m_Listening)
+               throw new logic_error("Socket does not support Peek().");
+
+       {
+               boost::mutex::scoped_lock lock(m_QueueMutex);
+
+               if (m_RecvQueue->GetAvailableBytes() == 0)
+                       CheckException();
+
+               return m_RecvQueue->Peek(buffer, size);
+       }
+}
+
+/**
+ * Writes data to the socket.
+ *
+ * @param buffer The buffer that should be sent.
+ * @param size The size of the buffer.
+ */
+void Socket::Write(const void *buffer, size_t size)
+{
+       if (m_Listening)
+               throw new logic_error("Socket does not support Write().");
+
+       {
+               boost::mutex::scoped_lock lock(m_QueueMutex);
+
+               m_SendQueue->Write(buffer, size);
+       }
+}
+
+/**
+ * Starts listening for incoming client connections.
+ */
+void Socket::Listen(void)
+{
+       if (listen(GetFD(), SOMAXCONN) < 0)
+               throw_exception(SocketException("listen() failed", GetError()));
+
+       m_Listening = true;
+}
+
+void Socket::HandleWritable(void)
+{
+       if (m_Listening)
+               HandleWritableServer();
+       else
+               HandleWritableClient();
+}
+
+void Socket::HandleReadable(void)
+{
+       if (m_Listening)
+               HandleReadableServer();
+       else
+               HandleReadableClient();
+}
+
+/**
+ * Processes data that is available for this socket.
+ */
+void Socket::HandleWritableClient(void)
+{
+       int rc;
+       char data[1024];
+       size_t count;
+
+       if (!IsConnected())
+               SetConnected(true);
+
+       for (;;) {
+               {
+                       boost::mutex::scoped_lock lock(m_QueueMutex);
+
+                       count = m_SendQueue->GetAvailableBytes();
+
+                       if (count == 0)
+                               break;
+
+                       if (count > sizeof(data))
+                               count = sizeof(data);
+
+                       m_SendQueue->Peek(data, count);
+               }
+
+               rc = send(GetFD(), data, count, 0);
+
+               if (rc <= 0)
+                       throw_exception(SocketException("send() failed", GetError()));
+
+               {
+                       boost::mutex::scoped_lock lock(m_QueueMutex);
+                       m_SendQueue->Read(NULL, rc);
+               }
+       }
+}
+
+/**
+ * Processes data that can be written for this socket.
+ */
+void Socket::HandleReadableClient(void)
+{
+       if (!IsConnected())
+               SetConnected(true);
+
+       bool new_data = false;
+
+       for (;;) {
+               char data[1024];
+               int rc = recv(GetFD(), data, sizeof(data), 0);
+
+#ifdef _WIN32
+               if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
+#else /* _WIN32 */
+               if (rc < 0 && errno == EAGAIN)
+#endif /* _WIN32 */
+                       break;
+
+               if (rc <= 0)
+                       throw_exception(SocketException("recv() failed", GetError()));
+
+               {
+                       boost::mutex::scoped_lock lock(m_QueueMutex);
+
+                       m_RecvQueue->Write(data, rc);
+               }
+
+               new_data = true;
+       }
+
+       if (new_data)
+               Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+}
+
+void Socket::HandleWritableServer(void)
+{
+       throw logic_error("This should never happen.");
+}
+
+/**
+ * Accepts a new client and creates a new client object for it
+ * using the client factory function.
+ */
+void Socket::HandleReadableServer(void)
+{
+       int fd;
+       sockaddr_storage addr;
+       socklen_t addrlen = sizeof(addr);
+
+       fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
+
+       if (fd < 0)
+               throw_exception(SocketException("accept() failed", GetError()));
+
+       TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
+       client->SetFD(fd);
+       Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
+}
+
+/**
+ * Checks whether data should be written for this socket object.
+ *
+ * @returns true if the socket should be registered for writing, false otherwise.
+ */
+bool Socket::WantsToWrite(void) const
+{
+       if (m_Listening)
+               return WantsToWriteServer();
+       else
+               return WantsToWriteClient();
+}
+
+/**
+ * Checks whether data should be read for this socket object.
+ *
+ * @returns true if the socket should be registered for reading, false otherwise.
+ */
+bool Socket::WantsToRead(void) const
+{
+       if (m_Listening)
+               return WantsToReadServer();
+       else
+               return WantsToReadClient();
+}
+
+/**
+ * Checks whether data should be read for this socket.
+ *
+ * @returns true
+ */
+bool Socket::WantsToReadClient(void) const
+{
+       return true;
+}
+
+/**
+ * Checks whether data should be written for this socket.
+ *
+ * @returns true if data should be written, false otherwise.
+ */
+bool Socket::WantsToWriteClient(void) const
+{
+       {
+               boost::mutex::scoped_lock lock(m_QueueMutex);
+
+               if (m_SendQueue->GetAvailableBytes() > 0)
+                       return true;
+       }
+
+       return (!IsConnected());
+}
+
+/**
+ * Checks whether the TCP server wants to write.
+ *
+ * @returns false
+ */
+bool Socket::WantsToWriteServer(void) const
+{
+       return false;
+}
+
+/**
+ * Checks whether the TCP server wants to read (i.e. accept new clients).
+ *
+ * @returns true
  */
-void Socket::CheckException(void)
+bool Socket::WantsToReadServer(void) const
 {
-       if (m_Exception)
-               rethrow_exception(m_Exception);
+       return true;
 }
index 1777f2f152e2576b821439c6c562c767a1489e24..0dad5d507f55c9bc6a996282041120934a9191c6 100644 (file)
 namespace icinga {
 
 /**
- * Base class for sockets.
+ * Base class for connection-oriented sockets.
  *
  * @ingroup base
  */
-class I2_BASE_API Socket : public Object
+class I2_BASE_API Socket : public Stream
 {
 public:
        typedef shared_ptr<Socket> Ptr;
@@ -35,18 +35,23 @@ public:
 
        ~Socket(void);
 
-       boost::signal<void (const Socket::Ptr&)> OnClosed;
-
        virtual void Start(void);
 
-       void Close(void);
+       virtual void Close(void);
 
        String GetClientAddress(void);
        String GetPeerAddress(void);
 
        bool IsConnected(void) const;
 
-       void CheckException(void);
+       size_t GetAvailableBytes(void) const;
+       size_t Read(void *buffer, size_t size);
+       size_t Peek(void *buffer, size_t size);
+       void Write(const void *buffer, size_t size);
+
+       void Listen(void);
+
+       boost::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
 
 protected:
        Socket(void);
@@ -59,34 +64,49 @@ protected:
        int GetError(void) const;
        static int GetLastSocketError(void);
 
-       virtual bool WantsToRead(void) const;
-       virtual bool WantsToWrite(void) const;
-
-       virtual void HandleReadable(void);
-       virtual void HandleWritable(void);
-       virtual void HandleException(void);
-
-       virtual void CloseInternal(bool from_dtor);
-
        mutable boost::mutex m_SocketMutex;
 
 private:
        SOCKET m_FD; /**< The socket descriptor. */
        bool m_Connected;
+       bool m_Listening;
 
        thread m_ReadThread;
        thread m_WriteThread;
 
        condition_variable m_WriteCV;
 
-       boost::exception_ptr m_Exception;
-
        void ReadThreadProc(void);
        void WriteThreadProc(void);
 
        void ExceptionEventHandler(void);
 
        static String GetAddressFromSockaddr(sockaddr *address, socklen_t len);
+
+       mutable boost::mutex m_QueueMutex;
+       FIFO::Ptr m_SendQueue;
+       FIFO::Ptr m_RecvQueue;
+
+       void HandleWritableClient(void);
+       void HandleReadableClient(void);
+
+       void HandleWritableServer(void);
+       void HandleReadableServer(void);
+
+       void HandleReadable(void);
+       void HandleWritable(void);
+       void HandleException(void);
+
+       bool WantsToWriteClient(void) const;
+       bool WantsToReadClient(void) const;
+
+       bool WantsToWriteServer(void) const;
+       bool WantsToReadServer(void) const;
+
+       bool WantsToWrite(void) const;
+       bool WantsToRead(void) const;
+
+       void CloseUnlocked(void);
 };
 
 /**
diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp
new file mode 100644 (file)
index 0000000..fa26d68
--- /dev/null
@@ -0,0 +1,79 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "i2-base.h"
+
+using namespace icinga;
+
+Stream::Stream(void)
+       : m_Connected(false)
+{ }
+
+Stream::~Stream(void)
+{
+       assert(!m_Running);
+}
+
+bool Stream::IsConnected(void) const
+{
+       return m_Connected;
+}
+
+void Stream::SetConnected(bool connected)
+{
+       m_Connected = connected;
+
+       if (m_Connected)
+               Event::Post(boost::bind(boost::ref(OnConnected), GetSelf()));
+       else
+               Event::Post(boost::bind(boost::ref(OnClosed), GetSelf()));
+}
+
+/**
+ * Checks whether an exception is available for this socket and re-throws
+ * the exception if there is one.
+ */
+void Stream::CheckException(void)
+{
+       if (m_Exception)
+               rethrow_exception(m_Exception);
+}
+
+void Stream::SetException(boost::exception_ptr exception)
+{
+       m_Exception = exception;
+}
+
+boost::exception_ptr Stream::GetException(void)
+{
+       return m_Exception;
+}
+
+void Stream::Start(void)
+{
+       m_Running = true;
+}
+
+void Stream::Close(void)
+{
+       assert(m_Running);
+       m_Running = false;
+
+       SetConnected(false);
+}
similarity index 66%
rename from lib/base/ioqueue.h
rename to lib/base/stream.h
index 842e9461f0e6d1c2139c0517493e90ee1b506143..cdb01f3144e12c1410b9ef1cfa73f74ceb922401 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef IOQUEUE_H
-#define IOQUEUE_H
+#ifndef STREAM_H
+#define STREAM_H
 
 namespace icinga
 {
 
 /**
- * An I/O queue.
+ * A stream.
  *
  * @ingroup base
  */
-class IOQueue
+class I2_BASE_API Stream : public Object
 {
 public:
+       typedef shared_ptr<Stream> Ptr;
+       typedef weak_ptr<Stream> WeakPtr;
+
+       Stream(void);
+       ~Stream(void);
+
+       virtual void Start(void);
+
        /**
         * Retrieves the number of bytes available for reading.
         *
@@ -39,37 +47,61 @@ public:
        virtual size_t GetAvailableBytes(void) const = 0;
 
        /**
-        * Reads data from the queue without advancing the read pointer. Trying
-        * to read more data than is available in the queue is a programming error.
-        * Use GetBytesAvailable() to check how much data is available.
+        * Reads data from the stream without advancing the read pointer.
         *
         * @param buffer The buffer where data should be stored. May be NULL if
         *               you're not actually interested in the data.
         * @param count The number of bytes to read from the queue.
+        * @returns The number of bytes actually read.
         */
-       virtual void Peek(void *buffer, size_t count) = 0;
+       virtual size_t Peek(void *buffer, size_t count) = 0;
 
        /**
-        * Reads data from the queue. Trying to read more data than is
-        * available in the queue is a programming error. Use GetBytesAvailable()
-        * to check how much data is available.
+        * Reads data from the stream.
         *
         * @param buffer The buffer where data should be stored. May be NULL if you're
         *               not actually interested in the data.
         * @param count The number of bytes to read from the queue.
+        * @returns The number of bytes actually read.
         */
-       virtual void Read(void *buffer, size_t count) = 0;
+       virtual size_t Read(void *buffer, size_t count) = 0;
 
        /**
-        * Writes data to the queue.
+        * Writes data to the stream.
         *
         * @param buffer The data that is to be written.
         * @param count The number of bytes to write.
         * @returns The number of bytes written
         */
        virtual void Write(const void *buffer, size_t count) = 0;
+
+       /**
+        * Closes the stream and releases resources.
+        */
+       virtual void Close(void);
+
+       bool IsConnected(void) const;
+
+       boost::exception_ptr GetException(void);
+       void CheckException(void);
+
+       boost::signal<void (const Stream::Ptr&)> OnConnected;
+       boost::signal<void (const Stream::Ptr&)> OnDataAvailable;
+       boost::signal<void (const Stream::Ptr&)> OnClosed;
+
+protected:
+       void SetConnected(bool connected);
+
+       void SetException(boost::exception_ptr exception);
+
+private:
+       bool m_Running;
+       bool m_Connected;
+       boost::exception_ptr m_Exception;
 };
 
+BIO *BIO_Stream_new(const Stream::Ptr& stream);
+
 }
 
-#endif /* IOQUEUE_H */
+#endif /* STREAM_H */
diff --git a/lib/base/stream_bio.cpp b/lib/base/stream_bio.cpp
new file mode 100644 (file)
index 0000000..ccdda1b
--- /dev/null
@@ -0,0 +1,121 @@
+#include "i2-base.h"
+
+using namespace icinga;
+
+int I2Stream_new(BIO *bi);
+int I2Stream_free(BIO *bi);
+int I2Stream_read(BIO *bi, char *out, int outl);
+int I2Stream_write(BIO *bi, const char *in, int inl);
+long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr);
+int I2Stream_gets(BIO *bi, char *buf, int size);
+int I2Stream_puts(BIO *bi, const char *str);
+
+#define BIO_TYPE_I2STREAM              (99|0x0400|0x0100)
+
+static BIO_METHOD I2Stream_method =
+{
+       BIO_TYPE_I2STREAM,
+       "Icinga Stream",
+       I2Stream_write,
+       I2Stream_read,
+       NULL,
+       NULL,
+       I2Stream_ctrl,
+       I2Stream_new,
+       I2Stream_free,
+       NULL,
+};
+
+typedef struct I2Stream_bio_s
+{
+       Stream::Ptr Stream;
+       boost::exception_ptr Exception;
+} I2Stream_bio_t;
+
+BIO_METHOD *BIO_s_I2Stream(void)
+{
+       return &I2Stream_method;
+}
+
+BIO *icinga::BIO_new_I2Stream(const Stream::Ptr& stream)
+{
+       BIO *bi = BIO_new(BIO_s_I2Stream());
+
+       if (bi == NULL)
+               return NULL;
+
+       I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
+
+       bp->Stream = stream;
+
+       return bi;
+}
+
+void icinga::I2Stream_check_exception(BIO *bi) {
+       I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
+
+       if (bp->Exception) {
+               boost::exception_ptr ptr = bp->Exception;
+               bp->Exception = boost::exception_ptr();
+               rethrow_exception(ptr);
+       }
+}
+
+static int I2Stream_new(BIO *bi)
+{
+       bi->shutdown = 0;
+       bi->init = 1;
+       bi->num = -1;
+       bi->ptr = new I2Stream_bio_t;
+
+       return 1;
+}
+
+static int I2Stream_free(BIO *bi)
+{
+       I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
+       delete bp;
+
+       return 1;
+}
+
+static int I2Stream_read(BIO *bi, char *out, int outl)
+{
+       I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
+
+       size_t data_read;
+
+       BIO_clear_retry_flags(bi);
+
+       try {
+               data_read = bp->Stream->Read(out, outl);
+       } catch (...) {
+               bp->Exception = boost::current_exception();
+               return -1;
+       }
+
+       if (data_read == 0) {
+               BIO_set_retry_read(bi);
+               return -1;
+       }
+
+       return data_read;
+}
+
+static int I2Stream_write(BIO *bi, const char *in, int inl)
+{
+       I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
+       bp->Stream->Write(in, inl);
+
+       return inl;
+}
+
+static long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr)
+{
+       switch (cmd) {
+               case BIO_CTRL_FLUSH:
+                       return 1;
+               default:
+                       return 0;
+       }
+}
similarity index 81%
rename from lib/remoting/jsonrpcserver.h
rename to lib/base/stream_bio.h
index 4cf9a5841c7f9a890f39c9c2898e81403e68e9c5..3f40f919557fa12775eb7fe292a15bda5c15ceed 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef JSONRPCSERVER_H
-#define JSONRPCSERVER_H
+#ifndef STREAMBIO_H
+#define STREAMBIO_H
 
 namespace icinga
 {
 
-/**
- * A JSON-RPC server.
- *
- * @ingroup remoting
- */
-class I2_REMOTING_API JsonRpcServer : public TcpServer
-{
-public:
-       typedef shared_ptr<JsonRpcServer> Ptr;
-       typedef weak_ptr<JsonRpcServer> WeakPtr;
-
-       JsonRpcServer(shared_ptr<SSL_CTX> sslContext);
-};
+BIO *BIO_new_I2Stream(const Stream::Ptr& stream);
+void I2Stream_check_exception(BIO *bi);
 
 }
 
-#endif /* JSONRPCSERVER_H */
+#endif /* STREAMBIO_H */
\ No newline at end of file
diff --git a/lib/base/tcpclient.cpp b/lib/base/tcpclient.cpp
deleted file mode 100644 (file)
index c21a438..0000000
+++ /dev/null
@@ -1,256 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-base.h"
-
-using namespace icinga;
-
-/**
- * Constructor for the TcpClient class.
- *
- * @param role The role of the TCP client socket.
- */
-TcpClient::TcpClient(TcpClientRole role)
-       : m_SendQueue(boost::make_shared<FIFO>()),
-         m_RecvQueue(boost::make_shared<FIFO>()),
-         m_Role(role)
-{ }
-
-/**
- * Retrieves the role of the socket.
- *
- * @returns The role.
- */
-TcpClientRole TcpClient::GetRole(void) const
-{
-       return m_Role;
-}
-
-/**
- * Creates a socket and connects to the specified node and service.
- *
- * @param node The node.
- * @param service The service.
- */
-void TcpClient::Connect(const String& node, const String& service)
-{
-       m_Role = RoleOutbound;
-
-       addrinfo hints;
-       addrinfo *result;
-
-       memset(&hints, 0, sizeof(hints));
-       hints.ai_family = AF_UNSPEC;
-       hints.ai_socktype = SOCK_STREAM;
-       hints.ai_protocol = IPPROTO_TCP;
-
-       int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result);
-
-       if (rc < 0)
-               throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError()));
-
-       int fd = INVALID_SOCKET;
-
-       for (addrinfo *info = result; info != NULL; info = info->ai_next) {
-               fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
-
-               if (fd == INVALID_SOCKET)
-                       continue;
-
-               SetFD(fd);
-
-               rc = connect(fd, info->ai_addr, info->ai_addrlen);
-
-#ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
-#else /* _WIN32 */
-               if (rc < 0 && errno != EINPROGRESS) {
-#endif /* _WIN32 */
-                       closesocket(fd);
-                       SetFD(INVALID_SOCKET);
-
-                       continue;
-               }
-
-               if (rc >= 0) {
-                       SetConnected(true);
-                       OnConnected(GetSelf());
-               }
-
-               break;
-       }
-
-       freeaddrinfo(result);
-
-       if (fd == INVALID_SOCKET)
-               throw_exception(runtime_error("Could not create a suitable socket."));
-}
-
-/**
- * Processes data that is available for this socket.
- */
-void TcpClient::HandleWritable(void)
-{
-       int rc;
-       char data[1024];
-       size_t count;
-
-       if (!IsConnected()) {
-               SetConnected(true);
-               Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
-       }
-
-       for (;;) {
-               {
-                       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-                       count = m_SendQueue->GetAvailableBytes();
-
-                       if (count == 0)
-                               break;
-
-                       if (count > sizeof(data))
-                               count = sizeof(data);
-
-                       m_SendQueue->Peek(data, count);
-               }
-
-               rc = send(GetFD(), data, count, 0);
-
-               if (rc <= 0)
-                       throw_exception(SocketException("send() failed", GetError()));
-
-               {
-                       boost::mutex::scoped_lock lock(m_QueueMutex);
-                       m_SendQueue->Read(NULL, rc);
-               }
-       }
-}
-
-/**
- * Implements IOQueue::GetAvailableBytes.
- */
-size_t TcpClient::GetAvailableBytes(void) const
-{
-       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-       return m_RecvQueue->GetAvailableBytes();
-}
-
-/**
- * Implements IOQueue::Peek.
- */
-void TcpClient::Peek(void *buffer, size_t count)
-{
-       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-       m_RecvQueue->Peek(buffer, count);
-}
-
-/**
- * Implements IOQueue::Read.
- */
-void TcpClient::Read(void *buffer, size_t count)
-{
-       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-       m_RecvQueue->Read(buffer, count);
-}
-
-/**
- * Implements IOQueue::Write.
- */
-void TcpClient::Write(const void *buffer, size_t count)
-{
-       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-       m_SendQueue->Write(buffer, count);
-}
-
-/**
- * Processes data that can be written for this socket.
- */
-void TcpClient::HandleReadable(void)
-{
-       if (!IsConnected()) {
-               SetConnected(true);
-               Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
-       }
-
-       for (;;) {
-               char data[1024];
-               int rc = recv(GetFD(), data, sizeof(data), 0);
-
-       #ifdef _WIN32
-               if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-       #else /* _WIN32 */
-               if (rc < 0 && errno == EAGAIN)
-       #endif /* _WIN32 */
-                       return;
-
-               if (rc <= 0)
-                       throw_exception(SocketException("recv() failed", GetError()));
-
-               {
-                       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-                       m_RecvQueue->Write(data, rc);
-               }
-       }
-
-       Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
-}
-
-/**
- * Checks whether data should be read for this socket.
- *
- * @returns true
- */
-bool TcpClient::WantsToRead(void) const
-{
-       return true;
-}
-
-/**
- * Checks whether data should be written for this socket.
- *
- * @returns true if data should be written, false otherwise.
- */
-bool TcpClient::WantsToWrite(void) const
-{
-       {
-               boost::mutex::scoped_lock lock(m_QueueMutex);
-
-               if (m_SendQueue->GetAvailableBytes() > 0)
-                       return true;
-       }
-
-       return (!IsConnected());
-}
-
-/**
- * Default factory function for TCP clients.
- *
- * @param role The role of the new client.
- * @returns The new client.
- */
-TcpClient::Ptr icinga::TcpClientFactory(TcpClientRole role)
-{
-       return boost::make_shared<TcpClient>(role);
-}
diff --git a/lib/base/tcpclient.h b/lib/base/tcpclient.h
deleted file mode 100644 (file)
index 64d33fe..0000000
+++ /dev/null
@@ -1,90 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#ifndef TCPCLIENT_H
-#define TCPCLIENT_H
-
-namespace icinga
-{
-
-/**
- * The role of a TCP client object.
- *
- * @ingroup base
- */
-enum TcpClientRole
-{
-       RoleInbound, /**< Inbound socket, i.e. one that was returned
-                         from accept(). */
-       RoleOutbound /**< Outbound socket, i.e. one that is connect()'d to a
-                         remote socket. */
-};
-
-/**
- * A TCP client connection.
- *
- * @ingroup base
- */
-class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
-{
-public:
-       typedef shared_ptr<TcpClient> Ptr;
-       typedef weak_ptr<TcpClient> WeakPtr;
-
-       TcpClient(TcpClientRole role);
-
-       TcpClientRole GetRole(void) const;
-
-       void Connect(const String& node, const String& service);
-
-       boost::signal<void (const TcpClient::Ptr&)> OnConnected;
-       boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
-
-       virtual size_t GetAvailableBytes(void) const;
-       virtual void Peek(void *buffer, size_t count);
-       virtual void Read(void *buffer, size_t count);
-       virtual void Write(const void *buffer, size_t count);
-
-protected:
-       virtual bool WantsToRead(void) const;
-       virtual bool WantsToWrite(void) const;
-
-       virtual void HandleReadable(void);
-       virtual void HandleWritable(void);
-
-       mutable boost::mutex m_QueueMutex;
-       FIFO::Ptr m_SendQueue;
-       FIFO::Ptr m_RecvQueue;
-
-private:
-       TcpClientRole m_Role;
-};
-
-/**
- * Returns a new unconnected TcpClient object that has the specified
- * connection role.
- *
- * @param role The role of the new object.
- * @returns A new TcpClient object.
- */
-TcpClient::Ptr TcpClientFactory(TcpClientRole role);
-
-}
-
-#endif /* TCPCLIENT_H */
diff --git a/lib/base/tcpserver.cpp b/lib/base/tcpserver.cpp
deleted file mode 100644 (file)
index 4bfb3db..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-/******************************************************************************
- * Icinga 2                                                                   *
- * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
- *                                                                            *
- * This program is free software; you can redistribute it and/or              *
- * modify it under the terms of the GNU General Public License                *
- * as published by the Free Software Foundation; either version 2             *
- * of the License, or (at your option) any later version.                     *
- *                                                                            *
- * This program is distributed in the hope that it will be useful,            *
- * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
- * GNU General Public License for more details.                               *
- *                                                                            *
- * You should have received a copy of the GNU General Public License          *
- * along with this program; if not, write to the Free Software Foundation     *
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
- ******************************************************************************/
-
-#include "i2-base.h"
-
-using namespace icinga;
-
-/**
- * Constructor for the TcpServer class.
- */
-TcpServer::TcpServer(void)
-       : m_ClientFactory(boost::bind(&TcpClientFactory, RoleInbound))
-{ }
-
-/**
- * Sets the client factory.
- *
- * @param clientFactory The client factory function.
- */
-void TcpServer::SetClientFactory(const TcpServer::ClientFactory& clientFactory)
-{
-       m_ClientFactory = clientFactory;
-}
-
-/**
- * Retrieves the client factory.
- *
- * @returns The client factory function.
- */
-TcpServer::ClientFactory TcpServer::GetFactoryFunction(void) const
-{
-       return m_ClientFactory;
-}
-
-/**
- * Starts listening for incoming client connections.
- */
-void TcpServer::Listen(void)
-{
-       if (listen(GetFD(), SOMAXCONN) < 0)
-               throw_exception(SocketException("listen() failed", GetError()));
-}
-
-/**
- * Checks whether the TCP server wants to read (i.e. accept new clients).
- *
- * @returns true
- */
-bool TcpServer::WantsToRead(void) const
-{
-       return true;
-}
-
-/**
- * Accepts a new client and creates a new client object for it
- * using the client factory function.
- */
-void TcpServer::HandleReadable(void)
-{
-       int fd;
-       sockaddr_storage addr;
-       socklen_t addrlen = sizeof(addr);
-
-       fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
-
-       if (fd < 0)
-               throw_exception(SocketException("accept() failed", GetError()));
-
-       TcpClient::Ptr client = m_ClientFactory(fd);
-
-       Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
-}
-
index 2e7bf3d2a34ef791c8f744b114a0b7f4ade13d01..c42499211e456f77b062e005b03f7503a9b23548 100644 (file)
 
 using namespace icinga;
 
-/**
- * Creates a socket.
- *
- * @param family The socket family for the new socket.
- */
-void TcpSocket::MakeSocket(int family)
-{
-       assert(GetFD() == INVALID_SOCKET);
-
-       int fd = socket(family, SOCK_STREAM, 0);
-
-       if (fd == INVALID_SOCKET)
-               throw_exception(SocketException("socket() failed", GetLastSocketError()));
-
-       SetFD(fd);
-}
-
 /**
  * Creates a socket and binds it to the specified service.
  *
@@ -110,3 +93,61 @@ void TcpSocket::Bind(String node, String service, int family)
        if (fd == INVALID_SOCKET)
                throw_exception(runtime_error("Could not create a suitable socket."));
 }
+
+/**
+ * Creates a socket and connects to the specified node and service.
+ *
+ * @param node The node.
+ * @param service The service.
+ */
+void TcpSocket::Connect(const String& node, const String& service)
+{
+       addrinfo hints;
+       addrinfo *result;
+
+       memset(&hints, 0, sizeof(hints));
+       hints.ai_family = AF_UNSPEC;
+       hints.ai_socktype = SOCK_STREAM;
+       hints.ai_protocol = IPPROTO_TCP;
+
+       int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result);
+
+       if (rc < 0)
+               throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError()));
+
+       int fd = INVALID_SOCKET;
+
+       for (addrinfo *info = result; info != NULL; info = info->ai_next) {
+               fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
+
+               if (fd == INVALID_SOCKET)
+                       continue;
+
+               SetFD(fd);
+
+               rc = connect(fd, info->ai_addr, info->ai_addrlen);
+
+#ifdef _WIN32
+               if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
+#else /* _WIN32 */
+               if (rc < 0 && errno != EINPROGRESS) {
+#endif /* _WIN32 */
+                       closesocket(fd);
+                       SetFD(INVALID_SOCKET);
+
+                       continue;
+               }
+
+               if (rc >= 0) {
+                       SetConnected(true);
+                       OnConnected(GetSelf());
+               }
+
+               break;
+       }
+
+       freeaddrinfo(result);
+
+       if (fd == INVALID_SOCKET)
+               throw_exception(runtime_error("Could not create a suitable socket."));
+}
index e1027e1085e514827f5932cd365121e7095b0b53..2d8f6b4c3079f76077c08cb90b9d05339f46e230 100644 (file)
@@ -37,8 +37,7 @@ public:
        void Bind(String service, int family);
        void Bind(String node, String service, int family);
 
-private:
-       void MakeSocket(int family);
+       void Connect(const String& node, const String& service);
 };
 
 }
similarity index 50%
rename from lib/base/tlsclient.cpp
rename to lib/base/tlsstream.cpp
index 1f4f2a27049069c07a3ed36e02c7f43cacb39ada..947ed9194cdcf75721d114d1940043ab5d4058cd 100644 (file)
 
 using namespace icinga;
 
-int I2_EXPORT TlsClient::m_SSLIndex;
-bool I2_EXPORT TlsClient::m_SSLIndexInitialized = false;
+int I2_EXPORT TlsStream::m_SSLIndex;
+bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
 
 /**
- * Constructor for the TlsClient class.
+ * Constructor for the TlsStream class.
  *
  * @param role The role of the client.
  * @param sslContext The SSL context for the client.
  */
-TlsClient::TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
-       : TcpClient(role), m_SSLContext(sslContext),
-         m_BlockRead(false), m_BlockWrite(false)
-{ }
+TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
+       : m_InnerStream(innerStream), m_Role(role), m_SSLContext(sslContext),
+         m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
+{
+       m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this));
+       m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this));
+       m_SendQueue->Start();
+       m_RecvQueue->Start();
+}
 
-void TlsClient::Start(void)
+void TlsStream::Start(void)
 {
        m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
 
@@ -48,7 +53,7 @@ void TlsClient::Start(void)
                throw_exception(logic_error("No X509 client certificate was specified."));
 
        if (!m_SSLIndexInitialized) {
-               m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsClient"), NULL, NULL, NULL);
+               m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
                m_SSLIndexInitialized = true;
        }
 
@@ -56,21 +61,24 @@ void TlsClient::Start(void)
 
        SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
 
-       SSL_set_fd(m_SSL.get(), GetFD());
+       m_BIO = BIO_new_I2Stream(m_InnerStream);
+       SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
 
-       if (GetRole() == RoleInbound)
+       if (m_Role == TlsRoleServer)
                SSL_set_accept_state(m_SSL.get());
        else
                SSL_set_connect_state(m_SSL.get());
 
-       int rc = SSL_do_handshake(m_SSL.get());
+       /*int rc = SSL_do_handshake(m_SSL.get());
 
        if (rc == 1) {
                SetConnected(true);
                OnConnected(GetSelf());
-       }
+       }*/
 
-       Socket::Start();
+       Stream::Start();
+
+       HandleIO();
 }
 
 /**
@@ -78,10 +86,8 @@ void TlsClient::Start(void)
  *
  * @returns The X509 certificate.
  */
-shared_ptr<X509> TlsClient::GetClientCertificate(void) const
+shared_ptr<X509> TlsStream::GetClientCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SocketMutex);
-
        return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
 }
 
@@ -90,180 +96,146 @@ shared_ptr<X509> TlsClient::GetClientCertificate(void) const
  *
  * @returns The X509 certificate.
  */
-shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
+shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
 {
-       boost::mutex::scoped_lock lock(m_SocketMutex);
-
        return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
 }
 
+void TlsStream::DataAvailableHandler(void)
+{
+       try {
+               HandleIO();
+       } catch (...) {
+               SetException(boost::current_exception());
+
+               Close();
+       }
+}
+
+void TlsStream::ClosedHandler(void)
+{
+       SetException(m_InnerStream->GetException());
+       Close();
+}
+
 /**
- * Processes data that is available for this socket.
+ * Processes data for the stream.
  */
-void TlsClient::HandleReadable(void)
+void TlsStream::HandleIO(void)
 {
-       m_BlockRead = false;
-       m_BlockWrite = false;
+       char data[16 * 1024];
+       int rc;
 
-       for (;;) {
-               char data[1024];
-               int rc;
+       if (!IsConnected()) {
+               rc = SSL_do_handshake(m_SSL.get());
 
-               if (IsConnected()) {
-                       rc = SSL_read(m_SSL.get(), data, sizeof(data));
+               if (rc == 1) {
+                       SetConnected(true);
                } else {
-                       rc = SSL_do_handshake(m_SSL.get());
-
-                       if (rc == 1) {
-                               SetConnected(true);
-                               Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
-                               return;
-                       }
-               }
-
-               if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
                                case SSL_ERROR_WANT_WRITE:
-                                       m_BlockRead = true;
                                        /* fall through */
                                case SSL_ERROR_WANT_READ:
-                                       goto post_event;
+                                       return;
                                case SSL_ERROR_ZERO_RETURN:
-                                       CloseInternal(false);
-                                       goto post_event;
+                                       Close();
+                                       return;
                                default:
-                                       throw_exception(OpenSSLException("SSL_read failed", ERR_get_error()));
+                                       I2Stream_check_exception(m_BIO);
+                                       throw_exception(OpenSSLException("SSL_do_handshake failed", ERR_get_error()));
                        }
                }
+       }
 
-               if (IsConnected()) {
-                       boost::mutex::scoped_lock lock(m_QueueMutex);
+       bool new_data = false, read_ok = true;
 
+       while (read_ok) {
+               rc = SSL_read(m_SSL.get(), data, sizeof(data));
+
+               if (rc > 0) {
                        m_RecvQueue->Write(data, rc);
+                       new_data = true;
+               } else {
+                       switch (SSL_get_error(m_SSL.get(), rc)) {
+                               case SSL_ERROR_WANT_WRITE:
+                                       /* fall through */
+                               case SSL_ERROR_WANT_READ:
+                                       read_ok = false;
+                                       break;
+                               case SSL_ERROR_ZERO_RETURN:
+                                       Close();
+                                       return;
+                               default:
+                                       I2Stream_check_exception(m_BIO);
+                                       throw_exception(OpenSSLException("SSL_read failed", ERR_get_error()));
+                       }
                }
        }
 
-post_event:
-       Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
-}
+       if (new_data)
+               OnDataAvailable(GetSelf());
 
-/**
- * Processes data that can be written for this socket.
- */
-void TlsClient::HandleWritable(void)
-{
-       m_BlockRead = false;
-       m_BlockWrite = false;
-
-       char data[1024];
-       size_t count;
+       while (m_SendQueue->GetAvailableBytes() > 0) {
+               size_t count = m_SendQueue->GetAvailableBytes();
 
-       for (;;) {
-               int rc;
+               if (count == 0)
+                       break;
 
-               if (IsConnected()) {
-                       {
-                               boost::mutex::scoped_lock lock(m_QueueMutex);
+               if (count > sizeof(data))
+                       count = sizeof(data);
 
-                               count = m_SendQueue->GetAvailableBytes();
+               m_SendQueue->Peek(data, count);
 
-                               if (count == 0)
-                                       break;
+               rc = SSL_write(m_SSL.get(), (const char *)data, count);
 
-                               if (count > sizeof(data))
-                                       count = sizeof(data);
-
-                               m_SendQueue->Peek(data, count);
-                       }
-
-                       rc = SSL_write(m_SSL.get(), (const char *)data, count);
+               if (rc > 0) {
+                       m_SendQueue->Read(NULL, rc);
                } else {
-                       rc = SSL_do_handshake(m_SSL.get());
-
-                       if (rc == 1) {
-                               SetConnected(true);
-                               Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
-                               return;
-                       }
-               }
-
-               if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
                                case SSL_ERROR_WANT_READ:
-                                       m_BlockWrite = true;
                                        /* fall through */
                                case SSL_ERROR_WANT_WRITE:
                                        return;
                                case SSL_ERROR_ZERO_RETURN:
-                                       CloseInternal(false);
+                                       Close();
                                        return;
                                default:
+                                       I2Stream_check_exception(m_BIO);
                                        throw_exception(OpenSSLException("SSL_write failed", ERR_get_error()));
                        }
                }
-
-               if (IsConnected()) {
-                       boost::mutex::scoped_lock lock(m_QueueMutex);
-
-                       m_SendQueue->Read(NULL, rc);
-               }
        }
 }
 
 /**
- * Checks whether data should be read for this socket.
- *
- * @returns true if data should be read, false otherwise.
+ * Closes the stream.
  */
-bool TlsClient::WantsToRead(void) const
+void TlsStream::Close(void)
 {
-       if (SSL_want_read(m_SSL.get()))
-               return true;
-
-       if (m_BlockRead)
-               return false;
+       if (m_SSL)
+               SSL_shutdown(m_SSL.get());
 
-       return TcpClient::WantsToRead();
+       Stream::Close();
 }
 
-/**
- * Checks whether data should be written for this socket.
- *
- * @returns true if data should be written, false otherwise.
- */
-bool TlsClient::WantsToWrite(void) const
+size_t TlsStream::GetAvailableBytes(void) const
 {
-       if (SSL_want_write(m_SSL.get()))
-               return true;
-
-       if (m_BlockWrite)
-               return false;
-
-       return TcpClient::WantsToWrite();
+       return m_RecvQueue->GetAvailableBytes();
 }
 
-/**
- * Closes the socket.
- *
- * @param from_dtor Whether this method was invoked from the destructor.
- */
-void TlsClient::CloseInternal(bool from_dtor)
+size_t TlsStream::Peek(void *buffer, size_t count)
 {
-       if (m_SSL)
-               SSL_shutdown(m_SSL.get());
-
-       TcpClient::CloseInternal(from_dtor);
+       return m_RecvQueue->Peek(buffer, count);
 }
 
-/**
- * Factory function for the TlsClient class.
- *
- * @param role The role of the TLS socket.
- * @param sslContext The SSL context for the socket.
- * @returns A new TLS socket.
- */
-TcpClient::Ptr icinga::TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
+size_t TlsStream::Read(void *buffer, size_t count)
 {
-       return boost::make_shared<TlsClient>(role, sslContext);
+       return m_RecvQueue->Read(buffer, count);
 }
 
+void TlsStream::Write(const void *buffer, size_t count)
+{
+       m_SendQueue->Write(buffer, count);
+
+       HandleIO();
+}
similarity index 69%
rename from lib/base/tlsclient.h
rename to lib/base/tlsstream.h
index 61d8d349620193a737059123f64623a7347f4411..f0d278355c7680469f14949d7f3df0d6e82ba72a 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef TLSCLIENT_H
-#define TLSCLIENT_H
+#ifndef TLSSTREAM_H
+#define TLSSTREAM_H
 
 namespace icinga
 {
 
+typedef enum
+{
+       TlsRoleClient,
+       TlsRoleServer
+} TlsRole;
+
 /**
- * A TLS client connection.
+ * A TLS stream.
  *
  * @ingroup base
  */
-class I2_BASE_API TlsClient : public TcpClient
+class I2_BASE_API TlsStream : public Stream
 {
 public:
-       TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+       typedef shared_ptr<TlsStream> Ptr;
+       typedef weak_ptr<TlsStream> WeakPtr;
 
-       virtual void Start(void);
+       TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext);
 
        shared_ptr<X509> GetClientCertificate(void) const;
        shared_ptr<X509> GetPeerCertificate(void) const;
 
-protected:
-       void HandleSSLError(void);
+       void Start(void);
+       virtual void Close(void);
 
-       virtual bool WantsToRead(void) const;
-       virtual bool WantsToWrite(void) const;
+       virtual size_t GetAvailableBytes(void) const;
+       virtual size_t Peek(void *buffer, size_t count);
+       virtual size_t Read(void *buffer, size_t count);
+       virtual void Write(const void *buffer, size_t count);
+
+protected:
+       void DataAvailableHandler(void);
+       void ClosedHandler(void);
 
-       virtual void HandleReadable(void);
-       virtual void HandleWritable(void);
+       void HandleIO(void);
 
 private:
        shared_ptr<SSL_CTX> m_SSLContext;
        shared_ptr<SSL> m_SSL;
+       BIO *m_BIO;
 
-       bool m_BlockRead;
-       bool m_BlockWrite;
+       FIFO::Ptr m_SendQueue;
+       FIFO::Ptr m_RecvQueue;
+
+       Stream::Ptr m_InnerStream;
+       TlsRole m_Role;
 
        static int m_SSLIndex;
        static bool m_SSLIndexInitialized;
 
-       virtual void CloseInternal(bool from_dtor);
-
        static void NullCertificateDeleter(X509 *certificate);
-};
 
-TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+};
 
 }
 
-#endif /* TLSCLIENT_H */
+#endif /* TLSSTREAM_H */
index 4f5417bb98d2c0d42649fc6806f77916aaac14db..eee7801c7a9495708bc5916921bd37359dfbb4a1 100644 (file)
@@ -10,10 +10,8 @@ libremoting_la_SOURCES = \
        endpointmanager.cpp \
        endpointmanager.h \
        i2-remoting.h \
-       jsonrpcclient.cpp \
-       jsonrpcclient.h \
-       jsonrpcserver.cpp \
-       jsonrpcserver.h \
+       jsonrpcconnection.cpp \
+       jsonrpcconnection.h \
        messagepart.cpp \
        messagepart.h \
        requestmessage.cpp \
index c2dcb775d4d09157e6608cfa56f8aa672bd38940..1746189e750ad93a0b09885e5cf409a86da1bb0b 100644 (file)
@@ -111,37 +111,18 @@ bool Endpoint::IsConnected(void) const
        if (IsLocalEndpoint()) {
                return true;
        } else {
-               JsonRpcClient::Ptr client = GetClient();
+               JsonRpcConnection::Ptr client = GetClient();
 
-               return (client && client->IsConnected());
+               return (client && client->GetStream()->IsConnected());
        }
 }
 
-/**
- * Retrieves the address for the endpoint.
- *
- * @returns The endpoint's address.
- */
-String Endpoint::GetAddress(void) const
-{
-       if (IsLocalEndpoint()) {
-               return "local:" + GetName();
-       } else {
-               JsonRpcClient::Ptr client = GetClient();
-
-               if (!client || !client->IsConnected())
-                       return "<disconnected endpoint>";
-
-               return client->GetPeerAddress();
-       }
-}
-
-JsonRpcClient::Ptr Endpoint::GetClient(void) const
+JsonRpcConnection::Ptr Endpoint::GetClient(void) const
 {
        return Get("client");
 }
 
-void Endpoint::SetClient(const JsonRpcClient::Ptr& client)
+void Endpoint::SetClient(const JsonRpcConnection::Ptr& client)
 {
        Set("client", client);
        client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
@@ -337,14 +318,14 @@ void Endpoint::NewMessageHandler(const MessagePart& message)
 
 void Endpoint::ClientClosedHandler(void)
 {
-       try {
+       /*try {
                GetClient()->CheckException();
        } catch (const exception& ex) {
                stringstream message;
                message << "Error occured for JSON-RPC socket: Message=" << ex.what();
 
                Logger::Write(LogWarning, "jsonrpc", message.str());
-       }
+       }*/
 
        Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
 
index bfa6a92361f467a653b6d7544c5c765871b3411a..744469723916a7e9a9110c4eefedfb652260f75f 100644 (file)
@@ -43,10 +43,8 @@ public:
        static bool Exists(const String& name);
        static Endpoint::Ptr GetByName(const String& name);
 
-       String GetAddress(void) const;
-
-       JsonRpcClient::Ptr GetClient(void) const;
-       void SetClient(const JsonRpcClient::Ptr& client);
+       JsonRpcConnection::Ptr GetClient(void) const;
+       void SetClient(const JsonRpcConnection::Ptr& client);
 
        void RegisterSubscription(const String& topic);
        void UnregisterSubscription(const String& topic);
index a0cb240e8f8320abd213614fd3e0588cc1d4c5ea..c25801addb0624672207712047abdd7df88f4df8 100644 (file)
@@ -110,11 +110,11 @@ void EndpointManager::AddListener(const String& service)
        s << "Adding new listener: port " << service;
        Logger::Write(LogInformation, "icinga", s.str());
 
-       JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
+       TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
 
        m_Servers.insert(server);
        server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
-          this, _2));
+          this, _2, TlsRoleServer));
 
        server->Bind(service, AF_INET6);
        server->Listen();
@@ -133,9 +133,9 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
        if (!sslContext)
                throw_exception(logic_error("SSL context is required for AddConnection()"));
 
-       JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
+       TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
        client->Connect(node, service);
-       NewClientHandler(client);
+       NewClientHandler(client, TlsRoleClient);
 }
 
 /**
@@ -143,27 +143,30 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
  *
  * @param client The new client.
  */
-void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
+void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
 {
-       JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
+       String peerAddress = client->GetPeerAddress();
+       TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, GetSSLContext());
+       tlsStream->Start();
 
-       m_PendingClients.insert(jclient);
-       jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
-       jclient->Start();
+       m_PendingClients.insert(tlsStream);
+       tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1, peerAddress));
+
+       client->Start();
 }
 
-void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
+void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress)
 {
-       JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
-
-       Logger::Write(LogInformation, "icinga", "New client connection for " + jclient->GetPeerAddress());
+       TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
+       JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
 
-       m_PendingClients.erase(jclient);
-
-       shared_ptr<X509> cert = jclient->GetPeerCertificate();
+       m_PendingClients.erase(tlsStream);
 
+       shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
        String identity = Utility::GetCertificateCN(cert);
 
+       Logger::Write(LogInformation, "icinga", "New client connection at " + peerAddress + " for identity '" + identity + "'");
+
        Endpoint::Ptr endpoint;
 
        if (Endpoint::Exists(identity))
index 72c0ba910292de6d3d5505681c07b584c863c7be..9ae73ce439dbea777c636393ba8f3c77ed43ec80 100644 (file)
@@ -70,8 +70,8 @@ private:
 
        Timer::Ptr m_ReconnectTimer;
 
-       set<JsonRpcServer::Ptr> m_Servers;
-       set<JsonRpcClient::Ptr> m_PendingClients;
+       set<TcpSocket::Ptr> m_Servers;
+       set<TlsStream::Ptr> m_PendingClients;
 
        /**
         * Information about a pending API request.
@@ -101,8 +101,8 @@ private:
 
        void ReconnectTimerHandler(void);
 
-       void NewClientHandler(const TcpClient::Ptr& client);
-       void ClientConnectedHandler(const TcpClient::Ptr& client);
+       void NewClientHandler(const Socket::Ptr& client, TlsRole rol);
+       void ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress);
 };
 
 }
index 132a2314bbdcb3345e133acd04270ac3a9a158e4..cb18bf81eea2e1651662bac7d0ea3b3fe5f6025f 100644 (file)
@@ -39,8 +39,7 @@
 #include "messagepart.h"
 #include "requestmessage.h"
 #include "responsemessage.h"
-#include "jsonrpcclient.h"
-#include "jsonrpcserver.h"
+#include "jsonrpcconnection.h"
 #include "endpoint.h"
 #include "endpointmanager.h"
 
similarity index 68%
rename from lib/remoting/jsonrpcclient.cpp
rename to lib/remoting/jsonrpcconnection.cpp
index 4fd3497468687c2f9684b12b92eb8e00eafa8069..22ec9744f0fe4e521809c41fd02147b77d951ce4 100644 (file)
 using namespace icinga;
 
 /**
- * Constructor for the JsonRpcClient class.
+ * Constructor for the JsonRpcConnection class.
  *
- * @param role The role of the underlying TCP client.
- * @param sslContext SSL context for the TLS connection.
+ * @param stream The stream.
  */
-JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
-       : TlsClient(role, sslContext)
-{
-       OnDataAvailable.connect(boost::bind(&JsonRpcClient::DataAvailableHandler,
-           this));
-}
+JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream)
+       : Connection(stream)
+{ }
 
 /**
  * Sends a message to the connected peer.
  *
  * @param message The message.
  */
-void JsonRpcClient::SendMessage(const MessagePart& message)
+void JsonRpcConnection::SendMessage(const MessagePart& message)
 {
        Value value = message.GetDictionary();
        String json = value.Serialize();
        //std::cerr << ">> " << json << std::endl;
-       NetString::WriteStringToIOQueue(this, json);
+       NetString::WriteStringToStream(GetStream(), json);
 }
 
 /**
  * Processes inbound data.
  */
-void JsonRpcClient::DataAvailableHandler(void)
+void JsonRpcConnection::ProcessData(void)
 {
        String jsonString;
 
-       while (NetString::ReadStringFromIOQueue(this, &jsonString)) {
+       while (NetString::ReadStringFromStream(GetStream(), &jsonString)) {
                //std::cerr << "<< " << jsonString << std::endl;
 
                try {
@@ -73,20 +69,3 @@ void JsonRpcClient::DataAvailableHandler(void)
                }
        }
 }
-
-/**
- * Factory function for JSON-RPC clients.
- *
- * @param fd The file descriptor.
- * @param role The role of the underlying TCP client.
- * @param sslContext SSL context for the TLS connection.
- * @returns A new JSON-RPC client.
- */
-JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role,
-    shared_ptr<SSL_CTX> sslContext)
-{
-       JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(role,
-           sslContext);
-       client->SetFD(fd);
-       return client;
-}
similarity index 70%
rename from lib/remoting/jsonrpcclient.h
rename to lib/remoting/jsonrpcconnection.h
index c1e719ab403b7dca9615d0eb91699c628071c993..84589e3bc81b8e679ae18ed6db8ef1ee2f06e376 100644 (file)
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#ifndef JSONRPCCLIENT_H
-#define JSONRPCCLIENT_H
+#ifndef JSONRPCCONNECTION_H
+#define JSONRPCCONNECTION_H
 
 namespace icinga
 {
 
 /**
- * A JSON-RPC client.
+ * A JSON-RPC connection.
  *
  * @ingroup remoting
  */
-class I2_REMOTING_API JsonRpcClient : public TlsClient
+class I2_REMOTING_API JsonRpcConnection : public Connection
 {
 public:
-       typedef shared_ptr<JsonRpcClient> Ptr;
-       typedef weak_ptr<JsonRpcClient> WeakPtr;
+       typedef shared_ptr<JsonRpcConnection> Ptr;
+       typedef weak_ptr<JsonRpcConnection> WeakPtr;
 
-       JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+       JsonRpcConnection(const Stream::Ptr& stream);
 
        void SendMessage(const MessagePart& message);
 
-       boost::signal<void (const JsonRpcClient::Ptr&, const MessagePart&)> OnNewMessage;
+       boost::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
 
-private:
-       void DataAvailableHandler(void);
-
-       friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+protected:
+       virtual void ProcessData(void);
 };
 
-JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
-
 }
 
-#endif /* JSONRPCCLIENT_H */
+#endif /* JSONRPCCONNECTION_H */
index 10c03c96a51c760dda978152adbfda70475966b1..2d79775ce4de59209c9221a936a24b3cd382f83f 100644 (file)
     <ClInclude Include="endpoint.h" />
     <ClInclude Include="endpointmanager.h" />
     <ClInclude Include="i2-remoting.h" />
-    <ClInclude Include="jsonrpcclient.h" />
+    <ClInclude Include="jsonrpcconnection.h" />
     <ClInclude Include="requestmessage.h" />
     <ClInclude Include="responsemessage.h" />
-    <ClInclude Include="jsonrpcserver.h" />
     <ClInclude Include="messagepart.h" />
   </ItemGroup>
   <ItemGroup>
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">Create</PrecompiledHeader>
     </ClCompile>
-    <ClCompile Include="jsonrpcclient.cpp" />
+    <ClCompile Include="jsonrpcconnection.cpp" />
     <ClCompile Include="requestmessage.cpp" />
     <ClCompile Include="responsemessage.cpp" />
-    <ClCompile Include="jsonrpcserver.cpp" />
     <ClCompile Include="messagepart.cpp" />
   </ItemGroup>
   <PropertyGroup Label="Globals">
index 1e576678466014bbc7e6d3e21f9ee7c2318e6aba..268ba65c32c8880ced1258990fcbb4fa32bae2f6 100644 (file)
@@ -1,15 +1,9 @@
 <?xml version="1.0" encoding="utf-8"?>
 <Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <ItemGroup>
-    <ClCompile Include="jsonrpcclient.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
     <ClCompile Include="responsemessage.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
-    <ClCompile Include="jsonrpcserver.cpp">
-      <Filter>Quelldateien</Filter>
-    </ClCompile>
     <ClCompile Include="messagepart.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
     <ClCompile Include="endpointmanager.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
+    <ClCompile Include="jsonrpcconnection.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
-    <ClInclude Include="jsonrpcclient.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
-    <ClInclude Include="jsonrpcserver.h">
-      <Filter>Headerdateien</Filter>
-    </ClInclude>
     <ClInclude Include="messagepart.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
@@ -51,6 +42,9 @@
     <ClInclude Include="endpointmanager.h">
       <Filter>Headerdateien</Filter>
     </ClInclude>
+    <ClInclude Include="jsonrpcconnection.h">
+      <Filter>Headerdateien</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Headerdateien">