From 7474b63dff248ab7d9c080b0f5726855e843aab4 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 18 Apr 2012 15:22:25 +0200 Subject: [PATCH] Refactored messaging system. --- base/application.cpp | 60 +++---- base/application.h | 3 +- base/base.vcxproj | 4 + base/configcollection.cpp | 15 +- base/configcollection.h | 8 +- base/confighive.cpp | 2 +- base/confighive.h | 8 +- base/configobject.cpp | 8 +- base/dictionary.cpp | 102 +++++++++++ base/dictionary.h | 39 +++++ base/event.h | 7 +- base/i2-base.h | 2 + base/socket.cpp | 16 +- base/socket.h | 12 +- base/tcpclient.cpp | 38 ++--- base/tcpclient.h | 6 +- base/tcpserver.cpp | 33 +++- base/tcpserver.h | 4 +- base/tcpsocket.cpp | 20 +-- base/timer.cpp | 12 +- base/timer.h | 10 +- base/variant.cpp | 68 ++++++++ base/variant.h | 45 +++++ components/configrpc/configrpccomponent.cpp | 105 ++++++------ components/configrpc/configrpccomponent.h | 15 +- icinga.sln | 7 - icinga/endpoint.cpp | 34 +++- icinga/endpoint.h | 26 ++- icinga/endpointmanager.cpp | 148 ++++++++-------- icinga/endpointmanager.h | 24 ++- icinga/icingaapplication.cpp | 51 ++++-- icinga/icingaapplication.h | 15 +- icinga/jsonrpcendpoint.cpp | 148 +++++++++++++++- icinga/jsonrpcendpoint.h | 21 ++- icinga/virtualendpoint.cpp | 25 +-- icinga/virtualendpoint.h | 17 +- jsonrpc/i2-jsonrpc.h | 2 + jsonrpc/jsonrpc.vcxproj | 28 ---- jsonrpc/jsonrpcclient.cpp | 20 ++- jsonrpc/jsonrpcclient.h | 8 +- jsonrpc/jsonrpcrequest.cpp | 4 + jsonrpc/jsonrpcrequest.h | 64 +++++++ jsonrpc/jsonrpcresponse.cpp | 4 + jsonrpc/jsonrpcresponse.h | 56 +++++++ jsonrpc/message.cpp | 23 +++ jsonrpc/message.h | 22 +++ jsonrpc/netstring.cpp | 85 ++++++++-- jsonrpc/netstring.h | 7 +- msgc/msgc.cpp | 177 -------------------- msgc/msgc.vcxproj | 82 --------- 50 files changed, 1101 insertions(+), 639 deletions(-) create mode 100644 base/dictionary.cpp create mode 100644 base/dictionary.h create mode 100644 base/variant.cpp create mode 100644 base/variant.h create mode 100644 jsonrpc/jsonrpcrequest.cpp create mode 100644 jsonrpc/jsonrpcrequest.h create mode 100644 jsonrpc/jsonrpcresponse.cpp create mode 100644 jsonrpc/jsonrpcresponse.h create mode 100644 jsonrpc/message.cpp create mode 100644 jsonrpc/message.h delete mode 100644 msgc/msgc.cpp delete mode 100644 msgc/msgc.vcxproj diff --git a/base/application.cpp b/base/application.cpp index ace02930c..ec3970577 100644 --- a/base/application.cpp +++ b/base/application.cpp @@ -20,6 +20,11 @@ Application::Application(void) char *debugging = getenv("_DEBUG"); m_Debugging = (debugging && strtol(debugging, NULL, 10) != 0); +#ifdef _WIN32 + if (IsDebuggerPresent()) + m_Debugging = true; +#endif /* _WIN32 */ + m_ShuttingDown = false; m_ConfigHive = make_shared(); } @@ -48,6 +53,8 @@ void Application::RunEventLoop(void) fd_set readfds, writefds, exceptfds; int nfds = -1; + Timer::CallExpiredTimers(); + FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptfds); @@ -72,12 +79,7 @@ void Application::RunEventLoop(void) nfds = fd; } - long sleep; - - do { - Timer::CallExpiredTimers(); - sleep = (long)(Timer::GetNextCall() - time(NULL)); - } while (sleep <= 0); + long sleep = (long)(Timer::GetNextCall() - time(NULL)); if (m_ShuttingDown) break; @@ -99,8 +101,8 @@ void Application::RunEventLoop(void) else if (ready == 0) continue; - EventArgs::Ptr ea = make_shared(); - ea->Source = shared_from_this(); + EventArgs ea; + ea.Source = shared_from_this(); list::iterator prev, i; for (i = Socket::Sockets.begin(); i != Socket::Sockets.end(); ) { @@ -206,39 +208,39 @@ Component::Ptr Application::LoadComponent(const string& path, const ConfigObject throw ComponentLoadException("Loadable module does not contain CreateComponent function"); component = Component::Ptr(pCreateComponent()); - component->SetApplication(static_pointer_cast(shared_from_this())); component->SetConfig(componentConfig); + RegisterComponent(component); + return component; +} + +void Application::RegisterComponent(Component::Ptr component) +{ + component->SetApplication(static_pointer_cast(shared_from_this())); m_Components[component->GetName()] = component; component->Start(); - - return component; } -Component::Ptr Application::GetComponent(const string& name) +void Application::UnregisterComponent(Component::Ptr component) { - map::iterator ci = m_Components.find(name); - - if (ci == m_Components.end()) - return Component::Ptr(); + string name = component->GetName(); - return ci->second; + Log("Unloading component '%s'", name.c_str()); + map::iterator i = m_Components.find(name); + if (i != m_Components.end()) { + m_Components.erase(i); + component->Stop(); + } } -void Application::UnloadComponent(const string& name) +Component::Ptr Application::GetComponent(const string& name) { map::iterator ci = m_Components.find(name); if (ci == m_Components.end()) - return; - - Log("Unloading component '%s'", name.c_str()); - - Component::Ptr component = ci->second; - component->Stop(); - m_Components.erase(ci); + return Component::Ptr(); - // TODO: unload DLL + return ci->second; } void Application::Log(const char *format, ...) @@ -396,7 +398,7 @@ int application_main(int argc, char **argv, Application *instance) try { result = Application::Instance->Main(args); } catch (const Exception& ex) { - cout << "---" << endl; + cerr << "---" << endl; string klass = typeid(ex).name(); @@ -410,8 +412,8 @@ int application_main(int argc, char **argv, Application *instance) } #endif /* HAVE_GCC_ABI_DEMANGLE */ - cout << "Exception: " << klass << endl; - cout << "Message: " << ex.GetMessage() << endl; + cerr << "Exception: " << klass << endl; + cerr << "Message: " << ex.GetMessage() << endl; return EXIT_FAILURE; } diff --git a/base/application.h b/base/application.h index 04d388c97..48acddf83 100644 --- a/base/application.h +++ b/base/application.h @@ -38,7 +38,8 @@ public: ConfigHive::Ptr GetConfigHive(void); shared_ptr LoadComponent(const string& path, const ConfigObject::Ptr& componentConfig); - void UnloadComponent(const string& name); + void RegisterComponent(shared_ptr component); + void UnregisterComponent(shared_ptr component); shared_ptr GetComponent(const string& name); void AddComponentSearchDir(const string& componentDirectory); diff --git a/base/base.vcxproj b/base/base.vcxproj index ffc05ae7f..9cc9a0e79 100644 --- a/base/base.vcxproj +++ b/base/base.vcxproj @@ -17,6 +17,7 @@ + @@ -29,6 +30,7 @@ + @@ -40,6 +42,7 @@ + @@ -53,6 +56,7 @@ + diff --git a/base/configcollection.cpp b/base/configcollection.cpp index 585b09db6..a1d24fe06 100644 --- a/base/configcollection.cpp +++ b/base/configcollection.cpp @@ -16,8 +16,8 @@ void ConfigCollection::AddObject(const ConfigObject::Ptr& object) { Objects[object->GetName()] = object; - ConfigObjectEventArgs::Ptr ea = make_shared(); - ea->Source = object; + ConfigObjectEventArgs ea; + ea.Source = object; OnObjectCreated(ea); ConfigHive::Ptr hive = m_Hive.lock(); @@ -32,8 +32,8 @@ void ConfigCollection::RemoveObject(const ConfigObject::Ptr& object) if (oi != Objects.end()) { Objects.erase(oi); - ConfigObjectEventArgs::Ptr ea = make_shared(); - ea->Source = object; + ConfigObjectEventArgs ea; + ea.Source = object; OnObjectRemoved(ea); ConfigHive::Ptr hive = m_Hive.lock(); @@ -52,11 +52,12 @@ ConfigObject::Ptr ConfigCollection::GetObject(const string& name) return oi->second; } -void ConfigCollection::ForEachObject(function callback) +void ConfigCollection::ForEachObject(function callback) { + ConfigObjectEventArgs ea; + for (ObjectIterator oi = Objects.begin(); oi != Objects.end(); oi++) { - ConfigObjectEventArgs::Ptr ea = make_shared(); - ea->Source = oi->second; + ea.Source = oi->second; callback(ea); } } diff --git a/base/configcollection.h b/base/configcollection.h index ebc00261a..be252c375 100644 --- a/base/configcollection.h +++ b/base/configcollection.h @@ -25,11 +25,11 @@ public: void RemoveObject(const ConfigObject::Ptr& object); ConfigObject::Ptr GetObject(const string& name = string()); - void ForEachObject(function callback); + void ForEachObject(function callback); - Event OnObjectCreated; - Event OnObjectRemoved; - Event OnPropertyChanged; + Event OnObjectCreated; + Event OnObjectRemoved; + Event OnPropertyChanged; }; } diff --git a/base/confighive.cpp b/base/confighive.cpp index 93bd49e5a..81623b77c 100644 --- a/base/confighive.cpp +++ b/base/confighive.cpp @@ -30,7 +30,7 @@ ConfigCollection::Ptr ConfigHive::GetCollection(const string& collection) return ci->second; } -void ConfigHive::ForEachObject(const string& type, function callback) +void ConfigHive::ForEachObject(const string& type, function callback) { CollectionIterator ci = Collections.find(type); diff --git a/base/confighive.h b/base/confighive.h index 95034b2ed..f2d78e3cc 100644 --- a/base/confighive.h +++ b/base/confighive.h @@ -18,11 +18,11 @@ public: ConfigObject::Ptr GetObject(const string& collection, const string& name = string()); ConfigCollection::Ptr GetCollection(const string& collection); - void ForEachObject(const string& type, function callback); + void ForEachObject(const string& type, function callback); - Event OnObjectCreated; - Event OnObjectRemoved; - Event OnPropertyChanged; + Event OnObjectCreated; + Event OnObjectRemoved; + Event OnPropertyChanged; }; } diff --git a/base/configobject.cpp b/base/configobject.cpp index 92a3b6e36..9f779a7cd 100644 --- a/base/configobject.cpp +++ b/base/configobject.cpp @@ -44,13 +44,13 @@ void ConfigObject::SetProperty(const string& name, const string& value) ConfigHive::Ptr hive = m_Hive.lock(); if (hive) { - ConfigObjectEventArgs::Ptr ea = make_shared(); - ea->Source = shared_from_this(); - ea->Property = name; + ConfigObjectEventArgs ea; + ea.Source = shared_from_this(); + ea.Property = name; string oldValue; if (GetProperty(name, &oldValue)) - ea->OldValue = oldValue; + ea.OldValue = oldValue; hive->GetCollection(m_Type)->OnPropertyChanged(ea); hive->OnPropertyChanged(ea); diff --git a/base/dictionary.cpp b/base/dictionary.cpp new file mode 100644 index 000000000..4ec935bc6 --- /dev/null +++ b/base/dictionary.cpp @@ -0,0 +1,102 @@ +#include "i2-base.h" + +using namespace icinga; + +bool Dictionary::GetValueVariant(string key, Variant *value) +{ + DictionaryIterator i = m_Data.find(key); + + if (i == m_Data.end()) + return false; + + *value = i->second; + + return true; +} + +void Dictionary::SetValueVariant(string key, const Variant& value) +{ + m_Data.erase(key); + m_Data[key] = value; +} + +bool Dictionary::GetValueString(string key, string *value) +{ + Variant data; + + if (!GetValueVariant(key, &data)) + return false; + + *value = data; + return true; +} + +void Dictionary::SetValueString(string key, const string& value) +{ + SetValueVariant(key, Variant(value)); +} + +bool Dictionary::GetValueInteger(string key, long *value) +{ + Variant data; + + if (!GetValueVariant(key, &data)) + return false; + + *value = data; + return true; +} + +void Dictionary::SetValueInteger(string key, long value) +{ + SetValueVariant(key, Variant(value)); +} + +bool Dictionary::GetValueDictionary(string key, Dictionary::Ptr *value) +{ + Dictionary::Ptr dictionary; + Variant data; + + if (!GetValueVariant(key, &data)) + return false; + + dictionary = dynamic_pointer_cast(data.GetObject()); + + if (dictionary == NULL) + throw InvalidArgumentException(); + + *value = dictionary; + + return true; +} + +void Dictionary::SetValueDictionary(string key, const Dictionary::Ptr& value) +{ + SetValueVariant(key, Variant(value)); +} + +bool Dictionary::GetValueObject(string key, Object::Ptr *value) +{ + Variant data; + + if (!GetValueVariant(key, &data)) + return false; + + *value = data; + return true; +} + +void Dictionary::SetValueObject(string key, const Object::Ptr& value) +{ + SetValueVariant(key, Variant(value)); +} + +DictionaryIterator Dictionary::Begin(void) +{ + return m_Data.begin(); +} + +DictionaryIterator Dictionary::End(void) +{ + return m_Data.end(); +} diff --git a/base/dictionary.h b/base/dictionary.h new file mode 100644 index 000000000..6f062ee08 --- /dev/null +++ b/base/dictionary.h @@ -0,0 +1,39 @@ +#ifndef DICTIONARY_H +#define DICTIONARY_H + +namespace icinga +{ + +typedef map::iterator DictionaryIterator; + +class I2_BASE_API Dictionary : public Object +{ +private: + map m_Data; + +public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + bool GetValueVariant(string key, Variant *value); + void SetValueVariant(string key, const Variant& value); + + bool GetValueString(string key, string *value); + void SetValueString(string key, const string& value); + + bool GetValueInteger(string key, long *value); + void SetValueInteger(string key, long value); + + bool GetValueDictionary(string key, Dictionary::Ptr *value); + void SetValueDictionary(string key, const Dictionary::Ptr& value); + + bool GetValueObject(string key, Object::Ptr *value); + void SetValueObject(string key, const Object::Ptr& value); + + DictionaryIterator Begin(void); + DictionaryIterator End(void); +}; + +} + +#endif /* DICTIONARY_H */ diff --git a/base/event.h b/base/event.h index 5785f376f..a2dbd5e11 100644 --- a/base/event.h +++ b/base/event.h @@ -4,11 +4,8 @@ namespace icinga { -struct I2_BASE_API EventArgs : public Object +struct I2_BASE_API EventArgs { - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - Object::Ptr Source; }; @@ -16,7 +13,7 @@ template class Event { public: - typedef function DelegateType; + typedef function DelegateType; private: list m_Delegates; diff --git a/base/i2-base.h b/base/i2-base.h index a7afa25db..c0f586a70 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -70,6 +70,8 @@ using namespace std::tr1::placeholders; #include "memory.h" #include "delegate.h" #include "event.h" +#include "variant.h" +#include "dictionary.h" #include "timer.h" #include "fifo.h" #include "socket.h" diff --git a/base/socket.cpp b/base/socket.cpp index 42f603aa8..394f8380b 100644 --- a/base/socket.cpp +++ b/base/socket.cpp @@ -16,6 +16,8 @@ Socket::~Socket(void) void Socket::Start(void) { + assert(m_FD != INVALID_SOCKET); + OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this()); Sockets.push_front(static_pointer_cast(shared_from_this())); @@ -54,8 +56,8 @@ void Socket::Close(bool from_dtor) /* nobody can possibly have a valid event subscription when the destructor has been called */ if (!from_dtor) { - EventArgs::Ptr ea = make_shared(); - ea->Source = shared_from_this(); + EventArgs ea; + ea.Source = shared_from_this(); OnClosed(ea); } } @@ -84,7 +86,7 @@ string Socket::FormatErrorCode(int code) return result; } -int Socket::ExceptionEventHandler(EventArgs::Ptr ea) +int Socket::ExceptionEventHandler(const EventArgs& ea) { int opt; socklen_t optlen = sizeof(opt); @@ -97,10 +99,10 @@ int Socket::ExceptionEventHandler(EventArgs::Ptr ea) } if (opt != 0) { - SocketErrorEventArgs::Ptr ea = make_shared(); - ea->Code = opt; - ea->Message = FormatErrorCode(ea->Code); - OnError(ea); + SocketErrorEventArgs sea; + sea.Code = opt; + sea.Message = FormatErrorCode(sea.Code); + OnError(sea); Close(); } diff --git a/base/socket.h b/base/socket.h index 3f6f51771..90ac7a266 100644 --- a/base/socket.h +++ b/base/socket.h @@ -17,7 +17,7 @@ class I2_BASE_API Socket : public Object private: SOCKET m_FD; - int ExceptionEventHandler(EventArgs::Ptr ea); + int ExceptionEventHandler(const EventArgs& ea); protected: string FormatErrorCode(int errorCode); @@ -40,12 +40,12 @@ public: static void CloseAllSockets(void); - Event OnReadable; - Event OnWritable; - Event OnException; + Event OnReadable; + Event OnWritable; + Event OnException; - Event OnError; - Event OnClosed; + Event OnError; + Event OnClosed; virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp index 9dcd4d2e7..0325220f4 100644 --- a/base/tcpclient.cpp +++ b/base/tcpclient.cpp @@ -41,15 +41,15 @@ void TCPClient::Connect(const string& hostname, unsigned short port) #else /* _WIN32 */ if (rc < 0 && errno != EINPROGRESS) { #endif /* _WIN32 */ - SocketErrorEventArgs::Ptr ea = make_shared(); + SocketErrorEventArgs sea; #ifdef _WIN32 - ea->Code = WSAGetLastError(); + sea.Code = WSAGetLastError(); #else /* _WIN32 */ - ea->Code = errno; + sea.Code = errno; #endif /* _WIN32 */ - ea->Message = FormatErrorCode(ea->Code); + sea.Message = FormatErrorCode(sea.Code); - OnError(ea); + OnError(sea); Close(); } @@ -79,7 +79,7 @@ int TCPClient::GetPeerPort(void) return m_PeerPort; } -int TCPClient::ReadableEventHandler(EventArgs::Ptr ea) +int TCPClient::ReadableEventHandler(const EventArgs& ea) { int rc; @@ -96,15 +96,15 @@ int TCPClient::ReadableEventHandler(EventArgs::Ptr ea) if (rc <= 0) { if (rc < 0) { - SocketErrorEventArgs::Ptr ea = make_shared(); + SocketErrorEventArgs sea; #ifdef _WIN32 - ea->Code = WSAGetLastError(); + sea.Code = WSAGetLastError(); #else /* _WIN32 */ - ea->Code = errno; + sea.Code = errno; #endif /* _WIN32 */ - ea->Message = FormatErrorCode(ea->Code); + sea.Message = FormatErrorCode(sea.Code); - OnError(ea); + OnError(sea); } Close(); @@ -113,14 +113,14 @@ int TCPClient::ReadableEventHandler(EventArgs::Ptr ea) m_RecvQueue->Write(NULL, rc); - EventArgs::Ptr dea = make_shared(); - dea->Source = shared_from_this(); + EventArgs dea; + dea.Source = shared_from_this(); OnDataAvailable(dea); return 0; } -int TCPClient::WritableEventHandler(EventArgs::Ptr ea) +int TCPClient::WritableEventHandler(const EventArgs& ea) { int rc; @@ -128,15 +128,15 @@ int TCPClient::WritableEventHandler(EventArgs::Ptr ea) if (rc <= 0) { if (rc < 0) { - SocketErrorEventArgs::Ptr ea = make_shared(); + SocketErrorEventArgs sea; #ifdef _WIN32 - ea->Code = WSAGetLastError(); + sea.Code = WSAGetLastError(); #else /* _WIN32 */ - ea->Code = errno; + sea.Code = errno; #endif /* _WIN32 */ - ea->Message = FormatErrorCode(ea->Code); + sea.Message = FormatErrorCode(sea.Code); - OnError(ea); + OnError(sea); } Close(); diff --git a/base/tcpclient.h b/base/tcpclient.h index cca15442d..bc7ac4cf7 100644 --- a/base/tcpclient.h +++ b/base/tcpclient.h @@ -13,8 +13,8 @@ private: FIFO::Ptr m_SendQueue; FIFO::Ptr m_RecvQueue; - int ReadableEventHandler(EventArgs::Ptr ea); - int WritableEventHandler(EventArgs::Ptr ea); + int ReadableEventHandler(const EventArgs& ea); + int WritableEventHandler(const EventArgs& ea); public: typedef shared_ptr Ptr; @@ -35,7 +35,7 @@ public: virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; - Event OnDataAvailable; + Event OnDataAvailable; }; } diff --git a/base/tcpserver.cpp b/base/tcpserver.cpp index d35b1af1b..a96a2f06f 100644 --- a/base/tcpserver.cpp +++ b/base/tcpserver.cpp @@ -36,7 +36,7 @@ void TCPServer::Listen(void) Start(); } -int TCPServer::ReadableEventHandler(EventArgs::Ptr ea) +int TCPServer::ReadableEventHandler(const EventArgs& ea) { int fd; sockaddr_in addr; @@ -44,11 +44,32 @@ int TCPServer::ReadableEventHandler(EventArgs::Ptr ea) fd = accept(GetFD(), (sockaddr *)&addr, &addrlen); - NewClientEventArgs::Ptr nea = make_shared(); - nea->Source = shared_from_this(); - nea->Client = static_pointer_cast(m_ClientFactory()); - nea->Client->SetFD(fd); - nea->Client->Start(); + if (fd == INVALID_SOCKET) { +#ifdef _WIN32 + if (WSAGetLastError() == WSAEWOULDBLOCK) +#else /* _WIN32 */ + if (errno == EINPROGRESS) +#endif /* _WIN32 */ + return 0; + + SocketErrorEventArgs sea; +#ifdef _WIN32 + sea.Code = WSAGetLastError(); +#else /* _WIN32 */ + sea.Code = errno; +#endif /* _WIN32 */ + sea.Message = FormatErrorCode(sea.Code); + + OnError(sea); + + Close(); + } + + NewClientEventArgs nea; + nea.Source = shared_from_this(); + nea.Client = static_pointer_cast(m_ClientFactory()); + nea.Client->SetFD(fd); + nea.Client->Start(); OnNewClient(nea); return 0; diff --git a/base/tcpserver.h b/base/tcpserver.h index 7477837d7..2641c4501 100644 --- a/base/tcpserver.h +++ b/base/tcpserver.h @@ -15,7 +15,7 @@ struct I2_BASE_API NewClientEventArgs : public EventArgs class I2_BASE_API TCPServer : public TCPSocket { private: - int ReadableEventHandler(EventArgs::Ptr ea); + int ReadableEventHandler(const EventArgs& ea); factory_function m_ClientFactory; @@ -32,7 +32,7 @@ public: void Listen(void); - Event OnNewClient; + Event OnNewClient; virtual bool WantsToRead(void) const; }; diff --git a/base/tcpsocket.cpp b/base/tcpsocket.cpp index 009d95194..dc23176ca 100644 --- a/base/tcpsocket.cpp +++ b/base/tcpsocket.cpp @@ -9,14 +9,14 @@ void TCPSocket::MakeSocket(void) int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == INVALID_SOCKET) { - SocketErrorEventArgs::Ptr ea = make_shared(); + SocketErrorEventArgs sea; #ifdef _WIN32 - ea->Code = WSAGetLastError(); + sea.Code = WSAGetLastError(); #else /* _WIN32 */ - ea->Code = errno; + sea.Code = errno; #endif /* _WIN32 */ - ea->Message = FormatErrorCode(ea->Code); - OnError(ea); + sea.Message = FormatErrorCode(sea.Code); + OnError(sea); } SetFD(fd); @@ -39,15 +39,15 @@ void TCPSocket::Bind(const char *hostname, unsigned short port) int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin)); if (rc < 0) { - SocketErrorEventArgs::Ptr ea = make_shared(); + SocketErrorEventArgs sea; #ifdef _WIN32 - ea->Code = WSAGetLastError(); + sea.Code = WSAGetLastError(); #else /* _WIN32 */ - ea->Code = errno; + sea.Code = errno; #endif /* _WIN32 */ - ea->Message = FormatErrorCode(ea->Code); + sea.Message = FormatErrorCode(sea.Code); - OnError(ea); + OnError(sea); Close(); } diff --git a/base/timer.cpp b/base/timer.cpp index 42191f389..73c7ff2aa 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -74,10 +74,10 @@ void Timer::StopAllTimers(void) * the timer that originally invoked the delegate */ void Timer::Call(void) { - TimerEventArgs::Ptr ea = make_shared(); - ea->Source = shared_from_this(); - ea->UserArgs = m_UserArgs; - OnTimerExpired(ea); + TimerEventArgs tea; + tea.Source = shared_from_this(); + tea.UserArgs = m_UserArgs; + OnTimerExpired(tea); } void Timer::SetInterval(unsigned int interval) @@ -90,13 +90,13 @@ unsigned int Timer::GetInterval(void) const return m_Interval; } -void Timer::SetUserArgs(const EventArgs::Ptr& userArgs) +void Timer::SetUserArgs(const EventArgs& userArgs) { m_UserArgs = userArgs; } -EventArgs::Ptr Timer::GetUserArgs(void) const +EventArgs Timer::GetUserArgs(void) const { return m_UserArgs; } diff --git a/base/timer.h b/base/timer.h index 4a4e2840b..3e8c7c555 100644 --- a/base/timer.h +++ b/base/timer.h @@ -10,13 +10,13 @@ struct I2_BASE_API TimerEventArgs : public EventArgs typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - EventArgs::Ptr UserArgs; + EventArgs UserArgs; }; class I2_BASE_API Timer : public Object { private: - EventArgs::Ptr m_UserArgs; + EventArgs m_UserArgs; unsigned int m_Interval; time_t m_Next; @@ -37,8 +37,8 @@ public: void SetInterval(unsigned int interval); unsigned int GetInterval(void) const; - void SetUserArgs(const EventArgs::Ptr& userArgs); - EventArgs::Ptr GetUserArgs(void) const; + void SetUserArgs(const EventArgs& userArgs); + EventArgs GetUserArgs(void) const; static time_t GetNextCall(void); static void CallExpiredTimers(void); @@ -49,7 +49,7 @@ public: void Reschedule(time_t next); - Event OnTimerExpired; + Event OnTimerExpired; }; } diff --git a/base/variant.cpp b/base/variant.cpp new file mode 100644 index 000000000..953fcf907 --- /dev/null +++ b/base/variant.cpp @@ -0,0 +1,68 @@ +#include "i2-base.h" + +using namespace icinga; + +Variant::Variant(void) : m_Type(VariantEmpty) +{ +} + +Variant::Variant(long value) : m_Type(VariantInteger), m_IntegerValue(value) +{ +} + +Variant::Variant(string value) : m_Type(VariantString), m_StringValue(value) +{ +} + +Variant::Variant(Object::Ptr value) : m_Type(VariantObject), m_ObjectValue(value) +{ +} + +void Variant::Convert(VariantType newType) const +{ + if (newType == m_Type) + return; + + throw NotImplementedException(); +} + +VariantType Variant::GetType(void) const +{ + return m_Type; +} + +long Variant::GetInteger(void) const +{ + Convert(VariantInteger); + + return m_IntegerValue; +} + +string Variant::GetString(void) const +{ + Convert(VariantString); + + return m_StringValue; +} + +Object::Ptr Variant::GetObject(void) const +{ + Convert(VariantObject); + + return m_ObjectValue; +} + +Variant::operator long(void) const +{ + return GetInteger(); +} + +Variant::operator string(void) const +{ + return GetString(); +} + +Variant::operator Object::Ptr(void) const +{ + return GetObject(); +} diff --git a/base/variant.h b/base/variant.h new file mode 100644 index 000000000..ef44260e3 --- /dev/null +++ b/base/variant.h @@ -0,0 +1,45 @@ +#ifndef VARIANT_H +#define VARIANT_H + +namespace icinga +{ + +enum I2_BASE_API VariantType +{ + VariantEmpty, + VariantInteger, + VariantString, + VariantObject +}; + +class I2_BASE_API Variant +{ +private: + mutable long m_IntegerValue; + mutable string m_StringValue; + mutable Object::Ptr m_ObjectValue; + + mutable VariantType m_Type; + + void Convert(VariantType newType) const; + +public: + Variant(void); + Variant(long value); + Variant(string value); + Variant(Object::Ptr value); + + VariantType GetType(void) const; + + long GetInteger(void) const; + string GetString(void) const; + Object::Ptr GetObject(void) const; + + operator long(void) const; + operator string(void) const; + operator Object::Ptr(void) const; +}; + +} + +#endif /* VARIANT_H */ \ No newline at end of file diff --git a/components/configrpc/configrpccomponent.cpp b/components/configrpc/configrpccomponent.cpp index 2d9ac1fb0..9edcf0733 100644 --- a/components/configrpc/configrpccomponent.cpp +++ b/components/configrpc/configrpccomponent.cpp @@ -46,114 +46,118 @@ void ConfigRpcComponent::Stop(void) // TODO: implement } -JsonRpcMessage::Ptr ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties) +JsonRpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties) { - JsonRpcMessage::Ptr msg = make_shared(); - msg->SetVersion("2.0"); - msg->SetMethod(method); - cJSON *params = msg->GetParams(); + JsonRpcRequest msg; + msg.SetVersion("2.0"); + msg.SetMethod(method); - string name = object->GetName(); - cJSON_AddStringToObject(params, "name", name.c_str()); + Message params; + msg.SetParams(params); - string type = object->GetType(); - cJSON_AddStringToObject(params, "type", type.c_str()); + params.GetDictionary()->SetValueString("name", object->GetName()); + params.GetDictionary()->SetValueString("type", object->GetType()); if (includeProperties) { - cJSON *properties = cJSON_CreateObject(); - cJSON_AddItemToObject(params, "properties", properties); + Message properties; + params.GetDictionary()->SetValueDictionary("properties", properties.GetDictionary()); for (ConfigObject::ParameterIterator pi = object->Properties.begin(); pi != object->Properties.end(); pi++) { - cJSON_AddStringToObject(properties, pi->first.c_str(), pi->second.c_str()); + properties.GetDictionary()->SetValueString(pi->first, pi->second); } } return msg; } -int ConfigRpcComponent::FetchObjectsHandler(NewMessageEventArgs::Ptr ea) +int ConfigRpcComponent::FetchObjectsHandler(const NewRequestEventArgs& ea) { - JsonRpcClient::Ptr client = static_pointer_cast(ea->Source); + Endpoint::Ptr client = ea.Sender; ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive(); for (ConfigHive::CollectionIterator ci = configHive->Collections.begin(); ci != configHive->Collections.end(); ci++) { ConfigCollection::Ptr collection = ci->second; for (ConfigCollection::ObjectIterator oi = collection->Objects.begin(); oi != collection->Objects.end(); oi++) { - JsonRpcMessage::Ptr msg = MakeObjectMessage(oi->second, "config::ObjectCreated", true); - client->SendMessage(msg); + client->ProcessRequest(m_ConfigRpcEndpoint, MakeObjectMessage(oi->second, "config::ObjectCreated", true)); } } return 0; } -int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea) +int ConfigRpcComponent::LocalObjectCreatedHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); int replicate = 0; object->GetPropertyInteger("replicate", &replicate); if (replicate) { EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); - mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), MakeObjectMessage(object, "config::ObjectCreated", true)); + mgr->SendMulticastRequest(m_ConfigRpcEndpoint, MakeObjectMessage(object, "config::ObjectCreated", true)); } return 0; } -int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea) +int ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); int replicate = 0; object->GetPropertyInteger("replicate", &replicate); if (replicate) { EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); - mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), MakeObjectMessage(object, "config::ObjectRemoved", false)); + mgr->SendMulticastRequest(m_ConfigRpcEndpoint, MakeObjectMessage(object, "config::ObjectRemoved", false)); } return 0; } -int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea) +int ConfigRpcComponent::LocalPropertyChangedHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); int replicate = 0; object->GetPropertyInteger("replicate", &replicate); if (replicate) { - JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::PropertyChanged", false); - cJSON *params = msg->GetParams(); + JsonRpcRequest msg = MakeObjectMessage(object, "config::PropertyChanged", false); + Message params; + msg.SetParams(params); - cJSON *properties = cJSON_CreateObject(); - cJSON_AddItemToObject(params, "properties", properties); + Message properties; + params.GetDictionary()->SetValueDictionary("properties", properties.GetDictionary()); string value; - object->GetProperty(ea->Property, &value); + object->GetProperty(ea.Property, &value); - cJSON_AddStringToObject(properties, ea->Property.c_str(), value.c_str()); + properties.GetDictionary()->SetValueString(ea.Property, value); EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); - mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), msg); + mgr->SendMulticastRequest(m_ConfigRpcEndpoint, msg); } return 0; } -int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea) +int ConfigRpcComponent::RemoteObjectUpdatedHandler(const NewRequestEventArgs& ea) { - JsonRpcMessage::Ptr message = ea->Message; - string name, type, value; + JsonRpcRequest message = ea.Request; bool was_null = false; - if (!message->GetParamString("name", &name)) + Message params; + if (!message.GetParams(¶ms)) return 0; - if (!message->GetParamString("type", &type)) + string name; + if (!params.GetDictionary()->GetValueString("name", &name)) + return 0; + + string type; + if (!params.GetDictionary()->GetValueString("type", &type)) return 0; ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive(); @@ -164,15 +168,12 @@ int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea) object = make_shared(type, name); } - cJSON *properties = message->GetParam("properties"); - - if (properties != NULL) { - for (cJSON *prop = properties->child; prop != NULL; prop = prop->next) { - if (prop->type != cJSON_String) - continue; + Dictionary::Ptr properties; + if (!params.GetDictionary()->GetValueDictionary("properties", &properties)) + return 0; - object->SetProperty(prop->string, prop->valuestring); - } + for (DictionaryIterator i = properties->Begin(); i != properties->End(); i++) { + object->SetProperty(i->first, i->second); } if (was_null) @@ -181,16 +182,20 @@ int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea) return 0; } -int ConfigRpcComponent::RemoteObjectRemovedHandler(NewMessageEventArgs::Ptr ea) +int ConfigRpcComponent::RemoteObjectRemovedHandler(const NewRequestEventArgs& ea) { - JsonRpcRequest::Ptr message = ea->Message->Cast(); - Message::Ptr params = message->GetParams(); - string name, type; + JsonRpcRequest message = ea.Request; - if (!message->GetParamString("name", &name)) + Message params; + if (!message.GetParams(¶ms)) + return 0; + + string name; + if (!params.GetDictionary()->GetValueString("name", &name)) return 0; - if (!message->GetParamString("type", &type)) + string type; + if (!params.GetDictionary()->GetValueString("type", &type)) return 0; ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive(); diff --git a/components/configrpc/configrpccomponent.h b/components/configrpc/configrpccomponent.h index dcf2f740b..ede2cdd18 100644 --- a/components/configrpc/configrpccomponent.h +++ b/components/configrpc/configrpccomponent.h @@ -11,16 +11,15 @@ private: IcingaApplication::Ptr GetIcingaApplication(void); - int FetchObjectsHandler(NewMessageEventArgs::Ptr ea); + int LocalObjectCreatedHandler(const ConfigObjectEventArgs& ea); + int LocalObjectRemovedHandler(const ConfigObjectEventArgs& ea); + int LocalPropertyChangedHandler(const ConfigObjectEventArgs& ea); - int LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea); - int LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea); - int LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea); + int FetchObjectsHandler(const NewRequestEventArgs& ea); + int RemoteObjectUpdatedHandler(const NewRequestEventArgs& ea); + int RemoteObjectRemovedHandler(const NewRequestEventArgs& ea); - int RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea); - int RemoteObjectRemovedHandler(NewMessageEventArgs::Ptr ea); - - JsonRpcMessage::Ptr MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties); + JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties); public: virtual string GetName(void); diff --git a/icinga.sln b/icinga.sln index 58e5c3854..b165bd7f1 100644 --- a/icinga.sln +++ b/icinga.sln @@ -6,7 +6,6 @@ EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "jsonrpc", "jsonrpc\jsonrpc.vcxproj", "{8DD52FAC-ECEE-48C2-B266-E7C47ED485F8}" ProjectSection(ProjectDependencies) = postProject {66BED474-C33F-48F9-90BA-BBCFEDC006B8} = {66BED474-C33F-48F9-90BA-BBCFEDC006B8} - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4} = {4F00EE82-B829-4872-B8F0-C1A8D86C94B4} {9C92DA90-FD53-43A9-A244-90F2E8AF9677} = {9C92DA90-FD53-43A9-A244-90F2E8AF9677} EndProjectSection EndProject @@ -33,8 +32,6 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "configrpc", "components\con {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} EndProjectSection EndProject -Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "msgc", "msgc\msgc.vcxproj", "{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Win32 = Debug|Win32 @@ -69,10 +66,6 @@ Global {697C6D7E-3109-484C-A7AF-384D28711610}.Debug|Win32.Build.0 = Debug|Win32 {697C6D7E-3109-484C-A7AF-384D28711610}.Release|Win32.ActiveCfg = Release|Win32 {697C6D7E-3109-484C-A7AF-384D28711610}.Release|Win32.Build.0 = Release|Win32 - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Debug|Win32.ActiveCfg = Debug|Win32 - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Debug|Win32.Build.0 = Debug|Win32 - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Release|Win32.ActiveCfg = Release|Win32 - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Release|Win32.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/icinga/endpoint.cpp b/icinga/endpoint.cpp index 63c2c9b99..d95b80807 100644 --- a/icinga/endpoint.cpp +++ b/icinga/endpoint.cpp @@ -2,6 +2,16 @@ using namespace icinga; +EndpointManager::Ptr Endpoint::GetEndpointManager(void) const +{ + return m_EndpointManager; +} + +void Endpoint::SetEndpointManager(EndpointManager::Ptr manager) +{ + m_EndpointManager = manager; +} + void Endpoint::RegisterMethodSink(string method) { m_MethodSinks.insert(method); @@ -12,11 +22,21 @@ void Endpoint::UnregisterMethodSink(string method) m_MethodSinks.erase(method); } -bool Endpoint::IsMethodSink(string method) +bool Endpoint::IsMethodSink(string method) const { return (m_MethodSinks.find(method) != m_MethodSinks.end()); } +void Endpoint::ForeachMethodSink(function callback) +{ + for (set::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) { + NewMethodEventArgs nmea; + nmea.Source = shared_from_this(); + nmea.Method = *i; + callback(nmea); + } +} + void Endpoint::RegisterMethodSource(string method) { m_MethodSources.insert(method); @@ -27,7 +47,17 @@ void Endpoint::UnregisterMethodSource(string method) m_MethodSources.erase(method); } -bool Endpoint::IsMethodSource(string method) +bool Endpoint::IsMethodSource(string method) const { return (m_MethodSources.find(method) != m_MethodSinks.end()); } + +void Endpoint::ForeachMethodSource(function callback) +{ + for (set::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) { + NewMethodEventArgs nmea; + nmea.Source = shared_from_this(); + nmea.Method = *i; + callback(nmea); + } +} diff --git a/icinga/endpoint.h b/icinga/endpoint.h index 92b0ab661..1a7f8258e 100644 --- a/icinga/endpoint.h +++ b/icinga/endpoint.h @@ -6,26 +6,44 @@ namespace icinga class EndpointManager; +struct I2_ICINGA_API NewMethodEventArgs : public EventArgs +{ + string Method; +}; + class I2_ICINGA_API Endpoint : public Object { private: set m_MethodSinks; set m_MethodSources; + shared_ptr m_EndpointManager; + public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; + shared_ptr GetEndpointManager(void) const; + void SetEndpointManager(shared_ptr manager); + void RegisterMethodSink(string method); void UnregisterMethodSink(string method); - bool IsMethodSink(string method); + bool IsMethodSink(string method) const; void RegisterMethodSource(string method); void UnregisterMethodSource(string method); - bool IsMethodSource(string method); + bool IsMethodSource(string method) const; + + virtual bool IsLocal(void) const = 0; + + virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) = 0; + virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) = 0; + + Event OnNewMethodSink; + Event OnNewMethodSource; - virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message) = 0; - virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message) = 0; + void ForeachMethodSink(function callback); + void ForeachMethodSource(function callback); }; } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index f8de5a48d..bf6b877ce 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -23,14 +23,12 @@ void EndpointManager::AddListener(unsigned short port) server->Start(); } -void EndpointManager::AddConnection(string host, short port) +void EndpointManager::AddConnection(string host, unsigned short port) { - JsonRpcClient::Ptr client = make_shared(); - RegisterClient(client); + JsonRpcEndpoint::Ptr endpoint = make_shared(); + RegisterEndpoint(endpoint); - client->MakeSocket(); - client->Connect(host, port); - client->Start(); + endpoint->Connect(host, port); } void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) @@ -39,81 +37,32 @@ void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler, shared_from_this()); } -void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server) -{ - m_Servers.remove(server); - // TODO: unbind event -} - -void EndpointManager::RegisterClient(JsonRpcClient::Ptr client) -{ - m_Clients.push_front(client); - client->OnNewMessage += bind_weak(&EndpointManager::NewMessageHandler, shared_from_this()); - client->OnClosed += bind_weak(&EndpointManager::CloseClientHandler, shared_from_this()); - client->OnError += bind_weak(&EndpointManager::ErrorClientHandler, shared_from_this()); -} - -void EndpointManager::UnregisterClient(JsonRpcClient::Ptr client) +int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea) { - m_Clients.remove(client); - // TODO: unbind event -} + JsonRpcEndpoint::Ptr endpoint = make_shared(); + RegisterEndpoint(endpoint); -int EndpointManager::NewClientHandler(NewClientEventArgs::Ptr ncea) -{ - JsonRpcClient::Ptr client = static_pointer_cast(ncea->Client); - RegisterClient(client); + endpoint->SetClient(static_pointer_cast(ncea.Client)); return 0; } -int EndpointManager::CloseClientHandler(EventArgs::Ptr ea) -{ - JsonRpcClient::Ptr client = static_pointer_cast(ea->Source); - UnregisterClient(client); - - if (client->GetPeerHost() != string()) { - Timer::Ptr timer = make_shared(); - timer->SetInterval(30); - timer->SetUserArgs(ea); - timer->OnTimerExpired += bind_weak(&EndpointManager::ReconnectClientHandler, shared_from_this()); - timer->Start(); - m_ReconnectTimers.push_front(timer); - } - - return 0; -} - -int EndpointManager::ErrorClientHandler(SocketErrorEventArgs::Ptr ea) -{ - cout << "Error occured for JSON-RPC socket: Code=" << ea->Code << "; Message=" << ea->Message << endl; - - return 0; -} - -int EndpointManager::ReconnectClientHandler(TimerEventArgs::Ptr ea) -{ - JsonRpcClient::Ptr client = static_pointer_cast(ea->UserArgs->Source); - Timer::Ptr timer = static_pointer_cast(ea->Source); - - AddConnection(client->GetPeerHost(), client->GetPeerPort()); - - timer->Stop(); - m_ReconnectTimers.remove(timer); - - return 0; -} - -int EndpointManager::NewMessageHandler(NewMessageEventArgs::Ptr nmea) +void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server) { -// TODO: implement - - return 0; + m_Servers.remove(server); + // TODO: unbind event } void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint) { + endpoint->SetEndpointManager(static_pointer_cast(shared_from_this())); m_Endpoints.push_front(endpoint); + + endpoint->OnNewMethodSink += bind_weak(&EndpointManager::NewMethodSinkHandler, shared_from_this()); + endpoint->ForeachMethodSink(bind(&EndpointManager::NewMethodSinkHandler, this, _1)); + + endpoint->OnNewMethodSource += bind_weak(&EndpointManager::NewMethodSourceHandler, shared_from_this()); + endpoint->ForeachMethodSource(bind(&EndpointManager::NewMethodSourceHandler, this, _1)); } void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint) @@ -121,21 +70,21 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint) m_Endpoints.remove(endpoint); } -void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request) +void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal) { throw NotImplementedException(); } -void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request) +void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal) { #ifdef _DEBUG string id; - if (request->GetID(&id)) + if (request.GetID(&id)) throw InvalidArgumentException("Multicast requests must not have an ID."); #endif /* _DEBUG */ string method; - if (!request->GetMethod(&method)) + if (!request.GetMethod(&method)) throw InvalidArgumentException(); for (list::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) @@ -145,7 +94,58 @@ void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest: if (endpoint == sender) continue; + /* send non-local messages to just the local endpoints */ + if (!fromLocal && !endpoint->IsLocal()) + continue; + if (endpoint->IsMethodSink(method)) - endpoint->SendRequest(sender, request); + endpoint->ProcessRequest(sender, request); + } +} + +int EndpointManager::NewMethodSinkHandler(const NewMethodEventArgs& ea) +{ + Endpoint::Ptr sender = static_pointer_cast(ea.Source); + + if (!sender->IsLocal()) + return 0; + + JsonRpcRequest request; + request.SetVersion("2.0"); + request.SetMethod("message::Subscribe"); + + Message params; + params.GetDictionary()->SetValueString("method", ea.Method); + request.SetParams(params); + + SendMulticastRequest(sender, request); + + return 0; +} + +int EndpointManager::NewMethodSourceHandler(const NewMethodEventArgs& ea) +{ + Endpoint::Ptr sender = static_pointer_cast(ea.Source); + + if (!sender->IsLocal()) + return 0; + + JsonRpcRequest request; + request.SetVersion("2.0"); + request.SetMethod("message::Provide"); + + Message params; + params.GetDictionary()->SetValueString("method", ea.Method); + request.SetParams(params); + + SendMulticastRequest(sender, request); + + return 0; +} + +void EndpointManager::ForeachEndpoint(function callback) +{ + for (list::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) { + callback(*i); } } diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 4c5e43c66..642004fc9 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -7,23 +7,17 @@ namespace icinga class I2_ICINGA_API EndpointManager : public Object { list m_Servers; - list m_Clients; - list m_ReconnectTimers; list m_Endpoints; string m_Identity; - int NewClientHandler(NewClientEventArgs::Ptr ncea); - int CloseClientHandler(EventArgs::Ptr ea); - int ErrorClientHandler(SocketErrorEventArgs::Ptr ea); - int ReconnectClientHandler(TimerEventArgs::Ptr ea); - int NewMessageHandler(NewMessageEventArgs::Ptr nmea); - - void RegisterClient(JsonRpcClient::Ptr server); - void UnregisterClient(JsonRpcClient::Ptr server); - void RegisterServer(JsonRpcServer::Ptr server); void UnregisterServer(JsonRpcServer::Ptr server); + int NewClientHandler(const NewClientEventArgs& ncea); + + int NewMethodSinkHandler(const NewMethodEventArgs& ea); + int NewMethodSourceHandler(const NewMethodEventArgs& ea); + public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; @@ -32,13 +26,15 @@ public: string GetIdentity(void) const; void AddListener(unsigned short port); - void AddConnection(string host, short port); + void AddConnection(string host, unsigned short port); void RegisterEndpoint(Endpoint::Ptr endpoint); void UnregisterEndpoint(Endpoint::Ptr endpoint); - void SendAnycastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request); - void SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request); + void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); + void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); + + void ForeachEndpoint(function callback); }; } diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index 7f421555d..f74fc6b30 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -32,7 +32,7 @@ int IcingaApplication::Main(const vector& args) ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component"); - function NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this()); + function NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this()); componentCollection->OnObjectCreated += NewComponentHandler; componentCollection->ForEachObject(NewComponentHandler); @@ -40,7 +40,7 @@ int IcingaApplication::Main(const vector& args) ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener"); - function NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this()); + function NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this()); listenerCollection->OnObjectCreated += NewRpcListenerHandler; listenerCollection->ForEachObject(NewRpcListenerHandler); @@ -48,7 +48,7 @@ int IcingaApplication::Main(const vector& args) ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection"); - function NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this()); + function NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this()); connectionCollection->OnObjectCreated += NewRpcConnectionHandler; connectionCollection->ForEachObject(NewRpcConnectionHandler); @@ -61,11 +61,33 @@ int IcingaApplication::Main(const vector& args) ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener"); + m_TestEndpoint = make_shared(); + m_EndpointManager->RegisterEndpoint(m_TestEndpoint); + m_TestEndpoint->RegisterMethodSink("test"); + m_TestEndpoint->RegisterMethodSource("test"); + + m_TestTimer = make_shared(); + m_TestTimer->SetInterval(5); + m_TestTimer->OnTimerExpired += bind_weak(&IcingaApplication::TestTimerHandler, shared_from_this()); + m_TestTimer->Start(); + RunEventLoop(); return EXIT_SUCCESS; } +int IcingaApplication::TestTimerHandler(const TimerEventArgs& tea) +{ + cout << "Problem?" << endl; + + JsonRpcRequest request; + request.SetVersion("2.0"); + request.SetMethod("test"); + m_EndpointManager->SendMulticastRequest(m_TestEndpoint, request); + + return 0; +} + void IcingaApplication::PrintUsage(const string& programPath) { cout << "Syntax: " << programPath << " " << endl; @@ -76,10 +98,10 @@ EndpointManager::Ptr IcingaApplication::GetEndpointManager(void) return m_EndpointManager; } -int IcingaApplication::NewComponentHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::NewComponentHandler(const ConfigObjectEventArgs& ea) { string path; - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); if (!object->GetProperty("path", &path)) { #ifdef _WIN32 @@ -96,17 +118,18 @@ int IcingaApplication::NewComponentHandler(ConfigObjectEventArgs::Ptr ea) return 0; } -int IcingaApplication::DeletedComponentHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::DeletedComponentHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); - UnloadComponent(object->GetName()); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); + Component::Ptr component = GetComponent(object->GetName()); + UnregisterComponent(component); return 0; } -int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::NewRpcListenerHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); int port; if (!object->GetPropertyInteger("port", &port)) @@ -119,16 +142,16 @@ int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea) return 0; } -int IcingaApplication::DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::DeletedRpcListenerHandler(const ConfigObjectEventArgs& ea) { throw Exception("Unsupported operation."); return 0; } -int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::NewRpcConnectionHandler(const ConfigObjectEventArgs& ea) { - ConfigObject::Ptr object = static_pointer_cast(ea->Source); + ConfigObject::Ptr object = static_pointer_cast(ea.Source); string hostname; int port; @@ -145,7 +168,7 @@ int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea) return 0; } -int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea) +int IcingaApplication::DeletedRpcConnectionHandler(const ConfigObjectEventArgs& ea) { throw Exception("Unsupported operation."); diff --git a/icinga/icingaapplication.h b/icinga/icingaapplication.h index 30485b8de..7c46296c2 100644 --- a/icinga/icingaapplication.h +++ b/icinga/icingaapplication.h @@ -8,16 +8,19 @@ class I2_ICINGA_API IcingaApplication : public Application { private: EndpointManager::Ptr m_EndpointManager; + Timer::Ptr m_TestTimer; + VirtualEndpoint::Ptr m_TestEndpoint; - int NewComponentHandler(ConfigObjectEventArgs::Ptr ea); - int DeletedComponentHandler(ConfigObjectEventArgs::Ptr ea); + int NewComponentHandler(const ConfigObjectEventArgs& ea); + int DeletedComponentHandler(const ConfigObjectEventArgs& ea); - int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); - int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); + int NewRpcListenerHandler(const ConfigObjectEventArgs& ea); + int DeletedRpcListenerHandler(const ConfigObjectEventArgs& ea); - int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea); - int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea); + int NewRpcConnectionHandler(const ConfigObjectEventArgs& ea); + int DeletedRpcConnectionHandler(const ConfigObjectEventArgs& ea); + int TestTimerHandler(const TimerEventArgs& tea); public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index 140a04e31..84a55a1ff 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -7,9 +7,66 @@ JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void) return m_Client; } +void JsonRpcEndpoint::Connect(string host, unsigned short port) +{ + JsonRpcClient::Ptr client = make_shared(); + client->MakeSocket(); + client->Connect(host, port); + client->Start(); + + SetClient(client); +} + +int JsonRpcEndpoint::SyncSubscription(string type, const NewMethodEventArgs& nmea) +{ + JsonRpcRequest request; + request.SetVersion("2.0"); + request.SetMethod(type); + + Message params; + params.GetDictionary()->SetValueString("method", nmea.Method); + request.SetParams(params); + + m_Client->SendMessage(request); + + return 0; +} + +int JsonRpcEndpoint::SyncSubscriptions(Endpoint::Ptr endpoint) +{ + if (!endpoint->IsLocal()) + return 0; + + endpoint->ForeachMethodSink(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Subscribe", _1)); + endpoint->ForeachMethodSource(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Provide", _1)); + + return 0; +} + void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) { m_Client = client; + client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this()); + client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this()); + client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this()); + + NewMethodEventArgs nmea; + nmea.Source = shared_from_this(); + + nmea.Method = "message::Subscribe"; + SyncSubscription("message::Subscribe", nmea); + SyncSubscription("message::Provide", nmea); + + nmea.Method = "message::Provide"; + SyncSubscription("message::Subscribe", nmea); + SyncSubscription("message::Provide", nmea); + + GetEndpointManager()->ForeachEndpoint(bind(&JsonRpcEndpoint::SyncSubscriptions, this, _1)); +} + +bool JsonRpcEndpoint::IsLocal(void) const +{ + return false; } bool JsonRpcEndpoint::IsConnected(void) const @@ -17,14 +74,99 @@ bool JsonRpcEndpoint::IsConnected(void) const return (m_Client.get() != NULL); } -void JsonRpcEndpoint::SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message) +void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) { - if (IsConnected()) + if (IsConnected()) { + string id; + if (message.GetID(&id)) + // TODO: remove calls after a certain timeout (and notify callers?) + m_PendingCalls[id] = sender; + m_Client->SendMessage(message); + } } -void JsonRpcEndpoint::SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message) +void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) { if (IsConnected()) m_Client->SendMessage(message); } + +int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea) +{ + const Message& message = nmea.Message; + Endpoint::Ptr sender = static_pointer_cast(shared_from_this()); + + string method; + if (message.GetDictionary()->GetValueString("method", &method)) { + JsonRpcRequest request = message; + Message params; + string method; + + if (request.GetMethod(&method) && request.GetParams(¶ms) && + (method == "message::Subscribe" || method == "message::Provide")) { + string sub_method; + if (params.GetDictionary()->GetValueString("method", &sub_method)) { + if (method == "message::Subscribe") + RegisterMethodSink(sub_method); + else + RegisterMethodSource(sub_method); + } + + return 0; + } + + string id; + if (request.GetID(&id)) + GetEndpointManager()->SendAnycastRequest(sender, request, false); + else + GetEndpointManager()->SendMulticastRequest(sender, request, false); + } else { + // TODO: deal with response messages + throw NotImplementedException(); + } + + return 0; +} + +int JsonRpcEndpoint::ClientClosedHandler(const EventArgs& ea) +{ + m_PendingCalls.clear(); + + // TODO: clear method sources/sinks + + if (m_Client->GetPeerHost() != string()) { + Timer::Ptr timer = make_shared(); + timer->SetInterval(30); + timer->SetUserArgs(ea); + timer->OnTimerExpired += bind_weak(&JsonRpcEndpoint::ClientReconnectHandler, shared_from_this()); + timer->Start(); + m_ReconnectTimer = timer; + } + + m_Client.reset(); + + // TODO: persist events, etc., for now we just disable the endpoint + + return 0; +} + +int JsonRpcEndpoint::ClientErrorHandler(const SocketErrorEventArgs& ea) +{ + cerr << "Error occured for JSON-RPC socket: Code=" << ea.Code << "; Message=" << ea.Message << endl; + + return 0; +} + +int JsonRpcEndpoint::ClientReconnectHandler(const TimerEventArgs& ea) +{ + JsonRpcClient::Ptr client = static_pointer_cast(ea.UserArgs.Source); + Timer::Ptr timer = static_pointer_cast(ea.Source); + + m_Client = client; + + timer->Stop(); + m_ReconnectTimer.reset(); + + return 0; +} diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h index 6f037e97f..e9d1b330d 100644 --- a/icinga/jsonrpcendpoint.h +++ b/icinga/jsonrpcendpoint.h @@ -8,17 +8,32 @@ class I2_ICINGA_API JsonRpcEndpoint : public Endpoint { private: JsonRpcClient::Ptr m_Client; + map m_PendingCalls; + Timer::Ptr m_ReconnectTimer; bool IsConnected(void) const; + + int NewMessageHandler(const NewMessageEventArgs& nmea); + int ClientClosedHandler(const EventArgs& ea); + int ClientErrorHandler(const SocketErrorEventArgs& ea); + int ClientReconnectHandler(const TimerEventArgs& ea); + + int SyncSubscription(string type, const NewMethodEventArgs& nmea); + int SyncSubscriptions(Endpoint::Ptr endpoint); public: - JsonRpcEndpoint(void); + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + void Connect(string host, unsigned short port); JsonRpcClient::Ptr GetClient(void); void SetClient(JsonRpcClient::Ptr client); - virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message); - virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message); + virtual bool IsLocal(void) const; + + virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message); + virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message); }; } diff --git a/icinga/virtualendpoint.cpp b/icinga/virtualendpoint.cpp index 560a6cd97..15ce559b0 100644 --- a/icinga/virtualendpoint.cpp +++ b/icinga/virtualendpoint.cpp @@ -2,14 +2,19 @@ using namespace icinga; -void VirtualEndpoint::RegisterMethodHandler(string method, function callback) +bool VirtualEndpoint::IsLocal(void) const +{ + return true; +} + +void VirtualEndpoint::RegisterMethodHandler(string method, function callback) { m_MethodHandlers[method] += callback; RegisterMethodSink(method); } -void VirtualEndpoint::UnregisterMethodHandler(string method, function callback) +void VirtualEndpoint::UnregisterMethodHandler(string method, function callback) { // TODO: implement //m_MethodHandlers[method] -= callback; @@ -18,25 +23,25 @@ void VirtualEndpoint::UnregisterMethodHandler(string method, functionGetMethod(&method)) + if (!request.GetMethod(&method)) return; - map >::iterator i = m_MethodHandlers.find(method); + map >::iterator i = m_MethodHandlers.find(method); if (i == m_MethodHandlers.end()) throw InvalidArgumentException(); - NewRequestEventArgs::Ptr nrea = make_shared(); - nrea->Source = shared_from_this(); - nrea->Sender = sender; - nrea->Request = request; + NewRequestEventArgs nrea; + nrea.Source = shared_from_this(); + nrea.Sender = sender; + nrea.Request = request; i->second(nrea); } -void VirtualEndpoint::SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr response) +void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& response) { // TODO: figure out which request this response belongs to and notify the caller throw NotImplementedException(); diff --git a/icinga/virtualendpoint.h b/icinga/virtualendpoint.h index c7c150683..ecf420225 100644 --- a/icinga/virtualendpoint.h +++ b/icinga/virtualendpoint.h @@ -4,32 +4,31 @@ namespace icinga { -struct I2_JSONRPC_API NewRequestEventArgs : public EventArgs +struct I2_ICINGA_API NewRequestEventArgs : public EventArgs { typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; Endpoint::Ptr Sender; - JsonRpcRequest::Ptr Request; + JsonRpcRequest Request; }; class I2_ICINGA_API VirtualEndpoint : public Endpoint { private: - map< string, Event > m_MethodHandlers; + map< string, Event > m_MethodHandlers; public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void RegisterMethodHandler(string method, function callback); - void UnregisterMethodHandler(string method, function callback); + void RegisterMethodHandler(string method, function callback); + void UnregisterMethodHandler(string method, function callback); - virtual void RegisterMethodSource(string method); - virtual void UnregisterMethodSource(string method); + virtual bool IsLocal(void) const; - virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message); - virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message); + virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message); + virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message); }; } diff --git a/jsonrpc/i2-jsonrpc.h b/jsonrpc/i2-jsonrpc.h index b7ced984e..7aed38444 100644 --- a/jsonrpc/i2-jsonrpc.h +++ b/jsonrpc/i2-jsonrpc.h @@ -11,6 +11,8 @@ # define I2_JSONRPC_API I2_IMPORT #endif /* I2_JSONRPC_BUILD */ +#include "variant.h" +#include "dictionary.h" #include "message.h" #include "netstring.h" #include "jsonrpcrequest.h" diff --git a/jsonrpc/jsonrpc.vcxproj b/jsonrpc/jsonrpc.vcxproj index 83fe30436..e595d6d80 100644 --- a/jsonrpc/jsonrpc.vcxproj +++ b/jsonrpc/jsonrpc.vcxproj @@ -27,34 +27,6 @@ - - - Document - "$(OutputPath)\msgc" %(Identity) - "$(OutputPath)\msgc" %(Identity) - Compiling %(Identity) - Compiling %(Identity) - %(Filename).cpp %(Filename).h - %(Filename).cpp %(Filename).h - - - - - - - Document - %(Filename).cpp %(Filename).h - %(Filename).cpp %(Filename).h - Compiling %(Identity) - Compiling %(Identity) - "$(OutputPath)\msgc" %(Identity) - "$(OutputPath)\msgc" %(Identity) - - - - - - {8DD52FAC-ECEE-48C2-B266-E7C47ED485F8} Win32Proj diff --git a/jsonrpc/jsonrpcclient.cpp b/jsonrpc/jsonrpcclient.cpp index 45f10dae6..10f232ada 100644 --- a/jsonrpc/jsonrpcclient.cpp +++ b/jsonrpc/jsonrpcclient.cpp @@ -9,30 +9,32 @@ void JsonRpcClient::Start(void) OnDataAvailable += bind_weak(&JsonRpcClient::DataAvailableHandler, shared_from_this()); } -void JsonRpcClient::SendMessage(Message::Ptr message) +void JsonRpcClient::SendMessage(const Message& message) { Netstring::WriteMessageToFIFO(GetSendQueue(), message); } -int JsonRpcClient::DataAvailableHandler(EventArgs::Ptr ea) +int JsonRpcClient::DataAvailableHandler(const EventArgs& ea) { - Message::Ptr message; + Message message; + bool message_read; while (true) { try { - message = Netstring::ReadMessageFromFIFO(GetRecvQueue()); - } catch (const exception&) { + message_read = Netstring::ReadMessageFromFIFO(GetRecvQueue(), &message); + } catch (const Exception& ex) { + cerr << "Exception while reading from JSON-RPC client: " << ex.GetMessage() << endl; Close(); return 1; } - if (message.get() == NULL) + if (!message_read) break; - NewMessageEventArgs::Ptr nea = make_shared(); - nea->Source = shared_from_this(); - nea->Message = message; + NewMessageEventArgs nea; + nea.Source = shared_from_this(); + nea.Message = message; OnNewMessage(nea); } diff --git a/jsonrpc/jsonrpcclient.h b/jsonrpc/jsonrpcclient.h index bafe381e7..449fb8c1c 100644 --- a/jsonrpc/jsonrpcclient.h +++ b/jsonrpc/jsonrpcclient.h @@ -9,23 +9,23 @@ struct I2_JSONRPC_API NewMessageEventArgs : public EventArgs typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - Message::Ptr Message; + Message Message; }; class I2_JSONRPC_API JsonRpcClient : public TCPClient { private: - int DataAvailableHandler(EventArgs::Ptr ea); + int DataAvailableHandler(const EventArgs& ea); public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - void SendMessage(Message::Ptr message); + void SendMessage(const Message& message); virtual void Start(void); - Event OnNewMessage; + Event OnNewMessage; }; } diff --git a/jsonrpc/jsonrpcrequest.cpp b/jsonrpc/jsonrpcrequest.cpp new file mode 100644 index 000000000..9f252b630 --- /dev/null +++ b/jsonrpc/jsonrpcrequest.cpp @@ -0,0 +1,4 @@ +#include "i2-jsonrpc.h" +#include "jsonrpcrequest.h" + +using namespace icinga; diff --git a/jsonrpc/jsonrpcrequest.h b/jsonrpc/jsonrpcrequest.h new file mode 100644 index 000000000..d9d1df769 --- /dev/null +++ b/jsonrpc/jsonrpcrequest.h @@ -0,0 +1,64 @@ +#ifndef JSONRPCREQUEST_H +#define JSONRPCREQUEST_H + +namespace icinga +{ + +class I2_JSONRPC_API JsonRpcRequest : public Message +{ + +public: + JsonRpcRequest(void) : Message() { } + JsonRpcRequest(const Message& message) : Message(message) { } + + inline bool GetVersion(string *value) const + { + return GetDictionary()->GetValueString("jsonrpc", value); + } + + inline void SetVersion(const string& value) + { + GetDictionary()->SetValueString("jsonrpc", value); + } + + inline bool GetMethod(string *value) const + { + return GetDictionary()->GetValueString("method", value); + } + + inline void SetMethod(const string& value) + { + GetDictionary()->SetValueString("method", value); + } + + inline bool GetParams(Message *value) const + { + Dictionary::Ptr dictionary; + + if (!GetDictionary()->GetValueDictionary("params", &dictionary)) + return false; + + *value = Message(dictionary); + + return true; + } + + inline void SetParams(const Message& value) + { + GetDictionary()->SetValueDictionary("params", value.GetDictionary()); + } + + inline bool GetID(string *value) const + { + return GetDictionary()->GetValueString("id", value); + } + + inline void SetID(const string& value) + { + GetDictionary()->SetValueString("id", value); + } +}; + +} + +#endif /* JSONRPCREQUEST_H */ diff --git a/jsonrpc/jsonrpcresponse.cpp b/jsonrpc/jsonrpcresponse.cpp new file mode 100644 index 000000000..009925892 --- /dev/null +++ b/jsonrpc/jsonrpcresponse.cpp @@ -0,0 +1,4 @@ +#include "i2-jsonrpc.h" +#include "jsonrpcresponse.h" + +using namespace icinga; diff --git a/jsonrpc/jsonrpcresponse.h b/jsonrpc/jsonrpcresponse.h new file mode 100644 index 000000000..aed6ff38a --- /dev/null +++ b/jsonrpc/jsonrpcresponse.h @@ -0,0 +1,56 @@ +#ifndef JSONRPCRESPONSE_H +#define JSONRPCRESPONSE_H + +namespace icinga +{ + +class I2_JSONRPC_API JsonRpcResponse : public Message +{ +public: + JsonRpcResponse(void) : Message() { } + JsonRpcResponse(const Message& message) : Message(message) { } + + inline bool GetVersion(string *value) const + { + return GetDictionary()->GetValueString("jsonrpc", value); + } + + inline void SetJsonRpc(const string& value) + { + GetDictionary()->SetValueString("jsonrpc", value); + } + + bool GetResult(string *value) const + { + return GetDictionary()->GetValueString("result", value); + } + + void SetResult(const string& value) + { + GetDictionary()->SetValueString("result", value); + } + + bool GetError(string *value) const + { + return GetDictionary()->GetValueString("error", value); + } + + void SetError(const string& value) + { + GetDictionary()->SetValueString("error", value); + } + + bool GetID(string *value) const + { + return GetDictionary()->GetValueString("id", value); + } + + void SetID(const string& value) + { + GetDictionary()->SetValueString("id", value); + } +}; + +} + +#endif /* JSONRPCRESPONSE_H */ diff --git a/jsonrpc/message.cpp b/jsonrpc/message.cpp new file mode 100644 index 000000000..2b23caa93 --- /dev/null +++ b/jsonrpc/message.cpp @@ -0,0 +1,23 @@ +#include "i2-jsonrpc.h" + +using namespace icinga; + +Message::Message(void) +{ + m_Dictionary = make_shared(); +} + +Message::Message(const Dictionary::Ptr& dictionary) +{ + m_Dictionary = dictionary; +} + +Message::Message(const Message& message) +{ + m_Dictionary = message.GetDictionary(); +} + +Dictionary::Ptr Message::GetDictionary(void) const +{ + return m_Dictionary; +} diff --git a/jsonrpc/message.h b/jsonrpc/message.h new file mode 100644 index 000000000..589de15c0 --- /dev/null +++ b/jsonrpc/message.h @@ -0,0 +1,22 @@ +#ifndef MESSAGE_H +#define MESSAGE_H + +namespace icinga +{ + +class I2_JSONRPC_API Message +{ +private: + Dictionary::Ptr m_Dictionary; + +public: + Message(void); + Message(const Dictionary::Ptr& dictionary); + Message(const Message& message); + + Dictionary::Ptr GetDictionary(void) const; +}; + +} + +#endif /* MESSAGE_H */ diff --git a/jsonrpc/netstring.cpp b/jsonrpc/netstring.cpp index a5a9515a4..27052739d 100644 --- a/jsonrpc/netstring.cpp +++ b/jsonrpc/netstring.cpp @@ -3,19 +3,72 @@ using namespace icinga; +Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json) +{ + Dictionary::Ptr dictionary = make_shared(); + + for (cJSON *i = json->child; i != NULL; i = i->next) { + switch (i->type) { + case cJSON_Number: + dictionary->SetValueInteger(i->string, i->valueint); + break; + case cJSON_String: + dictionary->SetValueString(i->string, i->valuestring); + break; + case cJSON_Object: + dictionary->SetValueDictionary(i->string, GetDictionaryFromJson(i)); + break; + default: + break; + } + } + + return dictionary; +} + +cJSON *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary) +{ + cJSON *json; + string valueString; + Dictionary::Ptr valueDictionary; + + json = cJSON_CreateObject(); + + for (DictionaryIterator i = dictionary->Begin(); i != dictionary->End(); i++) { + switch (i->second.GetType()) { + case VariantInteger: + cJSON_AddNumberToObject(json, i->first.c_str(), i->second.GetInteger()); + break; + case VariantString: + valueString = i->second.GetString(); + cJSON_AddStringToObject(json, i->first.c_str(), valueString.c_str()); + break; + case VariantObject: + valueDictionary = dynamic_pointer_cast(i->second.GetObject()); + + if (valueDictionary.get() != NULL) + cJSON_AddItemToObject(json, i->first.c_str(), GetJsonFromDictionary(valueDictionary)); + default: + break; + } + } + + return json; +} + /* based on https://github.com/PeterScott/netstring-c/blob/master/netstring.c */ -Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo) +bool Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo, Message *message) { size_t buffer_length = fifo->GetSize(); char *buffer = (char *)fifo->GetReadBuffer(); /* minimum netstring length is 3 */ if (buffer_length < 3) - return NULL; + return false; /* no leading zeros allowed */ if (buffer[0] == '0' && isdigit(buffer[1])) - throw exception(/*"Invalid netstring (leading zero)"*/); + throw InvalidArgumentException("Invalid netstring (leading zero)"); size_t len, i; @@ -23,22 +76,22 @@ Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo) for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) { /* length specifier must have at most 9 characters */ if (i >= 9) - return NULL; + return false; len = len * 10 + (buffer[i] - '0'); } /* make sure the buffer is large enough */ if (i + len + 1 >= buffer_length) - return NULL; + return false; /* check for the colon delimiter */ if (buffer[i++] != ':') - throw exception(/*"Invalid Netstring (missing :)"*/); + throw InvalidArgumentException("Invalid Netstring (missing :)"); /* check for the comma delimiter after the string */ if (buffer[i + len] != ',') - throw exception(/*"Invalid Netstring (missing ,)"*/); + throw InvalidArgumentException("Invalid Netstring (missing ,)"); /* nuke the comma delimiter */ buffer[i + len] = '\0'; @@ -47,30 +100,34 @@ Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo) if (object == NULL) { /* restore the comma */ buffer[i + len] = ','; - throw exception(/*"Invalid JSON string"*/); + throw InvalidArgumentException("Invalid JSON string"); } /* remove the data from the fifo */ fifo->Read(NULL, i + len + 1); - return make_shared(object); + *message = Message(GetDictionaryFromJson(object)); + cJSON_Delete(object); + return true; } -void Netstring::WriteMessageToFIFO(FIFO::Ptr fifo, Message::Ptr message) +void Netstring::WriteMessageToFIFO(FIFO::Ptr fifo, const Message& message) { char *json; - shared_ptr object = message->GetJson(); + cJSON *object = GetJsonFromDictionary(message.GetDictionary()); size_t len; #ifdef _DEBUG - json = cJSON_Print(object.get()); + json = cJSON_Print(object); #else /* _DEBUG */ - json = cJSON_PrintUnformatted(object.get()); + json = cJSON_PrintUnformatted(object); #endif /* _DEBUG */ + cJSON_Delete(object); + len = strlen(json); char strLength[50]; - sprintf(strLength, "%lu", (unsigned long)len); + sprintf(strLength, "%lu:", (unsigned long)len); fifo->Write(strLength, strlen(strLength)); fifo->Write(json, len); diff --git a/jsonrpc/netstring.h b/jsonrpc/netstring.h index e33c0f8da..1895b1037 100644 --- a/jsonrpc/netstring.h +++ b/jsonrpc/netstring.h @@ -10,12 +10,15 @@ private: size_t m_Length; void *m_Data; + static Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json); + static cJSON *GetJsonFromDictionary(const Dictionary::Ptr& dictionary); + public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - static Message::Ptr ReadMessageFromFIFO(FIFO::Ptr fifo); - static void WriteMessageToFIFO(FIFO::Ptr fifo, Message::Ptr message); + static bool ReadMessageFromFIFO(FIFO::Ptr fifo, Message *message); + static void WriteMessageToFIFO(FIFO::Ptr fifo, const Message& message); }; } diff --git a/msgc/msgc.cpp b/msgc/msgc.cpp deleted file mode 100644 index f85551852..000000000 --- a/msgc/msgc.cpp +++ /dev/null @@ -1,177 +0,0 @@ -#include -#include -#include -#include -#include -#include - -using namespace std; - -void trim(string& str, const char *whitespace = "\r\n\t ") -{ - string::size_type pos; - - pos = str.find_first_not_of(whitespace); - if (pos != string::npos) - str.erase(0, pos); - - pos = str.find_last_not_of(whitespace); - if (pos != string::npos) - str.erase(pos + 1); -} - -int main(int argc, char **argv) -{ - if (argc < 2) { - cerr << "Syntax: " << argv[0] << " " << endl; - return EXIT_FAILURE; - } - - char *pos; - pos = strrchr(argv[1], '.'); - - if (pos == NULL || strcmp(pos, ".message") != 0) { - cerr << "Input filename must have the '.message' extension." << endl; - return EXIT_FAILURE; - } - - char *headername, *implname; - headername = strdup(argv[1]); - strcpy(&(headername[pos - argv[1]]), ".h"); - - implname = strdup(argv[1]); - strcpy(&(implname[pos - argv[1]]), ".cpp"); - - fstream inputfp, headerfp, implfp; - - inputfp.open(argv[1], fstream::in); - headerfp.open(headername, fstream::out | fstream::trunc); - implfp.open(implname, fstream::out | fstream::trunc); - - string line; - string klass, klassupper, base; - bool hasclass = false; - - while (true) { - getline(inputfp, line); - - if (inputfp.fail()) - break; - - if (!hasclass) { - string::size_type index = line.find(':'); - - if (index == string::npos) { - cerr << "Must specify class and base name." << endl; - return EXIT_FAILURE; - } - - klass = line.substr(0, index); - trim(klass); - - klassupper = klass; - transform(klassupper.begin(), klassupper.end(), klassupper.begin(), toupper); - - base = line.substr(index + 1); - trim(base); - - cout << "Class: '" << klass << "' (inherits from: '" << base << "')" << endl; - - headerfp << "#ifndef " << klassupper << "_H" << endl - << "#define " << klassupper << "_H" << endl - << endl - << "namespace icinga" << endl - << "{" << endl - << endl - << "class " << klass << " : public " << base << endl - << "{" << endl - << endl - << "public:" << endl - << "\ttypedef shared_ptr<" << klass << "> Ptr;" << endl - << "\ttypedef weak_ptr<" << klass << "> WeakPtr;" << endl - << endl - << "\t" << klass << "(void) : " << base << "() { }" << endl - << "\t" << klass << "(const Message::Ptr& message) : " << base << "(message) { }" << endl - << endl; - - implfp << "#include \"i2-jsonrpc.h\"" << endl - << "#include \"" << headername << "\"" << endl - << endl - << "using namespace icinga;" << endl - << endl; - - hasclass = true; - } else { - string::size_type index = line.find(':'); - - if (index == string::npos) { - cerr << "Must specify type and property name." << endl; - return EXIT_FAILURE; - } - - string prop = line.substr(0, index); - trim(prop); - - string type = line.substr(index + 1); - trim(type); - - string typeaccessor = type; - typeaccessor[0] = toupper(typeaccessor[0]); - - string rawtype = type; - - /* assume it's a reference type if we don't know the type */ - if (type != "int" && type != "string") { - type = type + "::Ptr"; - typeaccessor = "Message"; - } - - cout << "Property: '" << prop << "' (Type: '" << type << "')" << endl; - - headerfp << endl - << "\tbool Get" << prop << "(" << type << " *value);" << endl - << "\tvoid Set" << prop << "(const " << type << "& value);" << endl; - - implfp << "bool " << klass << "::Get" << prop << "(" << type << " *value)" << endl - << "{" << endl; - - if (typeaccessor == "Message") { - implfp << "\tMessage::Ptr message;" << endl - << endl - << "\tif (!GetProperty" << typeaccessor << "(\"" << prop << "\", &message))" << endl - << "\treturn false;" << endl - << endl - << "\t*value = message->Cast<" + rawtype + ">();" << endl - << "return true;" << endl - << endl; - } else { - implfp << "\treturn GetProperty" << typeaccessor << "(\"" << prop << "\", value);" << endl; - } - - implfp << "}" << endl - << endl; - - implfp << "void " << klass << "::Set" << prop << "(const " << type << "& value)" << endl - << "{" << endl - << "\tSetProperty" << typeaccessor << "(\"" << prop << "\", value);" << endl - << "}" << endl - << endl; - } - } - - headerfp << endl - << "};" << endl - << endl - << "}" << endl - << endl - << "#endif /* " << klassupper << "_H */" << endl; - - inputfp.close(); - headerfp.close(); - implfp.close(); - - free(headername); - free(implname); - - return EXIT_SUCCESS; -} diff --git a/msgc/msgc.vcxproj b/msgc/msgc.vcxproj deleted file mode 100644 index 63e633455..000000000 --- a/msgc/msgc.vcxproj +++ /dev/null @@ -1,82 +0,0 @@ - - - - - Debug - Win32 - - - Release - Win32 - - - - {4F00EE82-B829-4872-B8F0-C1A8D86C94B4} - Win32Proj - msgc - - - - Application - true - Unicode - - - Application - false - true - Unicode - - - - - - - - - - - - - true - - - false - - - - - - Level3 - Disabled - WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) - - - Console - true - - - - - Level3 - - - MaxSpeed - true - true - WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) - - - Console - true - true - true - - - - - - - - - \ No newline at end of file -- 2.40.0