int fd = socket->GetFD();
- if (socket->WantsToRead())
- FD_SET(fd, &readfds);
-
if (socket->WantsToWrite())
FD_SET(fd, &writefds);
+ if (socket->WantsToRead())
+ FD_SET(fd, &readfds);
+
FD_SET(fd, &exceptfds);
if (fd > nfds)
int fd = socket->GetFD();
- if (FD_ISSET(fd, &readfds))
- socket->OnReadable(ea);
-
if (FD_ISSET(fd, &writefds))
socket->OnWritable(ea);
+ if (FD_ISSET(fd, &readfds))
+ socket->OnReadable(ea);
+
if (FD_ISSET(fd, &exceptfds))
socket->OnException(ea);
}
return count;
}
-void *FIFO::GetWriteBuffer(size_t count)
+void *FIFO::GetWriteBuffer(size_t *count)
{
- ResizeBuffer(m_Offset + m_DataSize + count);
+ ResizeBuffer(m_Offset + m_DataSize + *count);
+ *count = m_AllocSize - m_Offset - m_DataSize;
return m_Buffer + m_Offset + m_DataSize;
}
size_t FIFO::Write(const void *buffer, size_t count)
{
if (buffer != NULL) {
- void *target_buffer = GetWriteBuffer(count);
+ size_t bufferSize = count;
+ void *target_buffer = GetWriteBuffer(&bufferSize);
memcpy(target_buffer, buffer, count);
}
size_t GetSize(void) const;
const void *GetReadBuffer(void) const;
- void *GetWriteBuffer(size_t count);
+ void *GetWriteBuffer(size_t *count);
size_t Read(void *buffer, size_t count);
size_t Write(const void *buffer, size_t count);
int TCPClient::ReadableEventHandler(EventArgs::RefType ea)
{
- int read_total, rc;
-
- read_total = 0;
+ int rc;
- while (true) {
- static const size_t BufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)m_RecvQueue->GetWriteBuffer(BufferSize);
- rc = recv(GetFD(), buffer, BufferSize, 0);
+ size_t bufferSize = FIFO::BlockSize / 2;
+ char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
+ rc = recv(GetFD(), buffer, bufferSize, 0);
#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
+ if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
- if (rc < 0 && errno == EAGAIN)
+ if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
- break;
-
- if (rc <= 0) {
- Close();
- return 0;
- }
-
- m_RecvQueue->Write(NULL, rc);
- read_total += rc;
+ return 0;
- /* make sure we don't starve other sockets */
- if (read_total > 128 * 1024)
- break;
+ if (rc <= 0) {
+ Close();
+ return 0;
}
+ m_RecvQueue->Write(NULL, rc);
+
EventArgs::RefType dea = new_object<EventArgs>();
dea->Source = shared_from_this();
OnDataAvailable(dea);
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "i2-jsonrpc.h"
using namespace icinga;
+using std::map;
+using std::function;
void ConnectionManager::BindServer(JsonRpcServer::RefType server)
{
int ConnectionManager::NewMessageHandler(NewMessageEventArgs::RefType nmea)
{
- OnNewMessage(nmea);
+ JsonRpcMessage::RefType request = nmea->Message;
+ JsonRpcClient::RefType client = static_pointer_cast<JsonRpcClient>(nmea->Source);
+
+ map<string, event<NewMessageEventArgs::RefType> >::iterator i;
+ i = m_Methods.find(request->GetMethod());
+
+ if (i == m_Methods.end()) {
+ JsonRpcMessage::RefType response = new_object<JsonRpcMessage>();
+ response->SetVersion("2.0");
+ response->SetError("Unknown method.");
+ response->SetID(request->GetID());
+ Netstring::WriteJSONToFIFO(client->GetSendQueue(), response->GetJSON());
+
+ return 0;
+ }
+
+ i->second(nmea);
return 0;
}
+
+void ConnectionManager::RegisterMethod(string method, function<int (NewMessageEventArgs::RefType)> callback)
+{
+ map<string, event<NewMessageEventArgs::RefType> >::iterator i;
+ i = m_Methods.find(method);
+
+ if (i == m_Methods.end()) {
+ m_Methods[method] = event<NewMessageEventArgs::RefType>();
+ i = m_Methods.find(method);
+ }
+
+ i->second.bind(callback);
+}
+
+void ConnectionManager::UnregisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function)
+{
+ // TODO: implement
+}
namespace icinga
{
+using std::map;
+
class ConnectionManager : public Object
{
list<JsonRpcServer::RefType> m_Servers;
list<JsonRpcClient::RefType> m_Clients;
+ map< string, event<NewMessageEventArgs::RefType> > m_Methods;
int NewClientHandler(NewClientEventArgs::RefType ncea);
int CloseClientHandler(EventArgs::RefType ea);
void BindClient(JsonRpcClient::RefType client);
void UnbindClient(JsonRpcClient::RefType client);
- event<NewMessageEventArgs::RefType> OnNewMessage;
+ void RegisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function);
+ void UnregisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function);
};
}
#ifndef I2_JSONRPC_H
#define I2_JSONRPC_H
+#include <map>
#include <i2-base.h>
#include "cJSON.h"
return m_JSON;
}
-void JsonRpcMessage::SetFieldString(const char *field, const string& value)
+void JsonRpcMessage::InitJson(void)
{
if (m_JSON == NULL)
m_JSON = cJSON_CreateObject();
+}
+
+void JsonRpcMessage::SetFieldObject(const char *field, cJSON *object)
+{
+ if (m_JSON == NULL && object == NULL)
+ return;
+
+ InitJson();
- cJSON *object = cJSON_CreateString(value.c_str());
cJSON_DeleteItemFromObject(m_JSON, field);
- cJSON_AddItemToObject(m_JSON, field, object);
+
+ if (object != NULL)
+ cJSON_AddItemToObject(m_JSON, field, object);
}
-string JsonRpcMessage::GetFieldString(const char *field)
+cJSON *JsonRpcMessage::GetFieldObject(const char *field)
{
if (m_JSON == NULL)
- m_JSON = cJSON_CreateObject();
+ return NULL;
+
+ return cJSON_GetObjectItem(m_JSON, field);
+}
- cJSON *idObject = cJSON_GetObjectItem(m_JSON, field);
+void JsonRpcMessage::SetFieldString(const char *field, const string& value)
+{
+ cJSON *object = cJSON_CreateString(value.c_str());
+ SetFieldObject(field, object);
+}
- if (idObject == NULL || idObject->type != cJSON_String)
+string JsonRpcMessage::GetFieldString(const char *field)
+{
+ cJSON *object = GetFieldObject(field);
+
+ if (object == NULL || object->type != cJSON_String)
return string();
- return string(idObject->valuestring);
+ return string(object->valuestring);
}
void JsonRpcMessage::SetVersion(const string& version)
return GetFieldString("method");
}
-void JsonRpcMessage::SetParams(const string& params)
+void JsonRpcMessage::ClearParams(void)
{
- SetFieldString("params", params);
+ SetFieldObject("params", NULL);
}
-string JsonRpcMessage::GetParams(void)
+cJSON *JsonRpcMessage::GetParams(void)
{
- return GetFieldString("params");
+ cJSON *object = GetFieldObject("params");
+
+ if (object == NULL) {
+ object = cJSON_CreateObject();
+ cJSON_AddItemToObject(m_JSON, "params", object);
+ }
+
+ return object;
}
-void JsonRpcMessage::SetResult(const string& result)
+void JsonRpcMessage::ClearResult(void)
{
- SetFieldString("result", result);
+ SetFieldObject("result", NULL);
}
-string JsonRpcMessage::GetResult(void)
+cJSON *JsonRpcMessage::GetResult(void)
{
- return GetFieldString("result");
+ cJSON *object = GetFieldObject("result");
+
+ if (object == NULL) {
+ object = cJSON_CreateObject();
+ cJSON_AddItemToObject(m_JSON, "result", object);
+ }
+
+ return object;
}
void JsonRpcMessage::SetError(const string& error)
private:
cJSON *m_JSON;
+ void InitJson(void);
+
void SetFieldString(const char *field, const string& value);
string GetFieldString(const char *field);
+ void ClearField(const char *field);
+ void SetFieldObject(const char *field, cJSON *object);
+ cJSON *GetFieldObject(const char *field);
+
public:
typedef shared_ptr<JsonRpcMessage> RefType;
typedef weak_ptr<JsonRpcMessage> WeakRefType;
void SetMethod(const string& method);
string GetMethod(void);
- void SetParams(const string& params);
- string GetParams(void);
+ void ClearParams(void);
+ cJSON *GetParams(void);
- void SetResult(const string& result);
- string GetResult(void);
+ void ClearResult();
+ cJSON *GetResult(void);
void SetError(const string& error);
string GetError(void);
class MyApplication : public Application
{
-private:
- int m_Foo;
-
public:
typedef shared_ptr<MyApplication> RefType;
typedef weak_ptr<MyApplication> WeakRefType;
- MyApplication(void)
- {
- m_Foo = 0;
- }
-
int Main(const vector<string>& args)
{
- /*FIFO::RefType f = new_object<FIFO>();
- f->Write("12:Hello World!,", 16);
- Netstring::RefType ns = new_object<Netstring>();
- ns->ReadFromFIFO(f);
-
- Timer::RefType t = new_object<Timer>();
- t->SetInterval(2);
- t->OnTimerExpired.bind(bind_weak(&MyApplication::TimerCallback, shared_from_this()));
- t->Start();*/
-
JsonRpcServer::RefType ts = new_object<JsonRpcServer>();
ts->MakeSocket();
ts->Bind(7777);
ts->Listen();
ConnectionManager::RefType cm = new_object<ConnectionManager>();
- cm->OnNewMessage.bind(bind_weak(&MyApplication::MessageHandler, shared_from_this()));
+ cm->RegisterMethod("HelloWorld", bind_weak(&MyApplication::HelloWorld, shared_from_this()));
cm->BindServer(ts);
RunEventLoop();
return 0;
}
- int MessageHandler(NewMessageEventArgs::RefType nea)
+ int HelloWorld(NewMessageEventArgs::RefType nea)
{
JsonRpcClient::RefType client = static_pointer_cast<JsonRpcClient>(nea->Source);
JsonRpcMessage::RefType msg = nea->Message;
JsonRpcMessage::RefType response = new_object<JsonRpcMessage>();
response->SetVersion("2.0");
response->SetID(msg->GetID());
- response->SetResult("moo");
+ cJSON *result = response->GetResult();
+ cJSON_AddStringToObject(result, "greeting", "Hello World!");
client->SendMessage(response);
return 0;
}
-
- int TimerCallback(TimerEventArgs::RefType tda)
- {
- Timer::RefType t = static_pointer_cast<Timer>(tda->Source);
-
- m_Foo++;
-
- printf("Hello World!\n");
-
- if (m_Foo >= 5) {
- t->Stop();
- Shutdown();
- }
-
- return 0;
- }
};
SET_START_CLASS(MyApplication);