]> granicus.if.org Git - icinga2/commitdiff
Use new I/O engine in InfluxdbWriter 7134/head
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 23 Apr 2019 09:25:52 +0000 (11:25 +0200)
committerAlexander A. Klimov <alexander.klimov@icinga.com>
Tue, 23 Apr 2019 09:59:37 +0000 (11:59 +0200)
lib/perfdata/influxdbwriter.cpp
lib/perfdata/influxdbwriter.hpp

index d732a83f219047a74056e086e77e5919391631f4..c99164e4a20a7e06d3311a2c166ba005d74ded6f 100644 (file)
@@ -9,7 +9,9 @@
 #include "icinga/macroprocessor.hpp"
 #include "icinga/icingaapplication.hpp"
 #include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
 #include "base/defer.hpp"
+#include "base/io-engine.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/configtype.hpp"
 #include "base/objectlock.hpp"
 #include "base/tlsutility.hpp"
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/replace.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/parser.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/beast/http/status.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/verb.hpp>
+#include <boost/beast/http/write.hpp>
 #include <boost/math/special_functions/fpclassify.hpp>
 #include <boost/regex.hpp>
 #include <boost/scoped_array.hpp>
+#include <memory>
+#include <string>
 #include <utility>
 
 using namespace icinga;
@@ -156,44 +170,51 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
        //TODO: Close the connection, if we keep it open.
 }
 
-Stream::Ptr InfluxdbWriter::Connect()
+OptionalTlsStream InfluxdbWriter::Connect()
 {
-       TcpSocket::Ptr socket = new TcpSocket();
-
        Log(LogNotice, "InfluxdbWriter")
                << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
 
-       try {
-               socket->Connect(GetHost(), GetPort());
-       } catch (const std::exception& ex) {
-               Log(LogWarning, "InfluxdbWriter")
-                       << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               throw ex;
-       }
+       OptionalTlsStream stream;
+       bool ssl = GetSslEnable();
+
+       if (ssl) {
+               std::shared_ptr<boost::asio::ssl::context> sslContext;
 
-       if (GetSslEnable()) {
-               std::shared_ptr<SSL_CTX> sslContext;
                try {
-                       sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+                       sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
                } catch (const std::exception& ex) {
                        Log(LogWarning, "InfluxdbWriter")
                                << "Unable to create SSL context.";
-                       throw ex;
+                       throw;
                }
 
-               TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
+               stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
+       } else {
+               stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+       }
+
+       try {
+               icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+       } catch (const std::exception& ex) {
+               Log(LogWarning, "InfluxdbWriter")
+                       << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+               throw;
+       }
+
+       if (ssl) {
+               auto& tlsStream (stream.first->next_layer());
+
                try {
-                       tlsStream->Handshake();
+                       tlsStream.handshake(tlsStream.client);
                } catch (const std::exception& ex) {
                        Log(LogWarning, "InfluxdbWriter")
                                << "TLS handshake with host '" << GetHost() << "' failed.";
-                       throw ex;
+                       throw;
                }
-
-               return tlsStream;
-       } else {
-               return new NetworkStream(socket);
        }
+
+       return std::move(stream);
 }
 
 void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@@ -429,6 +450,9 @@ void InfluxdbWriter::FlushTimeoutWQ()
 
 void InfluxdbWriter::Flush()
 {
+       namespace beast = boost::beast;
+       namespace http = beast::http;
+
        /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
        if (m_DataBuffer.empty())
                return;
@@ -439,7 +463,7 @@ void InfluxdbWriter::Flush()
        String body = boost::algorithm::join(m_DataBuffer, "\n");
        m_DataBuffer.clear();
 
-       Stream::Ptr stream;
+       OptionalTlsStream stream;
 
        try {
                stream = Connect();
@@ -449,10 +473,11 @@ void InfluxdbWriter::Flush()
                return;
        }
 
-       if (!stream)
-               return;
-
-       Defer close ([&stream]() { stream->Close(); });
+       Defer s ([&stream]() {
+               if (stream.first) {
+                       stream.first->next_layer().shutdown();
+               }
+       });
 
        Url::Ptr url = new Url();
        url->SetScheme(GetSslEnable() ? "https" : "http");
@@ -470,59 +495,64 @@ void InfluxdbWriter::Flush()
        if (!GetPassword().IsEmpty())
                url->AddQueryElement("p", GetPassword());
 
-       HttpRequest req(stream);
-       req.RequestMethod = "POST";
-       req.RequestUrl = url;
+       http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
+
+       request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+       request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+
+       request.body() = body;
+       request.set(http::field::content_length, request.body().size());
 
        try {
-               req.WriteBody(body.CStr(), body.GetLength());
-               req.Finish();
+               if (stream.first) {
+                       http::write(*stream.first, request);
+                       stream.first->flush();
+               } else {
+                       http::write(*stream.second, request);
+                       stream.second->flush();
+               }
        } catch (const std::exception& ex) {
                Log(LogWarning, "InfluxdbWriter")
                        << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
-               throw ex;
+               throw;
        }
 
-       HttpResponse resp(stream, req);
-       StreamReadContext context;
+       http::parser<false, http::string_body> parser;
+       beast::flat_buffer buf;
 
        try {
-               while (resp.Parse(context, true) && !resp.Complete)
-                       ; /* Do nothing */
+               if (stream.first) {
+                       http::read(*stream.first, buf, parser);
+               } else {
+                       http::read(*stream.second, buf, parser);
+               }
        } catch (const std::exception& ex) {
                Log(LogWarning, "InfluxdbWriter")
                        << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
-               throw ex;
+               throw;
        }
 
-       if (!resp.Complete) {
-               Log(LogWarning, "InfluxdbWriter")
-                       << "Failed to read a complete HTTP response from the InfluxDB server.";
-               return;
-       }
+       auto& response (parser.get());
 
-       if (resp.StatusCode != 204) {
+       if (response.result() != http::status::no_content) {
                Log(LogWarning, "InfluxdbWriter")
-                       << "Unexpected response code: " << resp.StatusCode;
+                       << "Unexpected response code: " << response.result();
 
-               String contentType = resp.Headers->Get("content-type");
+               auto& contentType (response[http::field::content_type]);
                if (contentType != "application/json") {
                        Log(LogWarning, "InfluxdbWriter")
                                << "Unexpected Content-Type: " << contentType;
                        return;
                }
 
-               size_t responseSize = resp.GetBodySize();
-               boost::scoped_array<char> buffer(new char[responseSize + 1]);
-               resp.ReadBody(buffer.get(), responseSize);
-               buffer.get()[responseSize] = '\0';
-
                Dictionary::Ptr jsonResponse;
+               auto& body (response.body());
+
                try {
-                       jsonResponse = JsonDecode(buffer.get());
+                       jsonResponse = JsonDecode(body);
                } catch (...) {
                        Log(LogWarning, "InfluxdbWriter")
-                               << "Unable to parse JSON response:\n" << buffer.get();
+                               << "Unable to parse JSON response:\n" << body;
                        return;
                }
 
@@ -530,8 +560,6 @@ void InfluxdbWriter::Flush()
 
                Log(LogCritical, "InfluxdbWriter")
                        << "InfluxDB error message:\n" << error;
-
-               return;
        }
 }
 
index b3d35d365dd0f3693d616b42cf041de1c165bce0..1f7ab830990f586a2700bd605e1663e0ce9cf856 100644 (file)
@@ -8,6 +8,7 @@
 #include "base/configobject.hpp"
 #include "base/tcpsocket.hpp"
 #include "base/timer.hpp"
+#include "base/tlsstream.hpp"
 #include "base/workqueue.hpp"
 #include <fstream>
 
@@ -51,7 +52,7 @@ private:
        static String EscapeKeyOrTagValue(const String& str);
        static String EscapeValue(const Value& value);
 
-       Stream::Ptr Connect();
+       OptionalTlsStream Connect();
 
        void AssertOnWorkQueue();