configobject.h \
dictionary.cpp \
dictionary.h \
+ event.cpp \
+ event.h \
exception.cpp \
exception.h \
fifo.cpp \
void Application::RunEventLoop(void)
{
while (!m_ShuttingDown) {
- fd_set readfds, writefds, exceptfds;
- int nfds = -1;
-
Object::ClearHeldObjects();
long sleep = Timer::ProcessTimers();
if (m_ShuttingDown)
break;
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
- FD_ZERO(&exceptfds);
-
- Socket::CollectionType::iterator prev, i;
- for (i = Socket::Sockets.begin();
- i != Socket::Sockets.end(); ) {
- Socket::Ptr socket = i->lock();
-
- prev = i;
- i++;
-
- if (!socket) {
- Socket::Sockets.erase(prev);
- continue;
- }
-
- int fd = socket->GetFD();
-
- if (socket->WantsToWrite())
- FD_SET(fd, &writefds);
-
- if (socket->WantsToRead())
- FD_SET(fd, &readfds);
-
- FD_SET(fd, &exceptfds);
-
- if (fd > nfds)
- nfds = fd;
- }
-
- timeval tv;
- tv.tv_sec = sleep;
- tv.tv_usec = 0;
-
- int ready;
-
- if (nfds == -1) {
- Sleep(tv.tv_sec * 1000 + tv.tv_usec);
- ready = 0;
- } else
- ready = select(nfds + 1, &readfds, &writefds,
- &exceptfds, &tv);
-
- if (ready < 0)
- break;
- else if (ready == 0)
- continue;
-
- for (i = Socket::Sockets.begin();
- i != Socket::Sockets.end(); ) {
- Socket::Ptr socket = i->lock();
-
- prev = i;
- i++;
-
- if (!socket) {
- Socket::Sockets.erase(prev);
- continue;
- }
-
- int fd;
-
- fd = socket->GetFD();
- if (fd != INVALID_SOCKET && FD_ISSET(fd, &writefds))
- socket->OnWritable(socket);
-
- fd = socket->GetFD();
- if (fd != INVALID_SOCKET && FD_ISSET(fd, &readfds))
- socket->OnReadable(socket);
+ vector<Event::Ptr> events;
+
+ Event::Wait(&events, boost::get_system_time() + boost::posix_time::seconds(sleep));
- fd = socket->GetFD();
- if (fd != INVALID_SOCKET && FD_ISSET(fd, &exceptfds))
- socket->OnException(socket);
+ for (vector<Event::Ptr>::iterator it = events.begin(); it != events.end(); it++) {
+ Event::Ptr ev = *it;
+ ev->OnEventDelivered();
}
}
}
char timestamp[100];
// TODO: make this configurable
- if (!IsDebugging() && severity < LogInformation)
+ if (/*!IsDebugging() && */severity < LogInformation)
return;
string severityStr;
<ClCompile Include="component.cpp" />
<ClCompile Include="configobject.cpp" />
<ClCompile Include="dictionary.cpp" />
+ <ClCompile Include="event.cpp" />
<ClCompile Include="exception.cpp" />
<ClCompile Include="fifo.cpp" />
<ClCompile Include="object.cpp" />
<ClInclude Include="component.h" />
<ClInclude Include="configobject.h" />
<ClInclude Include="dictionary.h" />
+ <ClInclude Include="event.h" />
<ClInclude Include="objectmap.h" />
<ClInclude Include="objectset.h" />
<ClInclude Include="exception.h" />
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
--- /dev/null
+/******************************************************************************
+ * Icinga 2 *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
+ * *
+ * This program is free software; you can redistribute it and/or *
+ * modify it under the terms of the GNU General Public License *
+ * as published by the Free Software Foundation; either version 2 *
+ * of the License, or (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the Free Software Foundation *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ******************************************************************************/
+
+#include "i2-base.h"
+
+using namespace icinga;
+
+deque<Event::Ptr> Event::m_Events;
+condition_variable Event::m_EventAvailable;
+mutex Event::m_Mutex;
+
+bool Event::Wait(vector<Event::Ptr> *events, const system_time& wait_until)
+{
+ mutex::scoped_lock lock(m_Mutex);
+
+ while (m_Events.empty()) {
+ if (!m_EventAvailable.timed_wait(lock, wait_until))
+ return false;
+ }
+
+ vector<Event::Ptr> result;
+ std::copy(m_Events.begin(), m_Events.end(), back_inserter(*events));
+ m_Events.clear();
+
+ return true;
+}
+
+void Event::Post(const Event::Ptr& ev)
+{
+ mutex::scoped_lock lock(m_Mutex);
+ m_Events.push_back(ev);
+ m_EventAvailable.notify_all();
+}
--- /dev/null
+/******************************************************************************
+ * Icinga 2 *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
+ * *
+ * This program is free software; you can redistribute it and/or *
+ * modify it under the terms of the GNU General Public License *
+ * as published by the Free Software Foundation; either version 2 *
+ * of the License, or (at your option) any later version. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the Free Software Foundation *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ******************************************************************************/
+
+#ifndef EVENT_H
+#define EVENT_H
+
+namespace icinga
+{
+
+class Event : public Object
+{
+public:
+ typedef shared_ptr<Event> Ptr;
+ typedef weak_ptr<Event> WeakPtr;
+
+ static bool Wait(vector<Event::Ptr> *events, const system_time& wait_until);
+ static void Post(const Event::Ptr& ev);
+
+ boost::signal<void ()> OnEventDelivered;
+
+private:
+ static deque<Event::Ptr> m_Events;
+ static condition_variable m_EventAvailable;
+ static mutex m_Mutex;
+};
+
+}
+
+#endif /* EVENT_H */
using boost::thread_group;
using boost::mutex;
using boost::condition_variable;
+using boost::system_time;
#if defined(__APPLE__) && defined(__MACH__)
# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include "utility.h"
#include "object.h"
#include "exception.h"
-#include "memory.h"
+#include "event.h"
#include "variant.h"
#include "dictionary.h"
#include "timer.h"
using namespace icinga;
-/**
- * A collection of weak pointers to Socket objects which have been
- * registered with the socket sub-system.
- */
-Socket::CollectionType Socket::Sockets;
-
/**
* Constructor for the Socket class.
*/
*/
Socket::~Socket(void)
{
- CloseInternal(true);
+ {
+ mutex::scoped_lock lock(m_Mutex);
+
+ CloseInternal(true);
+ }
}
-/**
- * Registers the socket and starts handling events for it.
- */
void Socket::Start(void)
{
- assert(m_FD != INVALID_SOCKET);
+ assert(!m_ReadThread.joinable() && !m_WriteThread.joinable());
+ assert(GetFD() != INVALID_SOCKET);
- OnException.connect(boost::bind(&Socket::ExceptionEventHandler, this));
+ m_ReadThread = thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
+ m_ReadThread.detach();
- Sockets.push_back(GetSelf());
-}
-
-/**
- * Unregisters the sockets and stops handling events for it.
- */
-void Socket::Stop(void)
-{
- Sockets.remove_if(WeakPtrEqual<Socket>(this));
+ m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
+ m_WriteThread.detach();
}
/**
*/
void Socket::SetFD(SOCKET fd)
{
- unsigned long lTrue = 1;
-
/* mark the socket as non-blocking */
if (fd != INVALID_SOCKET) {
#ifdef F_GETFL
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
throw PosixException("fcntl failed", errno);
#else /* F_GETFL */
+ unsigned long lTrue = 1;
ioctlsocket(fd, FIONBIO, &lTrue);
#endif /* F_GETFL */
}
*/
void Socket::Close(void)
{
+ mutex::scoped_lock lock(m_Mutex);
+
CloseInternal(false);
}
/* nobody can possibly have a valid event subscription when the
destructor has been called */
if (!from_dtor) {
- Stop();
-
- OnClosed(GetSelf());
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnClosed), GetSelf()));
+ Event::Post(ev);
}
}
void Socket::HandleSocketError(const std::exception& ex)
{
if (!OnError.empty()) {
- OnError(GetSelf(), ex);
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnError), GetSelf(), ex));
+ Event::Post(ev);
- Close();
+ CloseInternal(false);
} else {
throw ex;
}
/**
* Processes errors that have occured for the socket.
- *
- * @param - Event arguments for the socket error.
*/
-void Socket::ExceptionEventHandler(void)
+void Socket::HandleException(void)
{
HandleSocketError(SocketException(
"select() returned fd in except fdset", GetError()));
return false;
}
+void Socket::HandleReadable(void)
+{ }
+
/**
* Checks whether data should be written for this socket object.
*
return false;
}
+void Socket::HandleWritable(void)
+{ }
+
/**
* Formats a sockaddr in a human-readable way.
*
*/
string Socket::GetClientAddress(void)
{
+ mutex::scoped_lock lock(m_Mutex);
+
sockaddr_storage sin;
socklen_t len = sizeof(sin);
*/
string Socket::GetPeerAddress(void)
{
+ mutex::scoped_lock lock(m_Mutex);
+
sockaddr_storage sin;
socklen_t len = sizeof(sin);
string msg = message + ": " + details;
SetMessage(msg.c_str());
}
+
+void Socket::ReadThreadProc(void)
+{
+ mutex::scoped_lock lock(m_Mutex);
+
+ for (;;) {
+ fd_set readfds, exceptfds;
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&exceptfds);
+
+ int fd = GetFD();
+
+ if (fd == INVALID_SOCKET)
+ return;
+
+ if (WantsToRead())
+ FD_SET(fd, &readfds);
+
+ FD_SET(fd, &exceptfds);
+
+ lock.unlock();
+
+ timeval tv;
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
+
+ lock.lock();
+
+ if (rc < 0) {
+ HandleSocketError(SocketException("select() failed", GetError()));
+ return;
+ }
+
+ if (FD_ISSET(fd, &readfds))
+ HandleReadable();
+
+ if (FD_ISSET(fd, &exceptfds))
+ HandleException();
+
+ if (WantsToWrite())
+ ; /* notify Write thread */
+ }
+}
+
+void Socket::WriteThreadProc(void)
+{
+ mutex::scoped_lock lock(m_Mutex);
+
+ for (;;) {
+ fd_set writefds;
+
+ FD_ZERO(&writefds);
+
+ int fd = GetFD();
+
+ while (!WantsToWrite()) {
+ if (GetFD() == INVALID_SOCKET)
+ return;
+
+ lock.unlock();
+ Sleep(500);
+ lock.lock();
+ }
+
+ FD_SET(fd, &writefds);
+
+ lock.unlock();
+
+ int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
+
+ lock.lock();
+
+ if (rc < 0) {
+ HandleSocketError(SocketException("select() failed", GetError()));
+ return;
+ }
+
+ if (FD_ISSET(fd, &writefds))
+ HandleWritable();
+ }
+}
+
+mutex& Socket::GetMutex(void) const
+{
+ return m_Mutex;
+}
typedef shared_ptr<Socket> Ptr;
typedef weak_ptr<Socket> WeakPtr;
- typedef list<Socket::WeakPtr> CollectionType;
+ //typedef list<Socket::WeakPtr> CollectionType;
- static Socket::CollectionType Sockets;
+ //static Socket::CollectionType Sockets;
~Socket(void);
- void SetFD(SOCKET fd);
- SOCKET GetFD(void) const;
-
- boost::signal<void (const Socket::Ptr&)> OnReadable;
- boost::signal<void (const Socket::Ptr&)> OnWritable;
- boost::signal<void (const Socket::Ptr&)> OnException;
+ //boost::signal<void (const Socket::Ptr&)> OnReadable;
+ //boost::signal<void (const Socket::Ptr&)> OnWritable;
+ //boost::signal<void (const Socket::Ptr&)> OnException;
boost::signal<void (const Socket::Ptr&, const std::exception&)> OnError;
boost::signal<void (const Socket::Ptr&)> OnClosed;
- virtual bool WantsToRead(void) const;
- virtual bool WantsToWrite(void) const;
-
virtual void Start(void);
- virtual void Stop(void);
+ //virtual void Stop(void);
void Close(void);
string GetClientAddress(void);
string GetPeerAddress(void);
+ mutex& GetMutex(void) const;
+
protected:
Socket(void);
+ void SetFD(SOCKET fd);
+ SOCKET GetFD(void) const;
+
int GetError(void) const;
static int GetLastSocketError(void);
void HandleSocketError(const std::exception& ex);
+ virtual bool WantsToRead(void) const;
+ virtual bool WantsToWrite(void) const;
+
+ virtual void HandleReadable(void);
+ virtual void HandleWritable(void);
+ virtual void HandleException(void);
+
virtual void CloseInternal(bool from_dtor);
+ mutable mutex m_Mutex;
+
private:
SOCKET m_FD; /**< The socket descriptor. */
+ thread m_ReadThread;
+ thread m_WriteThread;
+
+ void ReadThreadProc(void);
+ void WriteThreadProc(void);
+
void ExceptionEventHandler(void);
static string GetAddressFromSockaddr(sockaddr *address, socklen_t len);
return m_Role;
}
-/**
- * Registers the socket and starts processing events for it.
- */
-void TcpClient::Start(void)
-{
- TcpSocket::Start();
-
- OnReadable.connect(boost::bind(&TcpClient::ReadableEventHandler, this));
- OnWritable.connect(boost::bind(&TcpClient::WritableEventHandler, this));
-}
-
/**
* Creates a socket and connects to the specified node and service.
*
return m_SendQueue;
}
-size_t TcpClient::FlushSendQueue(void)
+void TcpClient::HandleWritable(void)
{
int rc;
if (rc <= 0) {
HandleSocketError(SocketException("send() failed", GetError()));
- return 0;
+ return;
}
m_SendQueue->Read(NULL, rc);
return m_RecvQueue;
}
-size_t TcpClient::FillRecvQueue(void)
+void TcpClient::HandleReadable(void)
{
- int rc;
-
- size_t bufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
- rc = recv(GetFD(), buffer, bufferSize, 0);
-
-#ifdef _WIN32
- if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
- if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
- return 0;
+ for (;;) {
+ size_t bufferSize = FIFO::BlockSize / 2;
+ char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
+ int rc = recv(GetFD(), buffer, bufferSize, 0);
+
+ #ifdef _WIN32
+ if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
+ #else /* _WIN32 */
+ if (rc < 0 && errno == EAGAIN)
+ #endif /* _WIN32 */
+ return;
+
+ if (rc <= 0) {
+ HandleSocketError(SocketException("recv() failed", GetError()));
+ return;
+ }
- if (rc <= 0) {
- HandleSocketError(SocketException("recv() failed", GetError()));
- return 0;
+ m_RecvQueue->Write(NULL, rc);
}
- m_RecvQueue->Write(NULL, rc);
-
- return rc;
-}
-
-/**
- * Processes data that is available for this socket.
- */
-void TcpClient::ReadableEventHandler(void)
-{
- if (FillRecvQueue() > 0)
- OnDataAvailable(GetSelf());
-}
-
-/**
- * Processes data that can be written for this socket.
- */
-void TcpClient::WritableEventHandler(void)
-{
- FlushSendQueue();
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+ Event::Post(ev);
}
/**
TcpClientRole GetRole(void) const;
- virtual void Start(void);
-
void Connect(const string& node, const string& service);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);
+ boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
+
+protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
- boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
+ virtual void HandleReadable(void);
+ virtual void HandleWritable(void);
private:
TcpClientRole m_Role;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
-
- virtual size_t FillRecvQueue(void);
- virtual size_t FlushSendQueue(void);
-
- void ReadableEventHandler(void);
- void WritableEventHandler(void);
};
/**
*
* @param clientFactory The client factory function.
*/
-void TcpServer::SetClientFactory(function<TcpClient::Ptr()> clientFactory)
+void TcpServer::SetClientFactory(function<TcpClient::Ptr(SOCKET)> clientFactory)
{
m_ClientFactory = clientFactory;
}
*
* @returns The client factory function.
*/
-function<TcpClient::Ptr()> TcpServer::GetFactoryFunction(void) const
+function<TcpClient::Ptr(SOCKET)> TcpServer::GetFactoryFunction(void) const
{
return m_ClientFactory;
}
-/**
- * Registers the TCP server and starts processing events for it.
- */
-void TcpServer::Start(void)
-{
- TcpSocket::Start();
-
- OnReadable.connect(boost::bind(&TcpServer::ReadableEventHandler, this));
-}
-
/**
* Starts listening for incoming client connections.
*/
}
}
+/**
+ * Checks whether the TCP server wants to read (i.e. accept new clients).
+ *
+ * @returns true
+ */
+bool TcpServer::WantsToRead(void) const
+{
+ return true;
+}
+
/**
* Accepts a new client and creates a new client object for it
* using the client factory function.
*/
-void TcpServer::ReadableEventHandler(void)
+void TcpServer::HandleReadable(void)
{
int fd;
sockaddr_storage addr;
return;
}
- TcpClient::Ptr client = m_ClientFactory();
- client->SetFD(fd);
- client->Start();
+ TcpClient::Ptr client = m_ClientFactory(fd);
- OnNewClient(GetSelf(), client);
-}
-
-/**
- * Checks whether the TCP server wants to read (i.e. accept new clients).
- *
- * @returns true
- */
-bool TcpServer::WantsToRead(void) const
-{
- return true;
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
+ Event::Post(ev);
}
TcpServer(void);
- void SetClientFactory(function<TcpClient::Ptr()> function);
- function<TcpClient::Ptr()> GetFactoryFunction(void) const;
-
- virtual void Start();
+ void SetClientFactory(function<TcpClient::Ptr(SOCKET)> function);
+ function<TcpClient::Ptr(SOCKET)> GetFactoryFunction(void) const;
void Listen(void);
boost::signal<void (const TcpServer::Ptr&, const TcpClient::Ptr&)> OnNewClient;
+protected:
virtual bool WantsToRead(void) const;
-private:
- void ReadableEventHandler(void);
+ virtual void HandleReadable(void);
- function<TcpClient::Ptr()> m_ClientFactory;
+private:
+ function<TcpClient::Ptr(SOCKET)> m_ClientFactory;
};
}
m_BlockWrite = false;
}
-/**
- * Takes a certificate as an argument. Does nothing.
- *
- * @param certificate An X509 certificate.
- */
-void TlsClient::NullCertificateDeleter(X509 *certificate)
-{
- /* Nothing to do here. */
-}
-
-/**
- * Retrieves the X509 certficate for this client.
- *
- * @returns The X509 certificate.
- */
-shared_ptr<X509> TlsClient::GetClientCertificate(void) const
-{
- return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
-}
-
-/**
- * Retrieves the X509 certficate for the peer.
- *
- * @returns The X509 certificate.
- */
-shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
-{
- return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
-}
-
-/**
- * Registers the TLS socket and starts processing events for it.
- */
void TlsClient::Start(void)
{
- TcpClient::Start();
-
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
if (!m_SSL)
SSL_set_connect_state(m_SSL.get());
SSL_do_handshake(m_SSL.get());
+
+ Socket::Start();
+}
+
+/**
+ * Takes a certificate as an argument. Does nothing.
+ *
+ * @param certificate An X509 certificate.
+ */
+void TlsClient::NullCertificateDeleter(X509 *certificate)
+{
+ /* Nothing to do here. */
+}
+
+/**
+ * Retrieves the X509 certficate for this client.
+ *
+ * @returns The X509 certificate.
+ */
+shared_ptr<X509> TlsClient::GetClientCertificate(void) const
+{
+ mutex::scoped_lock lock(GetMutex());
+
+ return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
+}
+
+/**
+ * Retrieves the X509 certficate for the peer.
+ *
+ * @returns The X509 certificate.
+ */
+shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
+{
+ mutex::scoped_lock lock(GetMutex());
+
+ return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
/**
* Processes data that is available for this socket.
*/
-size_t TlsClient::FillRecvQueue(void)
+void TlsClient::HandleReadable(void)
{
int result;
result = 0;
for (;;) {
- int rc;
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
- rc = SSL_read(m_SSL.get(), buffer, bufferSize);
+ int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
m_BlockRead = true;
/* fall through */
case SSL_ERROR_WANT_READ:
- return result;
+ goto post_event;
case SSL_ERROR_ZERO_RETURN:
- Close();
- return result;
+ CloseInternal(false);
+ goto post_event;
default:
HandleSocketError(OpenSSLException(
"SSL_read failed", ERR_get_error()));
- return result;
+ goto post_event;
}
}
GetRecvQueue()->Write(NULL, rc);
-
- result += rc;
}
- return result;
+post_event:
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+ Event::Post(ev);
}
/**
* Processes data that can be written for this socket.
*/
-size_t TlsClient::FlushSendQueue(void)
+void TlsClient::HandleWritable(void)
{
- int rc;
-
m_BlockRead = false;
m_BlockWrite = false;
- rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
+ int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
m_BlockWrite = true;
/* fall through */
case SSL_ERROR_WANT_WRITE:
- return 0;
+ return;
case SSL_ERROR_ZERO_RETURN:
- Close();
- return 0;
+ CloseInternal(false);
+ return;
default:
HandleSocketError(OpenSSLException(
"SSL_write failed", ERR_get_error()));
- return 0;
+ return;
}
}
GetSendQueue()->Read(NULL, rc);
-
- return rc;
}
/**
SSL *ssl = (SSL *)X509_STORE_CTX_get_ex_data(x509Context, SSL_get_ex_data_X509_STORE_CTX_idx());
TlsClient *client = (TlsClient *)SSL_get_ex_data(ssl, m_SSLIndex);
+ assert(client->GetMutex().active_count);
+
if (client == NULL)
return 0;
- bool valid = (ok != 0);
+ return client->ValidateCertificateInternal(ok, x509Context);
+}
+
+int TlsClient::ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context)
+{
shared_ptr<X509> x509Certificate = shared_ptr<X509>(x509Context->cert, &TlsClient::NullCertificateDeleter);
- client->OnVerifyCertificate(client->GetSelf(), &valid, x509Context, x509Certificate);
+ bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate);
+
+ if (valid) {
+ Event::Ptr ev = boost::make_shared<Event>();
+ ev->OnEventDelivered.connect(boost::bind(boost::ref(OnCertificateValidated), GetSelf()));
+ Event::Post(ev);
+ }
return valid ? 1 : 0;
}
+
+bool TlsClient::ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate)
+{
+ return ok;
+}
public:
TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+ virtual void Start(void);
+
shared_ptr<X509> GetClientCertificate(void) const;
shared_ptr<X509> GetPeerCertificate(void) const;
- virtual void Start(void);
+ boost::signal<void (const TlsClient::Ptr&)> OnCertificateValidated;
+
+protected:
+ void HandleSSLError(void);
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
- boost::signal<void (const TlsClient::Ptr&, bool *, X509_STORE_CTX *, const shared_ptr<X509>&)> OnVerifyCertificate;
+ virtual void HandleReadable(void);
+ virtual void HandleWritable(void);
-protected:
- void HandleSSLError(void);
+ virtual bool ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate);
private:
shared_ptr<SSL_CTX> m_SSLContext;
static int m_SSLIndex;
static bool m_SSLIndexInitialized;
- virtual size_t FillRecvQueue(void);
- virtual size_t FlushSendQueue(void);
-
virtual void CloseInternal(bool from_dtor);
static void NullCertificateDeleter(X509 *certificate);
static int SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context);
+ int ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context);
};
TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
+ client->Start();
RegisterEndpoint(endpoint);
}
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
client->OnError.connect(boost::bind(&JsonRpcEndpoint::ClientErrorHandler, this, _2));
- client->OnVerifyCertificate.connect(boost::bind(&JsonRpcEndpoint::VerifyCertificateHandler, this, _2, _4));
+ client->OnCertificateValidated.connect(boost::bind(&JsonRpcEndpoint::CertificateValidatedHandler, this));
}
bool JsonRpcEndpoint::IsLocal(void) const
Application::Log(LogWarning, "jsonrpc", message.str());
}
-void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
+void JsonRpcEndpoint::CertificateValidatedHandler(void)
{
- if (certificate && *valid) {
- string identity = Utility::GetCertificateCN(certificate);
+ string identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
- if (GetIdentity().empty() && !identity.empty()) {
- m_Identity = identity;
- GetEndpointManager()->RegisterEndpoint(GetSelf());
- }
+ if (GetIdentity().empty() && !identity.empty()) {
+ m_Identity = identity;
+ GetEndpointManager()->RegisterEndpoint(GetSelf());
}
}
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void ClientErrorHandler(const std::exception& ex);
- void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
+ void CertificateValidatedHandler(void);
};
}
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
*/
void JsonRpcClient::SendMessage(const MessagePart& message)
{
+ mutex::scoped_lock lock(GetMutex());
+
Netstring::WriteStringToFIFO(GetSendQueue(), message.ToJsonString());
}
string jsonString;
MessagePart message;
- if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
- return;
+ {
+ mutex::scoped_lock lock(GetMutex());
+
+ if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
+ return;
+ }
message = MessagePart(jsonString);
OnNewMessage(GetSelf(), message);
- } catch (const Exception& ex) {
- Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.GetMessage()));
+ } catch (const std::exception& ex) {
+ Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.what()));
Close();
return;
/**
* Factory function for JSON-RPC clients.
*
+ * @param fd The file descriptor.
* @param role The role of the underlying TCP client.
* @param sslContext SSL context for the TLS connection.
* @returns A new JSON-RPC client.
*/
-JsonRpcClient::Ptr icinga::JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
+JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
{
- return boost::make_shared<JsonRpcClient>(role, sslContext);
+ JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(role, sslContext);
+ client->SetFD(fd);
+ return client;
}
private:
void DataAvailableHandler(void);
+
+ friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
};
-JsonRpcClient::Ptr JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
}
*/
JsonRpcServer::JsonRpcServer(shared_ptr<SSL_CTX> sslContext)
{
- SetClientFactory(boost::bind(&JsonRpcClientFactory, RoleInbound, sslContext));
+ SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext));
}
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
+ <MultiProcessorCompilation>true</MultiProcessorCompilation>
+ <MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>