*/
void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
{
- {
- ObjectLock olock(this);
+ namespace asio = boost::asio;
+ using asio::ip::tcp;
- auto sslContext (m_SSLContext);
+ auto sslContext (m_SSLContext);
- if (!sslContext) {
- Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
- return;
- }
+ if (!sslContext) {
+ Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
+ return;
}
- String host = endpoint->GetHost();
- String port = endpoint->GetPort();
-
- Log(LogInformation, "ApiListener")
- << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
-
- TcpSocket::Ptr client = new TcpSocket();
-
- try {
- client->Connect(host, port);
+ auto& io (IoEngine::Get().GetIoService());
- NewClientHandler(client, endpoint->GetName(), RoleClient);
+ asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
+ String host = endpoint->GetHost();
+ String port = endpoint->GetPort();
- endpoint->SetConnecting(false);
Log(LogInformation, "ApiListener")
- << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
- } catch (const std::exception& ex) {
- endpoint->SetConnecting(false);
- client->Close();
-
- std::ostringstream info;
- info << "Cannot connect to host '" << host << "' on port '" << port << "'";
- Log(LogCritical, "ApiListener", info.str());
- Log(LogDebug, "ApiListener")
- << info.str() << "\n" << DiagnosticInformation(ex);
- }
-}
-
-void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
-{
- try {
- NewClientHandlerInternal(client, hostname, role);
- } catch (const std::exception& ex) {
- Log(LogCritical, "ApiListener")
- << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
-
- Log(LogDebug, "ApiListener")
- << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
- }
-}
-
-/**
- * Processes a new client connection.
- *
- * @param client The new client.
- */
-void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
-{
- CONTEXT("Handling new API client connection");
-
- String conninfo;
+ << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
- if (role == RoleClient)
- conninfo = "to";
- else
- conninfo = "from";
-
- conninfo += " " + client->GetPeerAddress();
-
- TlsStream::Ptr tlsStream;
-
- String environmentName = Application::GetAppEnvironment();
-
- String serverName = hostname;
-
- if (!environmentName.IsEmpty())
- serverName += ":" + environmentName;
-
- {
- ObjectLock olock(this);
try {
- tlsStream = new TlsStream(client, serverName, role, m_SSLContext);
- } catch (const std::exception&) {
- Log(LogCritical, "ApiListener")
- << "Cannot create TLS stream from client connection (" << conninfo << ")";
- return;
- }
- }
+ auto sslConn (std::make_shared<AsioTlsStream>(io, *sslContext));
- try {
- tlsStream->Handshake();
- } catch (const std::exception& ex) {
- Log(LogCritical, "ApiListener")
- << "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false);
- tlsStream->Close();
- return;
- }
+ {
+ tcp::resolver resolver (io);
+ tcp::resolver::query query (host, port);
+ auto result (resolver.async_resolve(query, yc));
+ auto current (result.begin());
- std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
- String identity;
- Endpoint::Ptr endpoint;
- bool verify_ok = false;
+ for (;;) {
+ auto& tcpConn (sslConn->lowest_layer());
- if (cert) {
- try {
- identity = GetCertificateCN(cert);
- } catch (const std::exception&) {
- Log(LogCritical, "ApiListener")
- << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
- tlsStream->Close();
- return;
- }
+ try {
+ tcpConn.open(current->endpoint().protocol());
+ tcpConn.set_option(tcp::socket::keep_alive(true));
+ tcpConn.async_connect(current->endpoint(), yc);
- verify_ok = tlsStream->IsVerifyOK();
- if (!hostname.IsEmpty()) {
- if (identity != hostname) {
- Log(LogWarning, "ApiListener")
- << "Unexpected certificate common name while connecting to endpoint '"
- << hostname << "': got '" << identity << "'";
- tlsStream->Close();
- return;
- } else if (!verify_ok) {
- Log(LogWarning, "ApiListener")
- << "Certificate validation failed for endpoint '" << hostname
- << "': " << tlsStream->GetVerifyError();
+ break;
+ } catch (const std::exception&) {
+ if (++current == result.end()) {
+ throw;
+ }
+
+ if (tcpConn.is_open()) {
+ tcpConn.close();
+ }
+ }
+ }
}
- }
-
- if (verify_ok)
- endpoint = Endpoint::GetByName(identity);
-
- {
- Log log(LogInformation, "ApiListener");
- log << "New client connection for identity '" << identity << "' " << conninfo;
+ NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient);
- if (!verify_ok)
- log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
- else if (!endpoint)
- log << " (no Endpoint object found for identity)";
- }
- } else {
- Log(LogInformation, "ApiListener")
- << "New client connection " << conninfo << " (no client certificate)";
- }
-
- ClientType ctype;
-
- if (role == RoleClient) {
- Dictionary::Ptr message = new Dictionary({
- { "jsonrpc", "2.0" },
- { "method", "icinga::Hello" },
- { "params", new Dictionary() }
- });
-
- JsonRpc::SendMessage(tlsStream, message);
- ctype = ClientJsonRpc;
- } else {
- tlsStream->WaitForData(10);
-
- if (!tlsStream->IsDataAvailable()) {
- if (identity.IsEmpty())
- Log(LogInformation, "ApiListener")
- << "No data received on new API connection. "
- << "Ensure that the remote endpoints are properly configured in a cluster setup.";
- else
- Log(LogWarning, "ApiListener")
- << "No data received on new API connection for identity '" << identity << "'. "
- << "Ensure that the remote endpoints are properly configured in a cluster setup.";
- tlsStream->Close();
- return;
- }
-
- char firstByte;
- tlsStream->Peek(&firstByte, 1, false);
-
- if (firstByte >= '0' && firstByte <= '9')
- ctype = ClientJsonRpc;
- else
- ctype = ClientHttp;
- }
-
- if (ctype == ClientJsonRpc) {
- Log(LogNotice, "ApiListener", "New JSON-RPC client");
-
- JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
- aclient->Start();
-
- if (endpoint) {
- bool needSync = !endpoint->GetConnected();
-
- endpoint->AddClient(aclient);
+ endpoint->SetConnecting(false);
+ Log(LogInformation, "ApiListener")
+ << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
+ } catch (const std::exception& ex) {
+ endpoint->SetConnecting(false);
- m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
- } else {
- if (!AddAnonymousClient(aclient)) {
- Log(LogNotice, "ApiListener")
- << "Ignoring anonymous JSON-RPC connection " << conninfo
- << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
- aclient->Disconnect();
- }
+ std::ostringstream info;
+ info << "Cannot connect to host '" << host << "' on port '" << port << "'";
+ Log(LogCritical, "ApiListener", info.str());
+ Log(LogDebug, "ApiListener")
+ << info.str() << "\n" << DiagnosticInformation(ex);
}
- }
+ });
}
void ApiListener::NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role)
/* Set connecting state to prevent duplicated queue inserts later. */
endpoint->SetConnecting(true);
- /* Use dynamic thread pool with additional on demand resources with fast throughput. */
- EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
+ AddConnection(endpoint);
}
}