]> granicus.if.org Git - icinga2/commitdiff
Use new I/O engine in ElasticsearchWriter 7135/head
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 23 Apr 2019 11:15:38 +0000 (13:15 +0200)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 23 Apr 2019 12:33:19 +0000 (14:33 +0200)
lib/perfdata/elasticsearchwriter.cpp
lib/perfdata/elasticsearchwriter.hpp

index a8409f6354258af42cf4b90a0db8ca7a55b978ab..8ea4fdef16af34d75928883824d8ec2a407a5909 100644 (file)
@@ -8,7 +8,9 @@
 #include "icinga/compatutility.hpp"
 #include "icinga/service.hpp"
 #include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
 #include "base/defer.hpp"
+#include "base/io-engine.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/stream.hpp"
 #include "base/base64.hpp"
 #include "base/exception.hpp"
 #include "base/statsfunction.hpp"
 #include <boost/algorithm/string.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/parser.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/beast/http/status.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/verb.hpp>
+#include <boost/beast/http/write.hpp>
 #include <boost/scoped_array.hpp>
+#include <memory>
+#include <string>
 #include <utility>
 
 using namespace icinga;
@@ -418,6 +432,9 @@ void ElasticsearchWriter::Flush()
 
 void ElasticsearchWriter::SendRequest(const String& body)
 {
+       namespace beast = boost::beast;
+       namespace http = beast::http;
+
        Url::Ptr url = new Url();
 
        url->SetScheme(GetEnableTls() ? "https" : "http");
@@ -441,7 +458,7 @@ void ElasticsearchWriter::SendRequest(const String& body)
 
        url->SetPath(path);
 
-       Stream::Ptr stream;
+       OptionalTlsStream stream;
 
        try {
                stream = Connect();
@@ -451,67 +468,74 @@ void ElasticsearchWriter::SendRequest(const String& body)
                return;
        }
 
-       if (!stream)
-               return;
+       Defer s ([&stream]() {
+               if (stream.first) {
+                       stream.first->next_layer().shutdown();
+               }
+       });
 
-       Defer close ([&stream]() { stream->Close(); });
+       http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
 
-       HttpRequest req(stream);
+       request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+       request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
 
        /* Specify required headers by Elasticsearch. */
-       req.AddHeader("Accept", "application/json");
+       request.set(http::field::accept, "application/json");
 
        /* Use application/x-ndjson for bulk streams. While ES
         * is able to handle application/json, the newline separator
         * causes problems with Logstash (#6609).
         */
-       req.AddHeader("Content-Type", "application/x-ndjson");
+       request.set(http::field::content_type, "application/x-ndjson");
 
        /* Send authentication if configured. */
        String username = GetUsername();
        String password = GetPassword();
 
        if (!username.IsEmpty() && !password.IsEmpty())
-               req.AddHeader("Authorization", "Basic " + Base64::Encode(username + ":" + password));
+               request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
 
-       req.RequestMethod = "POST";
-       req.RequestUrl = url;
+       request.body() = body;
+       request.set(http::field::content_length, request.body().size());
 
        /* Don't log the request body to debug log, this is already done above. */
        Log(LogDebug, "ElasticsearchWriter")
-               << "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
+               << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
                << " to '" << url->Format() << "'.";
 
        try {
-               req.WriteBody(body.CStr(), body.GetLength());
-               req.Finish();
-       } catch (const std::exception& ex) {
+               if (stream.first) {
+                       http::write(*stream.first, request);
+                       stream.first->flush();
+               } else {
+                       http::write(*stream.second, request);
+                       stream.second->flush();
+               }
+       } catch (const std::exception&) {
                Log(LogWarning, "ElasticsearchWriter")
                        << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               throw ex;
+               throw;
        }
 
-       HttpResponse resp(stream, req);
-       StreamReadContext context;
+       http::parser<false, http::string_body> parser;
+       beast::flat_buffer buf;
 
        try {
-               resp.Parse(context, true);
-               while (resp.Parse(context, true) && !resp.Complete)
-                       ; /* Do nothing */
+               if (stream.first) {
+                       http::read(*stream.first, buf, parser);
+               } else {
+                       http::read(*stream.second, buf, parser);
+               }
        } catch (const std::exception& ex) {
                Log(LogWarning, "ElasticsearchWriter")
                        << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
-               throw ex;
+               throw;
        }
 
-       if (!resp.Complete) {
-               Log(LogWarning, "ElasticsearchWriter")
-                       << "Failed to read a complete HTTP response from the Elasticsearch server.";
-               return;
-       }
+       auto& response (parser.get());
 
-       if (resp.StatusCode > 299) {
-               if (resp.StatusCode == 401) {
+       if (response.result_int() > 299) {
+               if (response.result() == http::status::unauthorized) {
                        /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
                        if (!username.IsEmpty() && !password.IsEmpty()) {
                                Log(LogCritical, "ElasticsearchWriter")
@@ -526,30 +550,27 @@ void ElasticsearchWriter::SendRequest(const String& body)
                }
 
                std::ostringstream msgbuf;
-               msgbuf << "Unexpected response code " << resp.StatusCode << " from URL '" << req.RequestUrl->Format() << "'";
+               msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
 
-               String contentType = resp.Headers->Get("content-type");
+               auto& contentType (response[http::field::content_type]);
 
                if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
                        msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
                }
 
-               size_t responseSize = resp.GetBodySize();
-               boost::scoped_array<char> buffer(new char[responseSize + 1]);
-               resp.ReadBody(buffer.get(), responseSize);
-               buffer.get()[responseSize] = '\0';
+               auto& body (response.body());
 
 #ifdef I2_DEBUG
-               msgbuf << "; Response body: '" << buffer.get() << "'";
+               msgbuf << "; Response body: '" << body << "'";
 #endif /* I2_DEBUG */
 
-               /* {"statusCode":404,"error":"Not Found","message":"Not Found"} */
                Dictionary::Ptr jsonResponse;
+
                try {
-                       jsonResponse = JsonDecode(buffer.get());
+                       jsonResponse = JsonDecode(body);
                } catch (...) {
                        Log(LogWarning, "ElasticsearchWriter")
-                               << "Unable to parse JSON response:\n" << buffer.get();
+                               << "Unable to parse JSON response:\n" << body;
                        return;
                }
 
@@ -557,51 +578,54 @@ void ElasticsearchWriter::SendRequest(const String& body)
 
                Log(LogCritical, "ElasticsearchWriter")
                        << "Error: '" << error << "'. " << msgbuf.str();
-
-               return;
        }
 }
 
-Stream::Ptr ElasticsearchWriter::Connect()
+OptionalTlsStream ElasticsearchWriter::Connect()
 {
-       TcpSocket::Ptr socket = new TcpSocket();
-
        Log(LogNotice, "ElasticsearchWriter")
                << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
-       try {
-               socket->Connect(GetHost(), GetPort());
-       } catch (const std::exception& ex) {
-               Log(LogWarning, "ElasticsearchWriter")
-                       << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               throw ex;
-       }
+       OptionalTlsStream stream;
+       bool tls = GetEnableTls();
 
-       if (GetEnableTls()) {
-               std::shared_ptr<SSL_CTX> sslContext;
+       if (tls) {
+               std::shared_ptr<boost::asio::ssl::context> sslContext;
 
                try {
-                       sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
-               } catch (const std::exception& ex) {
+                       sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+               } catch (const std::exception&) {
                        Log(LogWarning, "ElasticsearchWriter")
                                << "Unable to create SSL context.";
-                       throw ex;
+                       throw;
                }
 
-               TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
+               stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
+       } else {
+               stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+       }
+
+       try {
+               icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+       } catch (const std::exception&) {
+               Log(LogWarning, "ElasticsearchWriter")
+                       << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw;
+       }
+
+       if (tls) {
+               auto& tlsStream (stream.first->next_layer());
 
                try {
-                       tlsStream->Handshake();
-               } catch (const std::exception& ex) {
+                       tlsStream.handshake(tlsStream.client);
+               } catch (const std::exception&) {
                        Log(LogWarning, "ElasticsearchWriter")
                                << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
-                       throw ex;
+                       throw;
                }
-
-               return tlsStream;
-       } else {
-               return new NetworkStream(socket);
        }
+
+       return std::move(stream);
 }
 
 void ElasticsearchWriter::AssertOnWorkQueue()
index cf60044cd9a2655bd1acc0a3ba51d7cd2051100e..45658f57415a7a0a17828688abcdc2f613ed238b 100644 (file)
@@ -8,6 +8,7 @@
 #include "base/configobject.hpp"
 #include "base/workqueue.hpp"
 #include "base/timer.hpp"
+#include "base/tlsstream.hpp"
 
 namespace icinga
 {
@@ -50,7 +51,7 @@ private:
        void Enqueue(const Checkable::Ptr& checkable, const String& type,
                const Dictionary::Ptr& fields, double ts);
 
-       Stream::Ptr Connect();
+       OptionalTlsStream Connect();
        void AssertOnWorkQueue();
        void ExceptionHandler(boost::exception_ptr exp);
        void FlushTimeout();