#include "base/logger.hpp"
#include "base/utility.hpp"
#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
#include "base/stream.hpp"
#include "base/networkstream.hpp"
-#include "base/json.hpp"
#include "base/context.hpp"
+#include "base/exception.hpp"
+#include "base/json.hpp"
+#include "base/statsfunction.hpp"
#include <boost/algorithm/string/replace.hpp>
using namespace icinga;
REGISTER_TYPE(GelfWriter);
+REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
+
+/**
+ * Constructs the writer and its work queue.
+ *
+ * NOTE(review): the arguments (10000000, 1) are presumably the maximum number
+ * of queued items and the number of worker threads - confirm against
+ * WorkQueue's constructor.
+ */
+GelfWriter::GelfWriter(void)
+ : m_WorkQueue(10000000, 1)
+{ }
+
+/**
+ * Runs the base class' OnConfigLoaded() and then names the work queue
+ * "GelfWriter, <object name>".
+ */
+void GelfWriter::OnConfigLoaded(void)
+{
+ ObjectImpl<GelfWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("GelfWriter, " + GetName());
+}
+
+/**
+ * Stats function registered for GelfWriter via REGISTER_STATSFUNCTION.
+ *
+ * For every configured GelfWriter object a dictionary with the keys
+ * "work_queue_items", "work_queue_item_rate", "connected" and "source" is
+ * stored under the object's name; the collected dictionaries are published
+ * as status["gelfwriter"]. The two work queue values are additionally
+ * appended to perfdata as "gelfwriter_<name>_work_queue_items" and
+ * "gelfwriter_<name>_work_queue_item_rate".
+ *
+ * @param status Dictionary receiving the "gelfwriter" entry.
+ * @param perfdata Array receiving the per-object PerfdataValue entries.
+ */
+void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ Dictionary::Ptr nodes = new Dictionary();
+
+ for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
+ size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
+ /* NOTE(review): GetTaskCount(60) presumably covers the last 60 seconds, making this a per-second rate - confirm. */
+ double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
+
+ Dictionary::Ptr stats = new Dictionary();
+ stats->Set("work_queue_items", workQueueItems);
+ stats->Set("work_queue_item_rate", workQueueItemRate);
+ stats->Set("connected", gelfwriter->GetConnected());
+ stats->Set("source", gelfwriter->GetSource());
+
+ nodes->Set(gelfwriter->GetName(), stats);
+
+ perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
+ perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
+ }
+
+ status->Set("gelfwriter", nodes);
+}
+
+/**
+ * Starts the writer: runs the base class' Start(), logs the start, registers
+ * ExceptionHandler() as the work queue's exception callback, sets up the
+ * reconnect timer and subscribes to the Checkable check result, notification
+ * and state change signals.
+ *
+ * NOTE(review): the timer interval of 10 is presumably in seconds and
+ * Reschedule(0) presumably triggers the first reconnect right away - confirm
+ * against Timer.
+ *
+ * @param runtimeCreated Passed through to ObjectImpl<GelfWriter>::Start().
+ */
void GelfWriter::Start(bool runtimeCreated)
{
ObjectImpl<GelfWriter>::Start(runtimeCreated);
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' started.";
+ /* Register exception handler for WQ tasks. */
+ m_WorkQueue.SetExceptionCallback(boost::bind(&GelfWriter::ExceptionHandler, this, _1));
+
+ /* Timer for reconnecting */
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&GelfWriter::ReconnectTimerHandler, this));
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
- // Send check results
- Service::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
- // Send notifications
- Service::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
- // Send state change
- Service::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
+ /* Register event handlers. */
+ Checkable::OnNewCheckResult.connect(boost::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
+ Checkable::OnNotificationSentToUser.connect(boost::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
+ Checkable::OnStateChange.connect(boost::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
}
void GelfWriter::Stop(bool runtimeRemoved)
Log(LogInformation, "GelfWriter")
<< "'" << GetName() << "' stopped.";
+ m_WorkQueue.Join();
+
ObjectImpl<GelfWriter>::Stop(runtimeRemoved);
}
-void GelfWriter::ReconnectTimerHandler(void)
+/**
+ * Asserts that the caller is running on one of m_WorkQueue's worker threads.
+ * Called at the top of every function that is meant to run as a queued task.
+ */
+void GelfWriter::AssertOnWorkQueue(void)
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+/**
+ * Exception callback of the work queue (registered in Start()).
+ *
+ * Logs a generic critical message plus the exception's diagnostic
+ * information at debug level. If the writer is currently marked as
+ * connected, the stream is closed and the connected flag is cleared.
+ *
+ * @param exp The exception handed over by the work queue.
+ */
+void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "GelfWriter", "Exception during Graylog Gelf operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "GelfWriter")
+ << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp);
+
+ /* Only touch m_Stream while the connected flag says there is an open stream. */
+ if (GetConnected()) {
+ m_Stream->Close();
+
+ SetConnected(false);
+ }
+}
+
+/**
+ * Establishes the TCP connection to the configured host and port.
+ *
+ * Must run on the work queue (asserted). Sets the should-connect flag and
+ * returns early when the writer is already marked as connected. Otherwise a
+ * new TcpSocket is connected to GetHost()/GetPort(), wrapped in a
+ * NetworkStream stored in m_Stream, and the connected flag is set.
+ *
+ * If connecting fails, a critical message is logged and the original
+ * exception is rethrown unchanged to the caller.
+ */
+void GelfWriter::Reconnect(void)
{
- if (m_Stream)
+ AssertOnWorkQueue();
+
+ double startTime = Utility::GetTime();
+
+ CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
+
+ SetShouldConnect(true);
+
+ if (GetConnected())
return;
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "GelfWriter")
- << "Reconnecting to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
+ << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
- } catch (std::exception&) {
+ } catch (const std::exception&) {
Log(LogCritical, "GelfWriter")
- << "Can't connect to GELF endpoint '" << GetHost() << "' port '" << GetPort() << "'.";
- return;
+ << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ /* Rethrow the in-flight exception; "throw ex;" would throw a sliced std::exception copy
+ * and lose the dynamic type and message. */
+ throw;
}
m_Stream = new NetworkStream(socket);
+
+ SetConnected(true);
+
+ Log(LogInformation, "GelfWriter")
+ << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+}
+
+/**
+ * Handler for the reconnect timer (connected in Start()). Does not connect
+ * itself; it enqueues Reconnect() on the work queue with PriorityNormal.
+ */
+void GelfWriter::ReconnectTimerHandler(void)
+{
+ m_WorkQueue.Enqueue(boost::bind(&GelfWriter::Reconnect, this), PriorityNormal);
+}
+
+/**
+ * Closes the stream and clears the connected flag. Must run on the work
+ * queue (asserted); does nothing when the writer is not marked as connected.
+ */
+void GelfWriter::Disconnect(void)
+{
+ AssertOnWorkQueue();
+
+ if (!GetConnected())
+ return;
+
+ m_Stream->Close();
+
+ SetConnected(false);
}
+/**
+ * Signal handler for new check results (connected in Start()). Only enqueues
+ * CheckResultHandlerInternal() with the same arguments on the work queue.
+ *
+ * @param checkable The checkable the result belongs to.
+ * @param cr The check result.
+ */
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
+ m_WorkQueue.Enqueue(boost::bind(&GelfWriter::CheckResultHandlerInternal, this, checkable, cr));
+}
+
+void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ AssertOnWorkQueue();
+
CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
- << "GELF Processing check result for '" << checkable->GetName() << "'";
+ << "Processing check result for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
- double ts = Utility::GetTime();
Dictionary::Ptr fields = new Dictionary();
fields->Set("_reachable", checkable->IsReachable());
+ double ts = Utility::GetTime();
+
if (cr) {
fields->Set("_latency", cr->CalculateLatency());
fields->Set("_execution_time", cr->CalculateExecutionTime());
}
+/**
+ * Signal handler for notifications sent to a user (connected in Start()).
+ * Only enqueues NotificationToUserHandlerInternal() with the same arguments
+ * on the work queue.
+ */
void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
- const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
- const String& author, const String& comment_text, const String& command_name)
+ const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+ const String& author, const String& commentText, const String& commandName)
{
+ m_WorkQueue.Enqueue(boost::bind(&GelfWriter::NotificationToUserHandlerInternal, this,
+ notification, checkable, user, notificationType, cr, author, commentText, commandName));
+}
+
+void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
+ const String& author, const String& commentText, const String& commandName)
+{
+ AssertOnWorkQueue();
+
CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
- << "GELF Processing notification for '" << checkable->GetName() << "'";
+ << "Processing notification for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
- double ts = Utility::GetTime();
- String notification_type_str = Notification::NotificationTypeToString(notification_type);
+ String notificationTypeString = Notification::NotificationTypeToString(notificationType);
- String author_comment = "";
+ String authorComment = "";
- if (notification_type == NotificationCustom || notification_type == NotificationAcknowledgement) {
- author_comment = author + ";" + comment_text;
+ if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
+ authorComment = author + ";" + commentText;
}
String output;
+ double ts = Utility::GetTime();
if (cr) {
output = CompatUtility::GetCheckResultOutput(cr);
fields->Set("short_message", output);
} else {
fields->Set("_type", "HOST NOTIFICATION");
+ //TODO: why?
fields->Set("short_message", "(" + CompatUtility::GetHostStateString(host) + ")");
}
fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
fields->Set("_hostname", host->GetName());
- fields->Set("_command", command_name);
- fields->Set("_notification_type", notification_type_str);
- fields->Set("_comment", author_comment);
+ fields->Set("_command", commandName);
+ fields->Set("_notification_type", notificationTypeString);
+ fields->Set("_comment", authorComment);
SendLogMessage(ComposeGelfMessage(fields, GetSource(), ts));
}
+/**
+ * Signal handler for state changes (connected in Start()). Only enqueues
+ * StateChangeHandlerInternal() with the same arguments on the work queue.
+ *
+ * @param checkable The checkable whose state changed.
+ * @param cr The check result passed along with the signal.
+ * @param type The state type passed along with the signal.
+ */
void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
{
+ m_WorkQueue.Enqueue(boost::bind(&GelfWriter::StateChangeHandlerInternal, this, checkable, cr, type));
+}
+
+void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
+{
+ AssertOnWorkQueue();
+
CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
Log(LogDebug, "GelfWriter")
- << "GELF Processing state change for '" << checkable->GetName() << "'";
+ << "Processing state change for '" << checkable->GetName() << "'";
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
- double ts = Utility::GetTime();
Dictionary::Ptr fields = new Dictionary();
fields->Set("_last_hard_state", host->GetLastHardState());
}
+ double ts = Utility::GetTime();
+
if (cr) {
fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
fields->Set("full_message", cr->GetOutput());
return JsonEncode(fields);
}
+/**
+ * Writes one GELF message to the stream.
+ *
+ * The message is sent with a trailing '\0' byte appended (presumably the
+ * GELF TCP frame delimiter - confirm against the Graylog documentation).
+ * Nothing is sent while the writer is not marked as connected. The object
+ * lock is held for the connected check and the write.
+ *
+ * If the write fails, a critical message is logged and the original
+ * exception is rethrown unchanged to the caller.
+ *
+ * @param gelfMessage The already encoded GELF message.
+ */
-void GelfWriter::SendLogMessage(const String& gelf)
+void GelfWriter::SendLogMessage(const String& gelfMessage)
{
std::ostringstream msgbuf;
- msgbuf << gelf;
+ msgbuf << gelfMessage;
msgbuf << '\0';
String log = msgbuf.str();
ObjectLock olock(this);
- if (!m_Stream)
+ if (!GetConnected())
return;
try {
- //TODO remove
Log(LogDebug, "GelfWriter")
<< "Sending '" << log << "'.";
+
m_Stream->Write(log.CStr(), log.GetLength());
} catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
- m_Stream.reset();
+ /* Rethrow the in-flight exception; "throw ex;" would throw a sliced std::exception copy
+ * and lose the dynamic type and message. */
+ throw;
}
}
#include "base/configobject.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
+#include "base/workqueue.hpp"
#include <fstream>
namespace icinga
{
/**
- * An Icinga gelf writer.
+ * An Icinga Gelf writer for Graylog.
*
* @ingroup perfdata
*/
DECLARE_OBJECT(GelfWriter);
DECLARE_OBJECTNAME(GelfWriter);
+ GelfWriter(void);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
protected:
+ virtual void OnConfigLoaded(void) override;
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
Stream::Ptr m_Stream;
+ WorkQueue m_WorkQueue;
Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
- const User::Ptr& user, NotificationType notification_type, CheckResult::Ptr const& cr,
- const String& author, const String& comment_text, const String& command_name);
- String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+ const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+ const String& author, const String& commentText, const String& commandName);
+ /* Counterpart of NotificationToUserHandler() that the latter enqueues on the work queue;
+ * parameter names match the definition. */
+ void NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
+ const User::Ptr& user, NotificationType notificationType, const CheckResult::Ptr& cr,
+ const String& author, const String& commentText, const String& commandName);
void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
- void SendLogMessage(const String& gelf);
+ void StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type);
+
+ String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
+ void SendLogMessage(const String& gelfMessage);
void ReconnectTimerHandler(void);
+
+ void Disconnect(void);
+ void Reconnect(void);
+
+ void AssertOnWorkQueue(void);
+
+ void ExceptionHandler(boost::exception_ptr exp);
};
}