#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/checkcommand.hpp"
+#include "base/application.hpp"
#include "base/defer.hpp"
+#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/tlsutility.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/parser.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/beast/http/status.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/verb.hpp>
+#include <boost/beast/http/write.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
#include <boost/regex.hpp>
#include <boost/scoped_array.hpp>
+#include <memory>
+#include <string>
#include <utility>
using namespace icinga;
//TODO: Close the connection, if we keep it open.
}
-Stream::Ptr InfluxdbWriter::Connect()
+OptionalTlsStream InfluxdbWriter::Connect()
{
- TcpSocket::Ptr socket = new TcpSocket();
-
Log(LogNotice, "InfluxdbWriter")
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
- try {
- socket->Connect(GetHost(), GetPort());
- } catch (const std::exception& ex) {
- Log(LogWarning, "InfluxdbWriter")
- << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
- throw ex;
- }
+ OptionalTlsStream stream;
+ bool ssl = GetSslEnable();
+
+ if (ssl) {
+ std::shared_ptr<boost::asio::ssl::context> sslContext;
- if (GetSslEnable()) {
- std::shared_ptr<SSL_CTX> sslContext;
try {
- sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
+ sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Unable to create SSL context.";
- throw ex;
+ throw;
}
- TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
+ stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
+ } else {
+ stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
+ }
+
+ try {
+ icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "InfluxdbWriter")
+ << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ if (ssl) {
+ auto& tlsStream (stream.first->next_layer());
+
try {
- tlsStream->Handshake();
+ tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "TLS handshake with host '" << GetHost() << "' failed.";
- throw ex;
+ throw;
}
-
- return tlsStream;
- } else {
- return new NetworkStream(socket);
}
+
+ return std::move(stream);
}
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
void InfluxdbWriter::Flush()
{
+ namespace beast = boost::beast;
+ namespace http = beast::http;
+
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
if (m_DataBuffer.empty())
return;
String body = boost::algorithm::join(m_DataBuffer, "\n");
m_DataBuffer.clear();
- Stream::Ptr stream;
+ OptionalTlsStream stream;
try {
stream = Connect();
return;
}
- if (!stream)
- return;
-
- Defer close ([&stream]() { stream->Close(); });
+ Defer s ([&stream]() {
+ if (stream.first) {
+ stream.first->next_layer().shutdown();
+ }
+ });
Url::Ptr url = new Url();
url->SetScheme(GetSslEnable() ? "https" : "http");
if (!GetPassword().IsEmpty())
url->AddQueryElement("p", GetPassword());
- HttpRequest req(stream);
- req.RequestMethod = "POST";
- req.RequestUrl = url;
+ http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
+
+ request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
+ request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+
+ request.body() = body;
+ request.set(http::field::content_length, request.body().size());
try {
- req.WriteBody(body.CStr(), body.GetLength());
- req.Finish();
+ if (stream.first) {
+ http::write(*stream.first, request);
+ stream.first->flush();
+ } else {
+ http::write(*stream.second, request);
+ stream.second->flush();
+ }
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
- throw ex;
+ throw;
}
- HttpResponse resp(stream, req);
- StreamReadContext context;
+ http::parser<false, http::string_body> parser;
+ beast::flat_buffer buf;
try {
- while (resp.Parse(context, true) && !resp.Complete)
- ; /* Do nothing */
+ if (stream.first) {
+ http::read(*stream.first, buf, parser);
+ } else {
+ http::read(*stream.second, buf, parser);
+ }
} catch (const std::exception& ex) {
Log(LogWarning, "InfluxdbWriter")
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
- throw ex;
+ throw;
}
- if (!resp.Complete) {
- Log(LogWarning, "InfluxdbWriter")
- << "Failed to read a complete HTTP response from the InfluxDB server.";
- return;
- }
+ auto& response (parser.get());
- if (resp.StatusCode != 204) {
+ if (response.result() != http::status::no_content) {
Log(LogWarning, "InfluxdbWriter")
- << "Unexpected response code: " << resp.StatusCode;
+ << "Unexpected response code: " << response.result();
- String contentType = resp.Headers->Get("content-type");
+ auto& contentType (response[http::field::content_type]);
if (contentType != "application/json") {
Log(LogWarning, "InfluxdbWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}
- size_t responseSize = resp.GetBodySize();
- boost::scoped_array<char> buffer(new char[responseSize + 1]);
- resp.ReadBody(buffer.get(), responseSize);
- buffer.get()[responseSize] = '\0';
-
Dictionary::Ptr jsonResponse;
+ auto& body (response.body());
+
try {
- jsonResponse = JsonDecode(buffer.get());
+ jsonResponse = JsonDecode(body);
} catch (...) {
Log(LogWarning, "InfluxdbWriter")
- << "Unable to parse JSON response:\n" << buffer.get();
+ << "Unable to parse JSON response:\n" << body;
return;
}
Log(LogCritical, "InfluxdbWriter")
<< "InfluxDB error message:\n" << error;
-
- return;
}
}