int RingBuffer::UpdateAndGetValues(RingBuffer::SizeType tv, RingBuffer::SizeType span)
{
- InsertValue(tv, 0);
-
ObjectLock olock(this);
+ InsertValue(tv, 0);
+
if (span > m_Slots.size())
span = m_Slots.size();
bool connected = false;
double zoneLag = 0;
+ double lastMessageSent = 0;
+ double lastMessageReceived = 0;
+ double messagesSentPerSecond = 0;
+ double messagesReceivedPerSecond = 0;
+ double bytesSentPerSecond = 0;
+ double bytesReceivedPerSecond = 0;
+
for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
if (endpoint->GetConnected())
connected = true;
if (eplag > 0 && eplag > zoneLag)
zoneLag = eplag;
+
+ if (endpoint->GetLastMessageSent() > lastMessageSent)
+ lastMessageSent = endpoint->GetLastMessageSent();
+
+ if (endpoint->GetLastMessageReceived() > lastMessageReceived)
+ lastMessageReceived = endpoint->GetLastMessageReceived();
+
+ messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
+ messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
+ bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
+ bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
}
if (!connected) {
Array::Ptr perfdata = new Array();
perfdata->Add(new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical));
+ perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
+ perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived));
+ perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond));
+ perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
+ perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
+ perfdata->Add(new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond));
cr->SetPerformanceData(perfdata);
checkable->ProcessCheckResult(cr);
perfdata->Add(new PerfdataValue("num_hosts_in_downtime", hs.hosts_in_downtime));
perfdata->Add(new PerfdataValue("num_hosts_acknowledged", hs.hosts_acknowledged));
+ std::vector<Endpoint::Ptr> endpoints = ConfigType::GetObjectsByType<Endpoint>();
+
+ double lastMessageSent = 0;
+ double lastMessageReceived = 0;
+ double messagesSentPerSecond = 0;
+ double messagesReceivedPerSecond = 0;
+ double bytesSentPerSecond = 0;
+ double bytesReceivedPerSecond = 0;
+
+ for (Endpoint::Ptr endpoint : endpoints)
+ {
+ if (endpoint->GetLastMessageSent() > lastMessageSent)
+ lastMessageSent = endpoint->GetLastMessageSent();
+
+ if (endpoint->GetLastMessageReceived() > lastMessageReceived)
+ lastMessageReceived = endpoint->GetLastMessageReceived();
+
+ messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
+ messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
+ bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
+ bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
+ }
+
+ perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
+ perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived));
+ perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond));
+ perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
+ perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
+ perfdata->Add(new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond));
+
cr->SetOutput("Icinga 2 has been running for " + Utility::FormatDuration(uptime) +
". Version: " + Application::GetAppVersion());
cr->SetPerformanceData(perfdata);
}
try {
- NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
+ size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
+ endpoint->AddMessageSent(bytesSent);
count++;
} catch (const std::exception& ex) {
Log(LogWarning, "ApiListener")
lmessage->Set("method", "log::SetLogPosition");
lmessage->Set("params", lparams);
- JsonRpc::SendMessage(client->GetStream(), lmessage);
+ size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
+ endpoint->AddMessageSent(bytesSent);
}
}
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnDisconnected;
+Endpoint::Endpoint(void)
+ : m_MessagesSent(60), m_BytesSent(60), m_MessagesReceived(60), m_BytesReceived(60)
+{ }
+
void Endpoint::OnAllConfigLoaded(void)
{
ObjectImpl<Endpoint>::OnAllConfigLoaded();
return listener->GetLocalEndpoint();
}
+
+void Endpoint::AddMessageSent(int bytes)
+{
+ double time = Utility::GetTime();
+ m_MessagesSent.InsertValue(time, 1);
+ m_BytesSent.InsertValue(time, bytes);
+ SetLastMessageSent(time);
+}
+
+void Endpoint::AddMessageReceived(int bytes)
+{
+ double time = Utility::GetTime();
+ m_MessagesReceived.InsertValue(time, 1);
+ m_BytesReceived.InsertValue(time, bytes);
+ SetLastMessageReceived(time);
+}
+
+double Endpoint::GetMessagesSentPerSecond(void) const
+{
+ return m_MessagesSent.CalculateRate(Utility::GetTime(), 60);
+}
+
+double Endpoint::GetMessagesReceivedPerSecond(void) const
+{
+ return m_MessagesReceived.CalculateRate(Utility::GetTime(), 60);
+}
+
+double Endpoint::GetBytesSentPerSecond(void) const
+{
+ return m_BytesSent.CalculateRate(Utility::GetTime(), 60);
+}
+
+double Endpoint::GetBytesReceivedPerSecond(void) const
+{
+ return m_BytesReceived.CalculateRate(Utility::GetTime(), 60);
+}
#include "remote/i2-remote.hpp"
#include "remote/endpoint.thpp"
+#include "base/ringbuffer.hpp"
#include <set>
namespace icinga
DECLARE_OBJECT(Endpoint);
DECLARE_OBJECTNAME(Endpoint);
+ Endpoint(void);
+
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
void SetCachedZone(const intrusive_ptr<Zone>& zone);
+ void AddMessageSent(int bytes);
+ void AddMessageReceived(int bytes);
+
+ double GetMessagesSentPerSecond(void) const override;
+ double GetMessagesReceivedPerSecond(void) const override;
+
+ double GetBytesSentPerSecond(void) const override;
+ double GetBytesReceivedPerSecond(void) const override;
+
protected:
virtual void OnAllConfigLoaded(void) override;
mutable boost::mutex m_ClientsLock;
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
intrusive_ptr<Zone> m_Zone;
+
+ mutable RingBuffer m_MessagesSent;
+ mutable RingBuffer m_MessagesReceived;
+ mutable RingBuffer m_BytesSent;
+ mutable RingBuffer m_BytesReceived;
};
}
[no_user_modify, no_storage] bool connected {
get;
};
+
+ Timestamp last_message_sent;
+ Timestamp last_message_received;
+
+ [no_user_modify, no_storage] double messages_sent_per_second {
+ get;
+ };
+
+ [no_user_modify, no_storage] double messages_received_per_second {
+ get;
+ };
+
+ [no_user_modify, no_storage] double bytes_sent_per_second {
+ get;
+ };
+
+ [no_user_modify, no_storage] double bytes_received_per_second {
+ get;
+ };
};
}
ObjectLock olock(m_Stream);
if (m_Stream->IsEof())
return;
- JsonRpc::SendMessage(m_Stream, message);
+ size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
+ m_Endpoint->AddMessageSent(bytesSent);
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
origin->FromZone = m_Endpoint->GetZone();
else
origin->FromZone = Zone::GetByName(message->Get("originZone"));
+
+ m_Endpoint->AddMessageReceived(jsonString.GetLength());
}
Value vmethod;