]> granicus.if.org Git - icinga2/commitdiff
Implemented outbound JSON-RPC client connections.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 4 Apr 2012 10:22:46 +0000 (12:22 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 4 Apr 2012 10:36:23 +0000 (12:36 +0200)
Made socket error handling more robust.

14 files changed:
base/configobject.cpp
base/socket.cpp
base/socket.h
base/tcpclient.cpp
base/tcpclient.h
base/tcpserver.cpp
base/tcpsocket.cpp
base/timer.cpp
base/unix.h
configrpccomponent/configrpccomponent.cpp
icinga/icingaapplication.cpp
icinga/icingaapplication.h
jsonrpc/connectionmanager.cpp
jsonrpc/connectionmanager.h

index b301d5304f56fa72f3b5c8b8c83197aad42cf3ba..92a3b6e36ff2b201b1b487c9b0e8d9cefc142da6 100644 (file)
@@ -57,6 +57,22 @@ void ConfigObject::SetProperty(const string& name, const string& value)
        }
 }
 
+void ConfigObject::SetPropertyInteger(const string& name, int value)
+{
+       char valueString[20];
+       sprintf(valueString, "%d", value);
+
+       SetProperty(name, string(valueString));
+}
+
+void ConfigObject::SetPropertyDouble(const string& name, double value)
+{
+       char valueString[20];
+       sprintf(valueString, "%f", value);
+
+       SetProperty(name, string(valueString));
+}
+
 bool ConfigObject::GetProperty(const string& name, string *value) const
 {
        map<string, string>::const_iterator vi = Properties.find(name);
index 2371ad854241f08f708ef12134e5c18f93c30e0c..8c6b226000779c9fdb2f116b55a8924799df3a63 100644 (file)
@@ -16,6 +16,8 @@ Socket::~Socket(void)
 
 void Socket::Start(void)
 {
+       OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this());
+
        Sockets.push_front(static_pointer_cast<Socket>(shared_from_this()));
 }
 
@@ -62,6 +64,49 @@ void Socket::Close(bool from_dtor)
                Stop();
 }
 
+string Socket::FormatErrorCode(int code)
+{
+       char *message;
+       string result = "Unknown socket error.";
+
+#ifdef _WIN32
+       if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, code, 0, (char *)&message, 0, NULL) != 0) {
+               result = string(message);
+               LocalFree(message);
+       }
+#else /* _WIN32 */
+       if (code != 0)
+               message = strerror(code);
+
+       result = string(message);
+#endif /* _WIN32 */
+
+       return result;
+}
+
+int Socket::ExceptionEventHandler(EventArgs::Ptr ea)
+{
+       int opt;
+       socklen_t optlen = sizeof(opt);
+
+       int rc = getsockopt(GetFD(), SOL_SOCKET, SO_ERROR, (char *)&opt, &optlen);
+
+       if (rc < 0) {
+               Close();
+               return 0;
+       }
+
+       if (opt != 0) {
+               SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
+               ea->Code = opt;
+               ea->Message = FormatErrorCode(opt);
+               Close();
+
+       }
+
+       return 0;
+}
+
 void Socket::CloseAllSockets(void)
 {
        for (list<Socket::WeakPtr>::iterator i = Sockets.begin(); i != Sockets.end(); ) {
index 0f0307c0e2993f1ab285473ee015f506c73c0eb5..1583621ea9a2f795105bd4f9d5a490c3cbff210b 100644 (file)
@@ -3,11 +3,24 @@
 
 namespace icinga {
 
+struct SocketErrorEventArgs : public EventArgs
+{
+       typedef shared_ptr<SocketErrorEventArgs> Ptr;
+       typedef weak_ptr<SocketErrorEventArgs> WeakPtr;
+
+       int Code;
+       string Message;
+};
+
 class Socket : public Object
 {
 private:
        SOCKET m_FD;
 
+       int ExceptionEventHandler(EventArgs::Ptr ea);
+
+       string FormatErrorCode(int errorCode);
+
 protected:
        Socket(void);
 
@@ -30,6 +43,7 @@ public:
        event<EventArgs::Ptr> OnWritable;
        event<EventArgs::Ptr> OnException;
 
+       event<SocketErrorEventArgs::Ptr> OnError;
        event<EventArgs::Ptr> OnClosed;
 
        virtual bool WantsToRead(void) const;
index 595be920335809de9ac74c63b6fb14790d38905b..19fb82095c2c37d287626b05dd1bc51fc67234e2 100644 (file)
@@ -6,6 +6,8 @@ TCPClient::TCPClient(void)
 {
        m_SendQueue = make_shared<FIFO>();
        m_RecvQueue = make_shared<FIFO>();
+
+       m_PeerPort = 0;
 }
 
 void TCPClient::Start(void)
@@ -16,6 +18,35 @@ void TCPClient::Start(void)
        OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this());
 }
 
+void TCPClient::Connect(const string& hostname, unsigned short port)
+{
+       hostent *hent;
+       sockaddr_in sin;
+
+       memset(&sin, 0, sizeof(sin));
+       sin.sin_family = AF_INET;
+       sin.sin_port = htons(port);
+
+       hent = gethostbyname(hostname.c_str());
+
+       if (hent != NULL)
+               sin.sin_addr.s_addr = ((in_addr *)hent->h_addr_list[0])->s_addr;
+       else
+               sin.sin_addr.s_addr = inet_addr(hostname.c_str());
+
+       int rc = connect(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+#ifdef _WIN32
+       if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK)
+#else /* _WIN32 */
+       if (rc < 0 && errno != EINPROGRESS)
+#endif /* _WIN32 */
+               Close();
+
+       m_PeerHost = hostname;
+       m_PeerPort = port;
+}
+
 FIFO::Ptr TCPClient::GetSendQueue(void)
 {
        return m_SendQueue;
@@ -26,6 +57,17 @@ FIFO::Ptr TCPClient::GetRecvQueue(void)
        return m_RecvQueue;
 }
 
+
+string TCPClient::GetPeerHost(void)
+{
+       return m_PeerHost;
+}
+
+int TCPClient::GetPeerPort(void)
+{
+       return m_PeerPort;
+}
+
 int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
 {
        int rc;
index 8f6506c332f9c5f5853c64174a48d0deab7407e4..607ed299ad493a7b7977c0232287229c60b82aa2 100644 (file)
@@ -7,6 +7,9 @@ namespace icinga
 class TCPClient : public TCPSocket
 {
 private:
+       string m_PeerHost;
+       int m_PeerPort;
+
        FIFO::Ptr m_SendQueue;
        FIFO::Ptr m_RecvQueue;
 
@@ -21,11 +24,14 @@ public:
 
        virtual void Start(void);
 
-       void Connect(const char *hostname, unsigned short port);
+       void Connect(const string& hostname, unsigned short port);
 
        FIFO::Ptr GetSendQueue(void);
        FIFO::Ptr GetRecvQueue(void);
 
+       string GetPeerHost(void);
+       int GetPeerPort(void);
+
        virtual bool WantsToRead(void) const;
        virtual bool WantsToWrite(void) const;
 
index bb8f6718c66a6d52d6440bc13f6855543d7fee8d..d35b1af1b7446da34cefd0232ea0600475941c61 100644 (file)
@@ -26,7 +26,12 @@ void TCPServer::Start(void)
 
 void TCPServer::Listen(void)
 {
-       listen(GetFD(), SOMAXCONN);
+       int rc = listen(GetFD(), SOMAXCONN);
+
+       if (rc < 0) {
+               Close();
+               return;
+       }
 
        Start();
 }
index 632fdf4179465896cf70bb4a3f1cc220f67d1149..cfc3739543c6ebc63714a60d77072b1cc9c17f6e 100644 (file)
@@ -27,5 +27,9 @@ void TCPSocket::Bind(const char *hostname, unsigned short port)
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = hostname ? inet_addr(hostname) : htonl(INADDR_ANY);
        sin.sin_port = htons(port);
-       ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+       int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
+
+       if (rc < 0)
+               Close();
 }
index 817ae2f8896e9aff8f55fa3a7a0a362a8aa121e1..42191f3891f09786ab5e8a46d069cb03140a14c5 100644 (file)
@@ -90,6 +90,17 @@ unsigned int Timer::GetInterval(void) const
        return m_Interval;
 }
 
+void Timer::SetUserArgs(const EventArgs::Ptr& userArgs)
+{
+       m_UserArgs = userArgs;
+}
+
+
+EventArgs::Ptr Timer::GetUserArgs(void) const
+{
+       return m_UserArgs;
+}
+
 void Timer::Start(void)
 {
        Stop();
index f0eba440f8e1452a87d91799e2397ebad4d75253..1d9908a347362e65e27305540bbc2bb804e674c3 100644 (file)
@@ -7,6 +7,7 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <sys/socket.h>
+#include <netdb.h>
 #include <sys/ioctl.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
index d28e253240e69f439c7a7bca2a73159b42176ba2..e76c6e4f5bfc04614b40b64f48b7c2549a5ce75b 100644 (file)
@@ -80,8 +80,14 @@ int ConfigRpcComponent::FetchObjectsHandler(NewMessageEventArgs::Ptr ea)
 int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
 {
        ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
-       ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
-       connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true));
+       
+       int replicate = 0;
+       object->GetPropertyInteger("replicate", &replicate);
+
+       if (replicate) {
+               ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+               connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true));
+       }
 
        return 0;
 }
@@ -89,8 +95,14 @@ int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
 int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
 {
        ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
-       ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
-       connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false));
+       
+       int replicate = 0;
+       object->GetPropertyInteger("replicate", &replicate);
+
+       if (replicate) {
+               ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+               connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false));
+       }
 
        return 0;
 }
@@ -98,15 +110,21 @@ int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
 int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea)
 {
        ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
-       JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false);
-       cJSON *params = msg->GetParams();
-       cJSON_AddStringToObject(params, "property", ea->Property.c_str());
-       string value;
-       object->GetProperty(ea->Property, &value);
-       cJSON_AddStringToObject(params, "value", value.c_str());
-
-       ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
-       connectionManager->SendMessage(msg);
+       
+       int replicate = 0;
+       object->GetPropertyInteger("replicate", &replicate);
+
+       if (replicate) {
+               JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false);
+               cJSON *params = msg->GetParams();
+               cJSON_AddStringToObject(params, "property", ea->Property.c_str());
+               string value;
+               object->GetProperty(ea->Property, &value);
+               cJSON_AddStringToObject(params, "value", value.c_str());
+
+               ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
+               connectionManager->SendMessage(msg);
+       }
 
        return 0;
 }
index b4bc9f59d9bcfa9ccf519b7d0738fa4712586e92..e39aaa52d2b3d959ca180792fef30cbe0c9bd21a 100644 (file)
@@ -30,28 +30,33 @@ int IcingaApplication::Main(const vector<string>& args)
        string componentDirectory = GetExeDirectory() + "/../lib/icinga";
        AddComponentSearchDir(componentDirectory);
 
-       function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler;
-       NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
        ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component");
+
+       function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
        componentCollection->OnObjectCreated += NewComponentHandler;
        componentCollection->ForEachObject(NewComponentHandler);
 
-       function<int (ConfigObjectEventArgs::Ptr)> DeletedComponentHandler;
-       DeletedComponentHandler = bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
-       componentCollection->OnObjectRemoved += DeletedComponentHandler;
+       componentCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
 
-       function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler;
-       NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
        ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener");
+
+       function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
        listenerCollection->OnObjectCreated += NewRpcListenerHandler;
        listenerCollection->ForEachObject(NewRpcListenerHandler);
 
-       function<int (ConfigObjectEventArgs::Ptr)> DeletedRpcListenerHandler;
-       DeletedRpcListenerHandler = bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
-       listenerCollection->OnObjectRemoved += DeletedRpcListenerHandler;
+       listenerCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
+
+       ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection");
+
+       function<int (ConfigObjectEventArgs::Ptr)> NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this());
+       connectionCollection->OnObjectCreated += NewRpcConnectionHandler;
+       connectionCollection->ForEachObject(NewRpcConnectionHandler);
+
+       connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this());
 
        ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfilecomponent");
        fileComponentConfig->SetProperty("configFilename", args[1]);
+       fileComponentConfig->SetPropertyInteger("replicate", 0);
        GetConfigHive()->AddObject(fileComponentConfig);
 
        ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener");
@@ -109,10 +114,7 @@ int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
 
        Log("Creating JSON-RPC listener on port %d", port);
 
-       JsonRpcServer::Ptr server = make_shared<JsonRpcServer>();
-       server->Bind(port);
-       server->Start();
-       GetConnectionManager()->RegisterServer(server);
+       GetConnectionManager()->AddListener(port);
 
        return 0;
 }
@@ -124,4 +126,31 @@ int IcingaApplication::DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
        return 0;
 }
 
+int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
+{
+       ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
+       string hostname;
+       int port;
+
+       if (!object->GetProperty("hostname", &hostname))
+               throw Exception("Parameter 'hostname' is required for 'rpcconnection' objects.");
+
+       if (!object->GetPropertyInteger("port", &port))
+               throw Exception("Parameter 'port' is required for 'rpcconnection' objects.");
+
+       Log("Creating JSON-RPC connection to %s:%d", hostname.c_str(), port);
+
+       GetConnectionManager()->AddConnection(hostname, port);
+
+       return 0;
+}
+
+int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
+{
+       throw Exception("Unsupported operation.");
+
+       return 0;
+}
+
+
 SET_START_CLASS(icinga::IcingaApplication);
index a6f691def9bbcddf78333fe5f62d51b2eb8d9410..71b7bbfff440980a02643eefe9aa4f081d19b6de 100644 (file)
@@ -15,6 +15,9 @@ private:
        int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
        int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
 
+       int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
+       int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
+
 public:
        typedef shared_ptr<IcingaApplication> Ptr;
        typedef weak_ptr<IcingaApplication> WeakPtr;
index 6f4f53bf229b2f1d9872127c4c211f90c23facf9..c9dd0ae59c95459db2179f0684b000c71a68cc3b 100644 (file)
@@ -2,6 +2,28 @@
 
 using namespace icinga;
 
+void ConnectionManager::AddListener(unsigned short port)
+{
+       JsonRpcServer::Ptr server = make_shared<JsonRpcServer>();
+       RegisterServer(server);
+
+       server->MakeSocket();
+       server->Bind(port);
+       server->Listen();
+       server->Start();
+}
+
+void ConnectionManager::AddConnection(string host, short port)
+{
+       JsonRpcClient::Ptr client = make_shared<JsonRpcClient>();
+       RegisterClient(client);
+
+       client->MakeSocket();
+       client->Connect(host, port);
+       client->Start();
+}
+
+
 void ConnectionManager::RegisterServer(JsonRpcServer::Ptr server)
 {
        m_Servers.push_front(server);
@@ -18,6 +40,7 @@ void ConnectionManager::RegisterClient(JsonRpcClient::Ptr client)
 {
        m_Clients.push_front(client);
        client->OnNewMessage += bind_weak(&ConnectionManager::NewMessageHandler, shared_from_this());
+       client->OnClosed += bind_weak(&ConnectionManager::CloseClientHandler, shared_from_this());
 }
 
 void ConnectionManager::UnregisterClient(JsonRpcClient::Ptr client)
@@ -36,7 +59,28 @@ int ConnectionManager::NewClientHandler(NewClientEventArgs::Ptr ncea)
 
 int ConnectionManager::CloseClientHandler(EventArgs::Ptr ea)
 {
-       UnregisterClient(static_pointer_cast<JsonRpcClient>(ea->Source));
+       JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->Source);
+       UnregisterClient(client);
+
+       Timer::Ptr timer = make_shared<Timer>();
+       timer->SetInterval(30);
+       timer->SetUserArgs(ea);
+       timer->OnTimerExpired += bind_weak(&ConnectionManager::ReconnectClientHandler, shared_from_this());
+       timer->Start();
+       m_ReconnectTimers.push_front(timer);
+
+       return 0;
+}
+
+int ConnectionManager::ReconnectClientHandler(TimerEventArgs::Ptr ea)
+{
+       JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->UserArgs->Source);
+       Timer::Ptr timer = static_pointer_cast<Timer>(ea->Source);
+
+       AddConnection(client->GetPeerHost(), client->GetPeerPort());
+
+       timer->Stop();
+       m_ReconnectTimers.remove(timer);
 
        return 0;
 }
index 62b049be172caf16866748e23d3595b8a0c97ed2..fe3999e7ef6c8e57734a37d4d16699f825a25253 100644 (file)
@@ -9,20 +9,24 @@ class ConnectionManager : public Object
        list<JsonRpcServer::Ptr> m_Servers;
        list<JsonRpcClient::Ptr> m_Clients;
        map< string, event<NewMessageEventArgs::Ptr> > m_Methods;
+       list<Timer::Ptr> m_ReconnectTimers;
 
        int NewClientHandler(NewClientEventArgs::Ptr ncea);
        int CloseClientHandler(EventArgs::Ptr ea);
+       int ReconnectClientHandler(TimerEventArgs::Ptr ea);
        int NewMessageHandler(NewMessageEventArgs::Ptr nmea);
 
-public:
-       typedef shared_ptr<ConnectionManager> Ptr;
-       typedef weak_ptr<ConnectionManager> WeakPtr;
+       void RegisterClient(JsonRpcClient::Ptr server);
+       void UnregisterClient(JsonRpcClient::Ptr server);
 
        void RegisterServer(JsonRpcServer::Ptr server);
        void UnregisterServer(JsonRpcServer::Ptr server);
+public:
+       typedef shared_ptr<ConnectionManager> Ptr;
+       typedef weak_ptr<ConnectionManager> WeakPtr;
 
-       void RegisterClient(JsonRpcClient::Ptr client);
-       void UnregisterClient(JsonRpcClient::Ptr client);
+       void AddListener(unsigned short port);
+       void AddConnection(string host, short port);
 
        void RegisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);
        void UnregisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);