time_t now = time(NULL);
time_t next = Timer::GetNextCall();
- long sleep = (next < now) ? 0 : (next - now);
+ time_t sleep = (next < now) ? 0 : (next - now);
if (m_ShuttingDown)
break;
timeval tv;
- tv.tv_sec = (sleep < 0) ? 0 : sleep;
+ tv.tv_sec = (sleep < 0) ? 0 : (long)sleep;
tv.tv_usec = 0;
int ready;
m_Application = application;
}
-Application::Ptr Component::GetApplication(void)
+Application::Ptr Component::GetApplication(void) const
{
return m_Application.lock();
}
m_Config = componentConfig;
}
-ConfigObject::Ptr Component::GetConfig(void)
+ConfigObject::Ptr Component::GetConfig(void) const
{
return m_Config;
}
typedef weak_ptr<Component> WeakPtr;
void SetApplication(const Application::WeakPtr& application);
- Application::Ptr GetApplication(void);
+ Application::Ptr GetApplication(void) const;
void SetConfig(const ConfigObject::Ptr& componentConfig);
- ConfigObject::Ptr GetConfig(void);
+ ConfigObject::Ptr GetConfig(void) const;
- virtual string GetName(void) = 0;
+ virtual string GetName(void) const = 0;
virtual void Start(void) = 0;
virtual void Stop(void) = 0;
};
#ifdef _MSC_VER
# define HAVE_CXX11
# pragma warning(disable:4251)
+# define _CRT_SECURE_NO_DEPRECATE
#else /* _MSC_VER */
# include "config.h"
#endif /* _MSC_VER */
if (newType == m_Type)
return;
+ // TODO: convert variant data
throw NotImplementedException();
}
using namespace icinga;
-string ConfigFileComponent::GetName(void)
+string ConfigFileComponent::GetName(void) const
{
return "configfilecomponent";
}
typedef shared_ptr<ConfigFileComponent> Ptr;
typedef weak_ptr<ConfigFileComponent> WeakPtr;
- virtual string GetName(void);
+ virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
};
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
return static_pointer_cast<IcingaApplication>(GetApplication());
}
-string ConfigRpcComponent::GetName(void)
+string ConfigRpcComponent::GetName(void) const
{
return "configcomponent";
}
JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
public:
- virtual string GetName(void);
+ virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
};
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
void EndpointManager::AddConnection(string host, unsigned short port)
{
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
- RegisterEndpoint(endpoint);
-
endpoint->Connect(host, port);
+ RegisterEndpoint(endpoint);
}
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
- RegisterEndpoint(endpoint);
-
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
+ RegisterEndpoint(endpoint);
return 0;
}
endpoint->OnNewMethodSource += bind_weak(&EndpointManager::NewMethodSourceHandler, shared_from_this());
endpoint->ForeachMethodSource(bind(&EndpointManager::NewMethodSourceHandler, this, _1));
+
+ NewEndpointEventArgs neea;
+ neea.Source = shared_from_this();
+ neea.Endpoint = endpoint;
+ OnNewEndpoint(neea);
}
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
return 0;
}
-void EndpointManager::ForeachEndpoint(function<int (Endpoint::Ptr)> callback)
+void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
{
+ NewEndpointEventArgs neea;
+ neea.Source = shared_from_this();
for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
- callback(*i);
+ neea.Endpoint = *i;
+ callback(neea);
}
}
namespace icinga
{
+struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
+{
+ Endpoint::Ptr Endpoint;
+};
+
class I2_ICINGA_API EndpointManager : public Object
{
list<JsonRpcServer::Ptr> m_Servers;
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
- void ForeachEndpoint(function<int (Endpoint::Ptr)> callback);
+ void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
+
+ Event<NewEndpointEventArgs> OnNewEndpoint;
};
}
#include "virtualendpoint.h"
#include "endpointmanager.h"
#include "icingaapplication.h"
+#include "subscriptioncomponent.h"
#endif /* I2ICINGA_H */
<ClCompile Include="endpointmanager.cpp" />
<ClCompile Include="icingaapplication.cpp" />
<ClCompile Include="jsonrpcendpoint.cpp" />
+ <ClCompile Include="subscriptioncomponent.cpp" />
<ClCompile Include="virtualendpoint.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="i2-icinga.h" />
<ClInclude Include="icingaapplication.h" />
<ClInclude Include="jsonrpcendpoint.h" />
+ <ClInclude Include="subscriptioncomponent.h" />
<ClInclude Include="virtualendpoint.h" />
</ItemGroup>
<PropertyGroup Label="Globals">
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
- <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
+ <IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this());
+ SubscriptionComponent::Ptr subscriptionsComponent = make_shared<SubscriptionComponent>();
+ RegisterComponent(subscriptionsComponent);
+
ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfile");
fileComponentConfig->SetProperty("configFilename", args[1]);
fileComponentConfig->SetPropertyInteger("replicate", 0);
request.SetVersion("2.0");
request.SetMethod("test");
- for (int i = 0; i < 10000; i++)
+ for (int i = 0; i < 5; i++)
m_EndpointManager->SendMulticastRequest(m_TestEndpoint, request);
return 0;
SetClient(client);
}
-int JsonRpcEndpoint::SyncSubscription(string type, const NewMethodEventArgs& nmea)
-{
- JsonRpcRequest request;
- request.SetVersion("2.0");
- request.SetMethod(type);
-
- Message params;
- params.GetDictionary()->SetValueString("method", nmea.Method);
- request.SetParams(params);
-
- m_Client->SendMessage(request);
-
- return 0;
-}
-
-int JsonRpcEndpoint::SyncSubscriptions(Endpoint::Ptr endpoint)
-{
- if (!endpoint->IsLocal())
- return 0;
-
- endpoint->ForeachMethodSink(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Subscribe", _1));
- endpoint->ForeachMethodSource(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Provide", _1));
-
- return 0;
-}
-
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
{
m_Client = client;
client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this());
client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this());
-
- NewMethodEventArgs nmea;
- nmea.Source = shared_from_this();
-
- nmea.Method = "message::Subscribe";
- SyncSubscription("message::Subscribe", nmea);
- SyncSubscription("message::Provide", nmea);
-
- nmea.Method = "message::Provide";
- SyncSubscription("message::Subscribe", nmea);
- SyncSubscription("message::Provide", nmea);
-
- GetEndpointManager()->ForeachEndpoint(bind(&JsonRpcEndpoint::SyncSubscriptions, this, _1));
}
bool JsonRpcEndpoint::IsLocal(void) const
int ClientErrorHandler(const SocketErrorEventArgs& ea);
int ClientReconnectHandler(const TimerEventArgs& ea);
- int SyncSubscription(string type, const NewMethodEventArgs& nmea);
- int SyncSubscriptions(Endpoint::Ptr endpoint);
-
public:
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
--- /dev/null
+#include "i2-icinga.h"
+
+using namespace icinga;
+
+IcingaApplication::Ptr SubscriptionComponent::GetIcingaApplication(void) const
+{
+ return static_pointer_cast<IcingaApplication>(GetApplication());
+}
+
+string SubscriptionComponent::GetName(void) const
+{
+ return "subscriptioncomponent";
+}
+
+void SubscriptionComponent::Start(void)
+{
+ m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
+ m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
+ m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
+ m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
+ m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
+
+ EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
+ mgr->OnNewEndpoint += bind_weak(&SubscriptionComponent::NewEndpointHandler, shared_from_this());
+ mgr->ForeachEndpoint(bind(&SubscriptionComponent::NewEndpointHandler, this, _1));
+ mgr->RegisterEndpoint(m_SubscriptionEndpoint);
+}
+
+void SubscriptionComponent::Stop(void)
+{
+ EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
+ mgr->UnregisterEndpoint(m_SubscriptionEndpoint);
+}
+
+int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea)
+{
+ JsonRpcRequest request;
+ request.SetVersion("2.0");
+ request.SetMethod(type);
+
+ Message params;
+ params.GetDictionary()->SetValueString("method", nmea.Method);
+ request.SetParams(params);
+
+ target->ProcessRequest(m_SubscriptionEndpoint, request);
+
+ return 0;
+}
+
+int SubscriptionComponent::SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea)
+{
+ Endpoint::Ptr source = neea.Endpoint;
+
+ if (!source->IsLocal())
+ return 0;
+
+ source->ForeachMethodSink(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Subscribe", _1));
+ source->ForeachMethodSource(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Provide", _1));
+
+ // TODO: bind to endpoint's events
+ //endpoint->OnNewMethodSink...
+
+ return 0;
+}
+
+int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
+{
+ if (neea.Endpoint->IsLocal())
+ return 0;
+
+ EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
+ mgr->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1));
+
+ return 0;
+}
+
+int SubscriptionComponent::SubscribeMessageHandler(const NewRequestEventArgs& nrea)
+{
+ Message params;
+ if (!nrea.Request.GetParams(¶ms))
+ return 0;
+
+ string method;
+ if (!params.GetDictionary()->GetValueString("method", &method))
+ return 0;
+
+ nrea.Sender->RegisterMethodSink(method);
+ return 0;
+}
+
+int SubscriptionComponent::ProvideMessageHandler(const NewRequestEventArgs& nrea)
+{
+ Message params;
+ if (!nrea.Request.GetParams(¶ms))
+ return 0;
+
+ string method;
+ if (!params.GetDictionary()->GetValueString("method", &method))
+ return 0;
+
+ nrea.Sender->RegisterMethodSource(method);
+ return 0;
+}
--- /dev/null
+#ifndef I2_SUBSCRIPTIONCOMPONENT_H
+#define I2_SUBSCRIPTIONCOMPONENT_H
+
+namespace icinga
+{
+
+class SubscriptionComponent : public Component
+{
+private:
+ VirtualEndpoint::Ptr m_SubscriptionEndpoint;
+
+ IcingaApplication::Ptr GetIcingaApplication(void) const;
+
+ int NewEndpointHandler(const NewEndpointEventArgs& neea);
+ int SubscribeMessageHandler(const NewRequestEventArgs& nrea);
+ int ProvideMessageHandler(const NewRequestEventArgs& nrea);
+
+ int SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea);
+ int SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea);
+
+public:
+ virtual string GetName(void) const;
+ virtual void Start(void);
+ virtual void Stop(void);
+};
+
+}
+
+#endif /* I2_SUBSCRIPTIONCOMPONENT_H */
#include <map>
#include <i2-base.h>
-#include <cJSON.h>
#ifdef I2_JSONRPC_BUILD
# define I2_JSONRPC_API I2_EXPORT
#include <cstdio>
#include "i2-jsonrpc.h"
+#include <cJSON.h>
using namespace icinga;
-Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json)
+Dictionary::Ptr Netstring::GetDictionaryFromJson(json_t *json)
{
Dictionary::Ptr dictionary = make_shared<Dictionary>();
return dictionary;
}
-cJSON *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary)
+json_t *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary)
{
cJSON *json;
string valueString;
#ifndef NETSTRING_H
#define NETSTRING_H
+struct cJSON;
+
namespace icinga
{
+typedef ::cJSON json_t;
+
class I2_JSONRPC_API Netstring : public Object
{
private:
size_t m_Length;
void *m_Data;
- static Dictionary::Ptr GetDictionaryFromJson(cJSON *json);
- static cJSON *GetJsonFromDictionary(const Dictionary::Ptr& dictionary);
+ static Dictionary::Ptr GetDictionaryFromJson(json_t *json);
+ static json_t *GetJsonFromDictionary(const Dictionary::Ptr& dictionary);
public:
typedef shared_ptr<Netstring> Ptr;