Made socket error handling more robust.
}
}
+void ConfigObject::SetPropertyInteger(const string& name, int value)
+{
+ char valueString[20];
+ sprintf(valueString, "%d", value);
+
+ SetProperty(name, string(valueString));
+}
+
+void ConfigObject::SetPropertyDouble(const string& name, double value)
+{
+ char valueString[20];
+ sprintf(valueString, "%f", value);
+
+ SetProperty(name, string(valueString));
+}
+
bool ConfigObject::GetProperty(const string& name, string *value) const
{
map<string, string>::const_iterator vi = Properties.find(name);
void Socket::Start(void)
{
+ OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this());
+
Sockets.push_front(static_pointer_cast<Socket>(shared_from_this()));
}
Stop();
}
+string Socket::FormatErrorCode(int code)
+{
+ char *message;
+ string result = "Unknown socket error.";
+
+#ifdef _WIN32
+ if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, code, 0, (char *)&message, 0, NULL) != 0) {
+ result = string(message);
+ LocalFree(message);
+ }
+#else /* _WIN32 */
+ if (code != 0)
+ message = strerror(code);
+
+ result = string(message);
+#endif /* _WIN32 */
+
+ return result;
+}
+
+int Socket::ExceptionEventHandler(EventArgs::Ptr ea)
+{
+ int opt;
+ socklen_t optlen = sizeof(opt);
+
+ int rc = getsockopt(GetFD(), SOL_SOCKET, SO_ERROR, (char *)&opt, &optlen);
+
+ if (rc < 0) {
+ Close();
+ return 0;
+ }
+
+ if (opt != 0) {
+ SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
+ ea->Code = opt;
+ ea->Message = FormatErrorCode(opt);
+ Close();
+
+ }
+
+ return 0;
+}
+
void Socket::CloseAllSockets(void)
{
for (list<Socket::WeakPtr>::iterator i = Sockets.begin(); i != Sockets.end(); ) {
namespace icinga {
+struct SocketErrorEventArgs : public EventArgs
+{
+ typedef shared_ptr<SocketErrorEventArgs> Ptr;
+ typedef weak_ptr<SocketErrorEventArgs> WeakPtr;
+
+ int Code;
+ string Message;
+};
+
class Socket : public Object
{
private:
SOCKET m_FD;
+ int ExceptionEventHandler(EventArgs::Ptr ea);
+
+ string FormatErrorCode(int errorCode);
+
protected:
Socket(void);
event<EventArgs::Ptr> OnWritable;
event<EventArgs::Ptr> OnException;
+ event<SocketErrorEventArgs::Ptr> OnError;
event<EventArgs::Ptr> OnClosed;
virtual bool WantsToRead(void) const;
{
m_SendQueue = make_shared<FIFO>();
m_RecvQueue = make_shared<FIFO>();
+
+ m_PeerPort = 0;
}
void TCPClient::Start(void)
OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this());
}
+void TCPClient::Connect(const string& hostname, unsigned short port)
+{
+ hostent *hent;
+ sockaddr_in sin;
+
+ memset(&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(port);
+
+ hent = gethostbyname(hostname.c_str());
+
+ if (hent != NULL)
+ sin.sin_addr.s_addr = ((in_addr *)hent->h_addr_list[0])->s_addr;
+ else
+ sin.sin_addr.s_addr = inet_addr(hostname.c_str());
+
+ int rc = connect(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+#ifdef _WIN32
+ if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK)
+#else /* _WIN32 */
+ if (rc < 0 && errno != EINPROGRESS)
+#endif /* _WIN32 */
+ Close();
+
+ m_PeerHost = hostname;
+ m_PeerPort = port;
+}
+
FIFO::Ptr TCPClient::GetSendQueue(void)
{
return m_SendQueue;
return m_RecvQueue;
}
+
+string TCPClient::GetPeerHost(void)
+{
+ return m_PeerHost;
+}
+
+int TCPClient::GetPeerPort(void)
+{
+ return m_PeerPort;
+}
+
int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
{
int rc;
class TCPClient : public TCPSocket
{
private:
+ string m_PeerHost;
+ int m_PeerPort;
+
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
virtual void Start(void);
- void Connect(const char *hostname, unsigned short port);
+ void Connect(const string& hostname, unsigned short port);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);
+ string GetPeerHost(void);
+ int GetPeerPort(void);
+
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
void TCPServer::Listen(void)
{
- listen(GetFD(), SOMAXCONN);
+ int rc = listen(GetFD(), SOMAXCONN);
+
+ if (rc < 0) {
+ Close();
+ return;
+ }
Start();
}
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = hostname ? inet_addr(hostname) : htonl(INADDR_ANY);
sin.sin_port = htons(port);
- ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+ int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+ if (rc < 0)
+ Close();
}
return m_Interval;
}
+void Timer::SetUserArgs(const EventArgs::Ptr& userArgs)
+{
+ m_UserArgs = userArgs;
+}
+
+
+EventArgs::Ptr Timer::GetUserArgs(void) const
+{
+ return m_UserArgs;
+}
+
void Timer::Start(void)
{
Stop();
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
+#include <netdb.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
- ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
- connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true));
+
+ int replicate = 0;
+ object->GetPropertyInteger("replicate", &replicate);
+
+ if (replicate) {
+ ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+ connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true));
+ }
return 0;
}
int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
- ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
- connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false));
+
+ int replicate = 0;
+ object->GetPropertyInteger("replicate", &replicate);
+
+ if (replicate) {
+ ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+ connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false));
+ }
return 0;
}
int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
- JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false);
- cJSON *params = msg->GetParams();
- cJSON_AddStringToObject(params, "property", ea->Property.c_str());
- string value;
- object->GetProperty(ea->Property, &value);
- cJSON_AddStringToObject(params, "value", value.c_str());
-
- ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
- connectionManager->SendMessage(msg);
+
+ int replicate = 0;
+ object->GetPropertyInteger("replicate", &replicate);
+
+ if (replicate) {
+ JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false);
+ cJSON *params = msg->GetParams();
+ cJSON_AddStringToObject(params, "property", ea->Property.c_str());
+ string value;
+ object->GetProperty(ea->Property, &value);
+ cJSON_AddStringToObject(params, "value", value.c_str());
+
+ ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+ connectionManager->SendMessage(msg);
+ }
return 0;
}
string componentDirectory = GetExeDirectory() + "/../lib/icinga";
AddComponentSearchDir(componentDirectory);
- function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler;
- NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component");
+
+ function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
componentCollection->OnObjectCreated += NewComponentHandler;
componentCollection->ForEachObject(NewComponentHandler);
- function<int (ConfigObjectEventArgs::Ptr)> DeletedComponentHandler;
- DeletedComponentHandler = bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
- componentCollection->OnObjectRemoved += DeletedComponentHandler;
+ componentCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
- function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler;
- NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener");
+
+ function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
listenerCollection->OnObjectCreated += NewRpcListenerHandler;
listenerCollection->ForEachObject(NewRpcListenerHandler);
- function<int (ConfigObjectEventArgs::Ptr)> DeletedRpcListenerHandler;
- DeletedRpcListenerHandler = bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
- listenerCollection->OnObjectRemoved += DeletedRpcListenerHandler;
+ listenerCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
+
+ ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection");
+
+ function<int (ConfigObjectEventArgs::Ptr)> NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this());
+ connectionCollection->OnObjectCreated += NewRpcConnectionHandler;
+ connectionCollection->ForEachObject(NewRpcConnectionHandler);
+
+ connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this());
ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfilecomponent");
fileComponentConfig->SetProperty("configFilename", args[1]);
+ fileComponentConfig->SetPropertyInteger("replicate", 0);
GetConfigHive()->AddObject(fileComponentConfig);
ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener");
Log("Creating JSON-RPC listener on port %d", port);
- JsonRpcServer::Ptr server = make_shared<JsonRpcServer>();
- server->Bind(port);
- server->Start();
- GetConnectionManager()->RegisterServer(server);
+ GetConnectionManager()->AddListener(port);
return 0;
}
return 0;
}
+int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
+{
+ ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
+ string hostname;
+ int port;
+
+ if (!object->GetProperty("hostname", &hostname))
+ throw Exception("Parameter 'hostname' is required for 'rpcconnection' objects.");
+
+ if (!object->GetPropertyInteger("port", &port))
+ throw Exception("Parameter 'port' is required for 'rpcconnection' objects.");
+
+ Log("Creating JSON-RPC connection to %s:%d", hostname.c_str(), port);
+
+ GetConnectionManager()->AddConnection(hostname, port);
+
+ return 0;
+}
+
+int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
+{
+ throw Exception("Unsupported operation.");
+
+ return 0;
+}
+
+
SET_START_CLASS(icinga::IcingaApplication);
int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
+ int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
+ int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
+
public:
typedef shared_ptr<IcingaApplication> Ptr;
typedef weak_ptr<IcingaApplication> WeakPtr;
using namespace icinga;
+void ConnectionManager::AddListener(unsigned short port)
+{
+ JsonRpcServer::Ptr server = make_shared<JsonRpcServer>();
+ RegisterServer(server);
+
+ server->MakeSocket();
+ server->Bind(port);
+ server->Listen();
+ server->Start();
+}
+
+void ConnectionManager::AddConnection(string host, short port)
+{
+ JsonRpcClient::Ptr client = make_shared<JsonRpcClient>();
+ RegisterClient(client);
+
+ client->MakeSocket();
+ client->Connect(host, port);
+ client->Start();
+}
+
+
void ConnectionManager::RegisterServer(JsonRpcServer::Ptr server)
{
m_Servers.push_front(server);
{
m_Clients.push_front(client);
client->OnNewMessage += bind_weak(&ConnectionManager::NewMessageHandler, shared_from_this());
+ client->OnClosed += bind_weak(&ConnectionManager::CloseClientHandler, shared_from_this());
}
void ConnectionManager::UnregisterClient(JsonRpcClient::Ptr client)
int ConnectionManager::CloseClientHandler(EventArgs::Ptr ea)
{
- UnregisterClient(static_pointer_cast<JsonRpcClient>(ea->Source));
+ JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->Source);
+ UnregisterClient(client);
+
+ Timer::Ptr timer = make_shared<Timer>();
+ timer->SetInterval(30);
+ timer->SetUserArgs(ea);
+ timer->OnTimerExpired += bind_weak(&ConnectionManager::ReconnectClientHandler, shared_from_this());
+ timer->Start();
+ m_ReconnectTimers.push_front(timer);
+
+ return 0;
+}
+
+int ConnectionManager::ReconnectClientHandler(TimerEventArgs::Ptr ea)
+{
+ JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->UserArgs->Source);
+ Timer::Ptr timer = static_pointer_cast<Timer>(ea->Source);
+
+ AddConnection(client->GetPeerHost(), client->GetPeerPort());
+
+ timer->Stop();
+ m_ReconnectTimers.remove(timer);
return 0;
}
list<JsonRpcServer::Ptr> m_Servers;
list<JsonRpcClient::Ptr> m_Clients;
map< string, event<NewMessageEventArgs::Ptr> > m_Methods;
+ list<Timer::Ptr> m_ReconnectTimers;
int NewClientHandler(NewClientEventArgs::Ptr ncea);
int CloseClientHandler(EventArgs::Ptr ea);
+ int ReconnectClientHandler(TimerEventArgs::Ptr ea);
int NewMessageHandler(NewMessageEventArgs::Ptr nmea);
-public:
- typedef shared_ptr<ConnectionManager> Ptr;
- typedef weak_ptr<ConnectionManager> WeakPtr;
+ void RegisterClient(JsonRpcClient::Ptr server);
+ void UnregisterClient(JsonRpcClient::Ptr server);
void RegisterServer(JsonRpcServer::Ptr server);
void UnregisterServer(JsonRpcServer::Ptr server);
+public:
+ typedef shared_ptr<ConnectionManager> Ptr;
+ typedef weak_ptr<ConnectionManager> WeakPtr;
- void RegisterClient(JsonRpcClient::Ptr client);
- void UnregisterClient(JsonRpcClient::Ptr client);
+ void AddListener(unsigned short port);
+ void AddConnection(string host, short port);
void RegisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);
void UnregisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);