From: Gunnar Beutner Date: Wed, 4 Apr 2012 10:22:46 +0000 (+0200) Subject: Implemented outbound JSON-RPC client connections. X-Git-Tag: v0.0.1~644 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=261329e4838af99cc9268e1eef052d4c7663fc97;p=icinga2 Implemented outbound JSON-RPC client connections. Made socket error handling more robust. --- diff --git a/base/configobject.cpp b/base/configobject.cpp index b301d5304..92a3b6e36 100644 --- a/base/configobject.cpp +++ b/base/configobject.cpp @@ -57,6 +57,22 @@ void ConfigObject::SetProperty(const string& name, const string& value) } } +void ConfigObject::SetPropertyInteger(const string& name, int value) +{ + char valueString[20]; + sprintf(valueString, "%d", value); + + SetProperty(name, string(valueString)); +} + +void ConfigObject::SetPropertyDouble(const string& name, double value) +{ + char valueString[20]; + sprintf(valueString, "%f", value); + + SetProperty(name, string(valueString)); +} + bool ConfigObject::GetProperty(const string& name, string *value) const { map::const_iterator vi = Properties.find(name); diff --git a/base/socket.cpp b/base/socket.cpp index 2371ad854..8c6b22600 100644 --- a/base/socket.cpp +++ b/base/socket.cpp @@ -16,6 +16,8 @@ Socket::~Socket(void) void Socket::Start(void) { + OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this()); + Sockets.push_front(static_pointer_cast(shared_from_this())); } @@ -62,6 +64,49 @@ void Socket::Close(bool from_dtor) Stop(); } +string Socket::FormatErrorCode(int code) +{ + char *message; + string result = "Unknown socket error."; + +#ifdef _WIN32 + if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, code, 0, (char *)&message, 0, NULL) != 0) { + result = string(message); + LocalFree(message); + } +#else /* _WIN32 */ + if (code != 0) + message = strerror(code); + + result = string(message); +#endif /* _WIN32 */ + + return result; +} + +int Socket::ExceptionEventHandler(EventArgs::Ptr ea) +{ + int opt; + socklen_t optlen = sizeof(opt); + + int rc = getsockopt(GetFD(), SOL_SOCKET, SO_ERROR, (char *)&opt, &optlen); + + if (rc < 0) { + Close(); + return 0; + } + + if (opt != 0) { + SocketErrorEventArgs::Ptr ea = make_shared(); + ea->Code = opt; + ea->Message = FormatErrorCode(opt); + Close(); + + } + + return 0; +} + void Socket::CloseAllSockets(void) { for (list::iterator i = Sockets.begin(); i != Sockets.end(); ) { diff --git a/base/socket.h b/base/socket.h index 0f0307c0e..1583621ea 100644 --- a/base/socket.h +++ b/base/socket.h @@ -3,11 +3,24 @@ namespace icinga { +struct SocketErrorEventArgs : public EventArgs +{ + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + int Code; + string Message; +}; + class Socket : public Object { private: SOCKET m_FD; + int ExceptionEventHandler(EventArgs::Ptr ea); + + string FormatErrorCode(int errorCode); + protected: Socket(void); @@ -30,6 +43,7 @@ public: event OnWritable; event OnException; + event OnError; event OnClosed; virtual bool WantsToRead(void) const; diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp index 595be9203..19fb82095 100644 --- a/base/tcpclient.cpp +++ b/base/tcpclient.cpp @@ -6,6 +6,8 @@ TCPClient::TCPClient(void) { m_SendQueue = make_shared(); m_RecvQueue = make_shared(); + + m_PeerPort = 0; } void TCPClient::Start(void) @@ -16,6 +18,35 @@ void TCPClient::Start(void) OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this()); } +void TCPClient::Connect(const string& hostname, unsigned short port) +{ + hostent *hent; + sockaddr_in sin; + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + hent = gethostbyname(hostname.c_str()); + + if (hent != NULL) + sin.sin_addr.s_addr = ((in_addr *)hent->h_addr_list[0])->s_addr; + else + sin.sin_addr.s_addr = inet_addr(hostname.c_str()); + + int rc = connect(GetFD(), (sockaddr *)&sin, sizeof(sin)); + +#ifdef _WIN32 + if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) +#else /* _WIN32 */ + if (rc < 0 && errno != EINPROGRESS) +#endif /* _WIN32 */ + Close(); + + m_PeerHost = hostname; + m_PeerPort = port; +} + FIFO::Ptr TCPClient::GetSendQueue(void) { return m_SendQueue; @@ -26,6 +57,17 @@ FIFO::Ptr TCPClient::GetRecvQueue(void) return m_RecvQueue; } + +string TCPClient::GetPeerHost(void) +{ + return m_PeerHost; +} + +int TCPClient::GetPeerPort(void) +{ + return m_PeerPort; +} + int TCPClient::ReadableEventHandler(EventArgs::Ptr ea) { int rc; diff --git a/base/tcpclient.h b/base/tcpclient.h index 8f6506c33..607ed299a 100644 --- a/base/tcpclient.h +++ b/base/tcpclient.h @@ -7,6 +7,9 @@ namespace icinga class TCPClient : public TCPSocket { private: + string m_PeerHost; + int m_PeerPort; + FIFO::Ptr m_SendQueue; FIFO::Ptr m_RecvQueue; @@ -21,11 +24,14 @@ public: virtual void Start(void); - void Connect(const char *hostname, unsigned short port); + void Connect(const string& hostname, unsigned short port); FIFO::Ptr GetSendQueue(void); FIFO::Ptr GetRecvQueue(void); + string GetPeerHost(void); + int GetPeerPort(void); + virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; diff --git a/base/tcpserver.cpp b/base/tcpserver.cpp index bb8f6718c..d35b1af1b 100644 --- a/base/tcpserver.cpp +++ b/base/tcpserver.cpp @@ -26,7 +26,12 @@ void TCPServer::Start(void) void TCPServer::Listen(void) { - listen(GetFD(), SOMAXCONN); + int rc = listen(GetFD(), SOMAXCONN); + + if (rc < 0) { + Close(); + return; + } Start(); } diff --git a/base/tcpsocket.cpp b/base/tcpsocket.cpp index 632fdf417..cfc373954 100644 --- a/base/tcpsocket.cpp +++ b/base/tcpsocket.cpp @@ -27,5 +27,9 @@ void TCPSocket::Bind(const char *hostname, unsigned short port) sin.sin_family = AF_INET; sin.sin_addr.s_addr = hostname ? inet_addr(hostname) : htonl(INADDR_ANY); sin.sin_port = htons(port); - ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin)); + + int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin)); + + if (rc < 0) + Close(); } diff --git a/base/timer.cpp b/base/timer.cpp index 817ae2f88..42191f389 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -90,6 +90,17 @@ unsigned int Timer::GetInterval(void) const return m_Interval; } +void Timer::SetUserArgs(const EventArgs::Ptr& userArgs) +{ + m_UserArgs = userArgs; +} + + +EventArgs::Ptr Timer::GetUserArgs(void) const +{ + return m_UserArgs; +} + void Timer::Start(void) { Stop(); diff --git a/base/unix.h b/base/unix.h index f0eba440f..1d9908a34 100644 --- a/base/unix.h +++ b/base/unix.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/configrpccomponent/configrpccomponent.cpp b/configrpccomponent/configrpccomponent.cpp index d28e25324..e76c6e4f5 100644 --- a/configrpccomponent/configrpccomponent.cpp +++ b/configrpccomponent/configrpccomponent.cpp @@ -80,8 +80,14 @@ int ConfigRpcComponent::FetchObjectsHandler(NewMessageEventArgs::Ptr ea) int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea) { ConfigObject::Ptr object = static_pointer_cast(ea->Source); - ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); - connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true)); + + int replicate = 0; + object->GetPropertyInteger("replicate", &replicate); + + if (replicate) { + ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); + connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true)); + } return 0; } @@ -89,8 +95,14 @@ int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea) int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea) { ConfigObject::Ptr object = static_pointer_cast(ea->Source); - ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); - connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false)); + + int replicate = 0; + object->GetPropertyInteger("replicate", &replicate); + + if (replicate) { + ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); + connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false)); + } return 0; } @@ -98,15 +110,21 @@ int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea) int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea) { ConfigObject::Ptr object = static_pointer_cast(ea->Source); - JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false); - cJSON *params = msg->GetParams(); - cJSON_AddStringToObject(params, "property", ea->Property.c_str()); - string value; - object->GetProperty(ea->Property, &value); - cJSON_AddStringToObject(params, "value", value.c_str()); - - ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); - connectionManager->SendMessage(msg); + + int replicate = 0; + object->GetPropertyInteger("replicate", &replicate); + + if (replicate) { + JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false); + cJSON *params = msg->GetParams(); + cJSON_AddStringToObject(params, "property", ea->Property.c_str()); + string value; + object->GetProperty(ea->Property, &value); + cJSON_AddStringToObject(params, "value", value.c_str()); + + ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); + connectionManager->SendMessage(msg); + } return 0; } diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index b4bc9f59d..e39aaa52d 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -30,28 +30,33 @@ int IcingaApplication::Main(const vector& args) string componentDirectory = GetExeDirectory() + "/../lib/icinga"; AddComponentSearchDir(componentDirectory); - function NewComponentHandler; - NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this()); ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component"); + + function NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this()); componentCollection->OnObjectCreated += NewComponentHandler; componentCollection->ForEachObject(NewComponentHandler); - function DeletedComponentHandler; - DeletedComponentHandler = bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this()); - componentCollection->OnObjectRemoved += DeletedComponentHandler; + componentCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this()); - function NewRpcListenerHandler; - NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this()); ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener"); + + function NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this()); listenerCollection->OnObjectCreated += NewRpcListenerHandler; listenerCollection->ForEachObject(NewRpcListenerHandler); - function DeletedRpcListenerHandler; - DeletedRpcListenerHandler = bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this()); - listenerCollection->OnObjectRemoved += DeletedRpcListenerHandler; + listenerCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this()); + + ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection"); + + function NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this()); + connectionCollection->OnObjectCreated += NewRpcConnectionHandler; + connectionCollection->ForEachObject(NewRpcConnectionHandler); + + connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this()); ConfigObject::Ptr fileComponentConfig = make_shared("component", "configfilecomponent"); fileComponentConfig->SetProperty("configFilename", args[1]); + fileComponentConfig->SetPropertyInteger("replicate", 0); GetConfigHive()->AddObject(fileComponentConfig); ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener"); @@ -109,10 +114,7 @@ int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea) Log("Creating JSON-RPC listener on port %d", port); - JsonRpcServer::Ptr server = make_shared(); - server->Bind(port); - server->Start(); - GetConnectionManager()->RegisterServer(server); + GetConnectionManager()->AddListener(port); return 0; } @@ -124,4 +126,31 @@ int IcingaApplication::DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea) return 0; } +int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea) +{ + ConfigObject::Ptr object = static_pointer_cast(ea->Source); + string hostname; + int port; + + if (!object->GetProperty("hostname", &hostname)) + throw Exception("Parameter 'hostname' is required for 'rpcconnection' objects."); + + if (!object->GetPropertyInteger("port", &port)) + throw Exception("Parameter 'port' is required for 'rpcconnection' objects."); + + Log("Creating JSON-RPC connection to %s:%d", hostname.c_str(), port); + + GetConnectionManager()->AddConnection(hostname, port); + + return 0; +} + +int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea) +{ + throw Exception("Unsupported operation."); + + return 0; +} + + SET_START_CLASS(icinga::IcingaApplication); diff --git a/icinga/icingaapplication.h b/icinga/icingaapplication.h index a6f691def..71b7bbfff 100644 --- a/icinga/icingaapplication.h +++ b/icinga/icingaapplication.h @@ -15,6 +15,9 @@ private: int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); + int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea); + int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea); + public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; diff --git a/jsonrpc/connectionmanager.cpp b/jsonrpc/connectionmanager.cpp index 6f4f53bf2..c9dd0ae59 100644 --- a/jsonrpc/connectionmanager.cpp +++ b/jsonrpc/connectionmanager.cpp @@ -2,6 +2,28 @@ using namespace icinga; +void ConnectionManager::AddListener(unsigned short port) +{ + JsonRpcServer::Ptr server = make_shared(); + RegisterServer(server); + + server->MakeSocket(); + server->Bind(port); + server->Listen(); + server->Start(); +} + +void ConnectionManager::AddConnection(string host, short port) +{ + JsonRpcClient::Ptr client = make_shared(); + RegisterClient(client); + + client->MakeSocket(); + client->Connect(host, port); + client->Start(); +} + + void ConnectionManager::RegisterServer(JsonRpcServer::Ptr server) { m_Servers.push_front(server); @@ -18,6 +40,7 @@ void ConnectionManager::RegisterClient(JsonRpcClient::Ptr client) { m_Clients.push_front(client); client->OnNewMessage += bind_weak(&ConnectionManager::NewMessageHandler, shared_from_this()); + client->OnClosed += bind_weak(&ConnectionManager::CloseClientHandler, shared_from_this()); } void ConnectionManager::UnregisterClient(JsonRpcClient::Ptr client) @@ -36,7 +59,28 @@ int ConnectionManager::NewClientHandler(NewClientEventArgs::Ptr ncea) int ConnectionManager::CloseClientHandler(EventArgs::Ptr ea) { - UnregisterClient(static_pointer_cast(ea->Source)); + JsonRpcClient::Ptr client = static_pointer_cast(ea->Source); + UnregisterClient(client); + + Timer::Ptr timer = make_shared(); + timer->SetInterval(30); + timer->SetUserArgs(ea); + timer->OnTimerExpired += bind_weak(&ConnectionManager::ReconnectClientHandler, shared_from_this()); + timer->Start(); + m_ReconnectTimers.push_front(timer); + + return 0; +} + +int ConnectionManager::ReconnectClientHandler(TimerEventArgs::Ptr ea) +{ + JsonRpcClient::Ptr client = static_pointer_cast(ea->UserArgs->Source); + Timer::Ptr timer = static_pointer_cast(ea->Source); + + AddConnection(client->GetPeerHost(), client->GetPeerPort()); + + timer->Stop(); + m_ReconnectTimers.remove(timer); return 0; } diff --git a/jsonrpc/connectionmanager.h b/jsonrpc/connectionmanager.h index 62b049be1..fe3999e7e 100644 --- a/jsonrpc/connectionmanager.h +++ b/jsonrpc/connectionmanager.h @@ -9,20 +9,24 @@ class ConnectionManager : public Object list m_Servers; list m_Clients; map< string, event > m_Methods; + list m_ReconnectTimers; int NewClientHandler(NewClientEventArgs::Ptr ncea); int CloseClientHandler(EventArgs::Ptr ea); + int ReconnectClientHandler(TimerEventArgs::Ptr ea); int NewMessageHandler(NewMessageEventArgs::Ptr nmea); -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; + void RegisterClient(JsonRpcClient::Ptr server); + void UnregisterClient(JsonRpcClient::Ptr server); void RegisterServer(JsonRpcServer::Ptr server); void UnregisterServer(JsonRpcServer::Ptr server); +public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; - void RegisterClient(JsonRpcClient::Ptr client); - void UnregisterClient(JsonRpcClient::Ptr client); + void AddListener(unsigned short port); + void AddConnection(string host, short port); void RegisterMethod(string method, function callback); void UnregisterMethod(string method, function callback);