]> granicus.if.org Git - icinga2/commitdiff
ApiListener: connect(2) via Boost ASIO
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 18 Feb 2019 13:56:45 +0000 (14:56 +0100)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 09:40:14 +0000 (11:40 +0200)
lib/remote/apilistener.cpp
lib/remote/apilistener.hpp

index c0b75aecf7b51f1d566ed3aac3ebd9cca7e240ae..b5fc8e18bf691fb90b6c5764689869e054563535 100644 (file)
@@ -430,210 +430,70 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std
  */
 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 {
-       {
-               ObjectLock olock(this);
+       namespace asio = boost::asio;
+       using asio::ip::tcp;
 
-               auto sslContext (m_SSLContext);
+       auto sslContext (m_SSLContext);
 
-               if (!sslContext) {
-                       Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
-                       return;
-               }
+       if (!sslContext) {
+               Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
+               return;
        }
 
-       String host = endpoint->GetHost();
-       String port = endpoint->GetPort();
-
-       Log(LogInformation, "ApiListener")
-               << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
-
-       TcpSocket::Ptr client = new TcpSocket();
-
-       try {
-               client->Connect(host, port);
+       auto& io (IoEngine::Get().GetIoService());
 
-               NewClientHandler(client, endpoint->GetName(), RoleClient);
+       asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
+               String host = endpoint->GetHost();
+               String port = endpoint->GetPort();
 
-               endpoint->SetConnecting(false);
                Log(LogInformation, "ApiListener")
-                               << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
-       } catch (const std::exception& ex) {
-               endpoint->SetConnecting(false);
-               client->Close();
-
-               std::ostringstream info;
-               info << "Cannot connect to host '" << host << "' on port '" << port << "'";
-               Log(LogCritical, "ApiListener", info.str());
-               Log(LogDebug, "ApiListener")
-                       << info.str() << "\n" << DiagnosticInformation(ex);
-       }
-}
-
-void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
-{
-       try {
-               NewClientHandlerInternal(client, hostname, role);
-       } catch (const std::exception& ex) {
-               Log(LogCritical, "ApiListener")
-                       << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
-
-               Log(LogDebug, "ApiListener")
-                       << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
-       }
-}
-
-/**
- * Processes a new client connection.
- *
- * @param client The new client.
- */
-void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
-{
-       CONTEXT("Handling new API client connection");
-
-       String conninfo;
+                       << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
 
-       if (role == RoleClient)
-               conninfo = "to";
-       else
-               conninfo = "from";
-
-       conninfo += " " + client->GetPeerAddress();
-
-       TlsStream::Ptr tlsStream;
-
-       String environmentName = Application::GetAppEnvironment();
-
-       String serverName = hostname;
-
-       if (!environmentName.IsEmpty())
-               serverName += ":" + environmentName;
-
-       {
-               ObjectLock olock(this);
                try {
-                       tlsStream = new TlsStream(client, serverName, role, m_SSLContext);
-               } catch (const std::exception&) {
-                       Log(LogCritical, "ApiListener")
-                               << "Cannot create TLS stream from client connection (" << conninfo << ")";
-                       return;
-               }
-       }
+                       auto sslConn (std::make_shared<AsioTlsStream>(io, *sslContext));
 
-       try {
-               tlsStream->Handshake();
-       } catch (const std::exception& ex) {
-               Log(LogCritical, "ApiListener")
-                       << "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false);
-               tlsStream->Close();
-               return;
-       }
+                       {
+                               tcp::resolver resolver (io);
+                               tcp::resolver::query query (host, port);
+                               auto result (resolver.async_resolve(query, yc));
+                               auto current (result.begin());
 
-       std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
-       String identity;
-       Endpoint::Ptr endpoint;
-       bool verify_ok = false;
+                               for (;;) {
+                                       auto& tcpConn (sslConn->lowest_layer());
 
-       if (cert) {
-               try {
-                       identity = GetCertificateCN(cert);
-               } catch (const std::exception&) {
-                       Log(LogCritical, "ApiListener")
-                               << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
-                       tlsStream->Close();
-                       return;
-               }
+                                       try {
+                                               tcpConn.open(current->endpoint().protocol());
+                                               tcpConn.set_option(tcp::socket::keep_alive(true));
+                                               tcpConn.async_connect(current->endpoint(), yc);
 
-               verify_ok = tlsStream->IsVerifyOK();
-               if (!hostname.IsEmpty()) {
-                       if (identity != hostname) {
-                               Log(LogWarning, "ApiListener")
-                                       << "Unexpected certificate common name while connecting to endpoint '"
-                                       << hostname << "': got '" << identity << "'";
-                               tlsStream->Close();
-                               return;
-                       } else if (!verify_ok) {
-                               Log(LogWarning, "ApiListener")
-                                       << "Certificate validation failed for endpoint '" << hostname
-                                       << "': " << tlsStream->GetVerifyError();
+                                               break;
+                                       } catch (const std::exception&) {
+                                               if (++current == result.end()) {
+                                                       throw;
+                                               }
+
+                                               if (tcpConn.is_open()) {
+                                                       tcpConn.close();
+                                               }
+                                       }
+                               }
                        }
-               }
-
-               if (verify_ok)
-                       endpoint = Endpoint::GetByName(identity);
-
-               {
-                       Log log(LogInformation, "ApiListener");
 
-                       log << "New client connection for identity '" << identity << "' " << conninfo;
+                       NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient);
 
-                       if (!verify_ok)
-                               log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
-                       else if (!endpoint)
-                               log << " (no Endpoint object found for identity)";
-               }
-       } else {
-               Log(LogInformation, "ApiListener")
-                       << "New client connection " << conninfo << " (no client certificate)";
-       }
-
-       ClientType ctype;
-
-       if (role == RoleClient) {
-               Dictionary::Ptr message = new Dictionary({
-                       { "jsonrpc", "2.0" },
-                       { "method", "icinga::Hello" },
-                       { "params", new Dictionary() }
-               });
-
-               JsonRpc::SendMessage(tlsStream, message);
-               ctype = ClientJsonRpc;
-       } else {
-               tlsStream->WaitForData(10);
-
-               if (!tlsStream->IsDataAvailable()) {
-                       if (identity.IsEmpty())
-                               Log(LogInformation, "ApiListener")
-                                       << "No data received on new API connection. "
-                                       << "Ensure that the remote endpoints are properly configured in a cluster setup.";
-                       else
-                               Log(LogWarning, "ApiListener")
-                                       << "No data received on new API connection for identity '" << identity << "'. "
-                                       << "Ensure that the remote endpoints are properly configured in a cluster setup.";
-                       tlsStream->Close();
-                       return;
-               }
-
-               char firstByte;
-               tlsStream->Peek(&firstByte, 1, false);
-
-               if (firstByte >= '0' && firstByte <= '9')
-                       ctype = ClientJsonRpc;
-               else
-                       ctype = ClientHttp;
-       }
-
-       if (ctype == ClientJsonRpc) {
-               Log(LogNotice, "ApiListener", "New JSON-RPC client");
-
-               JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
-               aclient->Start();
-
-               if (endpoint) {
-                       bool needSync = !endpoint->GetConnected();
-
-                       endpoint->AddClient(aclient);
+                       endpoint->SetConnecting(false);
+                       Log(LogInformation, "ApiListener")
+                               << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
+               } catch (const std::exception& ex) {
+                       endpoint->SetConnecting(false);
 
-                       m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
-               } else {
-                       if (!AddAnonymousClient(aclient)) {
-                               Log(LogNotice, "ApiListener")
-                                       << "Ignoring anonymous JSON-RPC connection " << conninfo
-                                       << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
-                               aclient->Disconnect();
-                       }
+                       std::ostringstream info;
+                       info << "Cannot connect to host '" << host << "' on port '" << port << "'";
+                       Log(LogCritical, "ApiListener", info.str());
+                       Log(LogDebug, "ApiListener")
+                               << info.str() << "\n" << DiagnosticInformation(ex);
                }
-       }
+       });
 }
 
 void ApiListener::NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role)
@@ -1004,8 +864,7 @@ void ApiListener::ApiReconnectTimerHandler()
                        /* Set connecting state to prevent duplicated queue inserts later. */
                        endpoint->SetConnecting(true);
 
-                       /* Use dynamic thread pool with additional on demand resources with fast throughput. */
-                       EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
+                       AddConnection(endpoint);
                }
        }
 
index 09383749362e0eb4587529fc5dd55925965a18d3..b88f1de1e351a2e30358e5b747fad2a73803fcc3 100644 (file)
@@ -130,9 +130,6 @@ private:
        bool AddListener(const String& node, const String& service);
        void AddConnection(const Endpoint::Ptr& endpoint);
 
-       void NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
-       void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
-
        void NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role);
        void NewClientHandlerInternal(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role);
        void ListenerCoroutineProc(boost::asio::yield_context yc, const std::shared_ptr<boost::asio::ip::tcp::acceptor>& server, const std::shared_ptr<boost::asio::ssl::context>& sslContext);