* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#include "perfdata/elasticwriter.hpp"
-#include "perfdata/elasticwriter.tcpp"
+#include "perfdata/elasticsearchwriter.hpp"
+#include "perfdata/elasticsearchwriter.tcpp"
#include "remote/url.hpp"
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
using namespace icinga;
-REGISTER_TYPE(ElasticWriter);
+REGISTER_TYPE(ElasticsearchWriter);
-REGISTER_STATSFUNCTION(ElasticWriter, &ElasticWriter::StatsFunc);
+REGISTER_STATSFUNCTION(ElasticsearchWriter, &ElasticsearchWriter::StatsFunc);
-ElasticWriter::ElasticWriter(void)
+ElasticsearchWriter::ElasticsearchWriter(void)
: m_WorkQueue(10000000, 1)
{ }
-void ElasticWriter::OnConfigLoaded(void)
+void ElasticsearchWriter::OnConfigLoaded(void)
{
- ObjectImpl<ElasticWriter>::OnConfigLoaded();
+ ObjectImpl<ElasticsearchWriter>::OnConfigLoaded();
- m_WorkQueue.SetName("ElasticWriter, " + GetName());
+ m_WorkQueue.SetName("ElasticsearchWriter, " + GetName());
}
-void ElasticWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
Dictionary::Ptr nodes = new Dictionary();
- for (const ElasticWriter::Ptr& elasticwriter : ConfigType::GetObjectsByType<ElasticWriter>()) {
- size_t workQueueItems = elasticwriter->m_WorkQueue.GetLength();
- double workQueueItemRate = elasticwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+ for (const ElasticsearchWriter::Ptr& elasticsearchwriter : ConfigType::GetObjectsByType<ElasticsearchWriter>()) {
+ size_t workQueueItems = elasticsearchwriter->m_WorkQueue.GetLength();
+ double workQueueItemRate = elasticsearchwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
Dictionary::Ptr stats = new Dictionary();
stats->Set("work_queue_items", workQueueItems);
stats->Set("work_queue_item_rate", workQueueItemRate);
- nodes->Set(elasticwriter->GetName(), stats);
+ nodes->Set(elasticsearchwriter->GetName(), stats);
- perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_items", workQueueItems));
- perfdata->Add(new PerfdataValue("elasticwriter_" + elasticwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("elasticsearchwriter_" + elasticsearchwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
- status->Set("elasticwriter", nodes);
+ status->Set("elasticsearchwriter", nodes);
}
-void ElasticWriter::Start(bool runtimeCreated)
+void ElasticsearchWriter::Start(bool runtimeCreated)
{
- ObjectImpl<ElasticWriter>::Start(runtimeCreated);
+ ObjectImpl<ElasticsearchWriter>::Start(runtimeCreated);
m_EventPrefix = "icinga2.event.";
- Log(LogInformation, "ElasticWriter")
+ Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' started.";
- m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticWriter::ExceptionHandler, this, _1));
+ m_WorkQueue.SetExceptionCallback(boost::bind(&ElasticsearchWriter::ExceptionHandler, this, _1));
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = new Timer();
m_FlushTimer->SetInterval(GetFlushInterval());
- m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticWriter::FlushTimeout, this));
+ m_FlushTimer->OnTimerExpired.connect(boost::bind(&ElasticsearchWriter::FlushTimeout, this));
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
/* Register for new metrics. */
- Checkable::OnNewCheckResult.connect(boost::bind(&ElasticWriter::CheckResultHandler, this, _1, _2));
- Checkable::OnStateChange.connect(boost::bind(&ElasticWriter::StateChangeHandler, this, _1, _2, _3));
- Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
+ Checkable::OnNewCheckResult.connect(boost::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
+ Checkable::OnStateChange.connect(boost::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
+ Checkable::OnNotificationSentToAllUsers.connect(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
}
-void ElasticWriter::Stop(bool runtimeRemoved)
+void ElasticsearchWriter::Stop(bool runtimeRemoved)
{
- Log(LogInformation, "ElasticWriter")
+ Log(LogInformation, "ElasticsearchWriter")
<< "'" << GetName() << "' stopped.";
m_WorkQueue.Join();
- ObjectImpl<ElasticWriter>::Stop(runtimeRemoved);
+ ObjectImpl<ElasticsearchWriter>::Stop(runtimeRemoved);
}
-void ElasticWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+void ElasticsearchWriter::AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
String prefix = "check_result.";
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Ignoring invalid perfdata value: '" << val << "' for object '"
<< checkable->GetName() << "'.";
}
}
}
-void ElasticWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
- m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::InternalCheckResultHandler, this, checkable, cr));
+ m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::InternalCheckResultHandler, this, checkable, cr));
}
-void ElasticWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+void ElasticsearchWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
Enqueue("checkresult", fields, ts);
}
-void ElasticWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
- m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::StateChangeHandlerInternal, this, checkable, cr, type));
+ m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::StateChangeHandlerInternal, this, checkable, cr, type));
}
-void ElasticWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+void ElasticsearchWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
AssertOnWorkQueue();
Enqueue("statechange", fields, ts);
}
-void ElasticWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
+void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
- m_WorkQueue.Enqueue(boost::bind(&ElasticWriter::NotificationSentToAllUsersHandlerInternal, this,
+ m_WorkQueue.Enqueue(boost::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal, this,
notification, checkable, users, type, cr, author, text));
}
-void ElasticWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
+void ElasticsearchWriter::NotificationSentToAllUsersHandlerInternal(const Notification::Ptr& notification,
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, NotificationType type,
const CheckResult::Ptr& cr, const String& author, const String& text)
{
CONTEXT("Elasticwriter processing notification to all users '" + checkable->GetName() + "'");
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
Enqueue("notification", fields, ts);
}
-void ElasticWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
+void ElasticsearchWriter::Enqueue(String type, const Dictionary::Ptr& fields, double ts)
{
/* Atomically buffer the data point. */
boost::mutex::scoped_lock lock(m_DataBufferMutex);
String indexBody = "{ \"index\" : { \"_type\" : \"" + eventType + "\" } }\n";
String fieldsBody = JsonEncode(fields);
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Add to fields to message list: '" << fieldsBody << "'.";
m_DataBuffer.push_back(indexBody + fieldsBody);
/* Flush if we've buffered too much to prevent excessive memory use. */
if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Data buffer overflow writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
-void ElasticWriter::FlushTimeout(void)
+void ElasticsearchWriter::FlushTimeout(void)
{
/* Prevent new data points from being added to the array, there is a
* race condition where they could disappear.
/* Flush if there are any data available. */
if (m_DataBuffer.size() > 0) {
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
Flush();
}
}
-void ElasticWriter::Flush(void)
+void ElasticsearchWriter::Flush(void)
{
/* Ensure you hold a lock against m_DataBuffer so that things
* don't go missing after creating the body and clearing the buffer.
SendRequest(body);
}
-void ElasticWriter::SendRequest(const String& body)
+void ElasticsearchWriter::SendRequest(const String& body)
{
Url::Ptr url = new Url();
req.RequestUrl = url;
/* Don't log the request body to debug log, this is already done above. */
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'.";
req.WriteBody(body.CStr(), body.GetLength());
req.Finish();
} catch (const std::exception& ex) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
try {
resp.Parse(context, true);
} catch (const std::exception& ex) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Cannot read from HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
if (resp.StatusCode == 401) {
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
if (!username.IsEmpty() && !password.IsEmpty()) {
- Log(LogCritical, "ElasticWriter")
+ Log(LogCritical, "ElasticsearchWriter")
<< "401 Unauthorized. Please ensure that the user '" << username
<< "' is able to authenticate against the HTTP API/Proxy.";
} else {
- Log(LogCritical, "ElasticWriter")
+ Log(LogCritical, "ElasticsearchWriter")
<< "401 Unauthorized. The HTTP API requires authentication but no username/password has been configured.";
}
return;
}
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Unexpected response code " << resp.StatusCode;
/* Finish parsing the headers and body. */
String contentType = resp.Headers->Get("content-type");
if (contentType != "application/json") {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Unexpected Content-Type: " << contentType;
return;
}
try {
jsonResponse = JsonDecode(buffer.get());
} catch (...) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Unable to parse JSON response:\n" << buffer.get();
return;
}
String error = jsonResponse->Get("error");
- Log(LogCritical, "ElasticWriter")
+ Log(LogCritical, "ElasticsearchWriter")
<< "Elasticsearch error message:\n" << error;
}
}
-Stream::Ptr ElasticWriter::Connect(void)
+Stream::Ptr ElasticsearchWriter::Connect(void)
{
TcpSocket::Ptr socket = new TcpSocket();
- Log(LogNotice, "ElasticWriter")
+ Log(LogNotice, "ElasticsearchWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
try {
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "Unable to create SSL context.";
throw ex;
}
try {
tlsStream->Handshake();
} catch (const std::exception& ex) {
- Log(LogWarning, "ElasticWriter")
+ Log(LogWarning, "ElasticsearchWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw ex;
}
}
}
-void ElasticWriter::AssertOnWorkQueue(void)
+void ElasticsearchWriter::AssertOnWorkQueue(void)
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
-void ElasticWriter::ExceptionHandler(boost::exception_ptr exp)
+void ElasticsearchWriter::ExceptionHandler(boost::exception_ptr exp)
{
- Log(LogCritical, "ElasticWriter", "Exception during Elastic operation: Verify that your backend is operational!");
+ Log(LogCritical, "ElasticsearchWriter", "Exception during Elastic operation: Verify that your backend is operational!");
- Log(LogDebug, "ElasticWriter")
+ Log(LogDebug, "ElasticsearchWriter")
<< "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
}
-String ElasticWriter::FormatTimestamp(double ts)
+String ElasticsearchWriter::FormatTimestamp(double ts)
{
/* The date format must match the default dynamic date detection
* pattern in indexes. This enables applications like Kibana to