]> granicus.if.org Git - icinga2/commitdiff
Made sockets multi-threaded.
authorGunnar Beutner <gunnar@beutner.name>
Sun, 24 Jun 2012 00:56:48 +0000 (02:56 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Sun, 24 Jun 2012 03:27:01 +0000 (05:27 +0200)
33 files changed:
base/Makefile.am
base/application.cpp
base/base.vcxproj
base/event.cpp [new file with mode: 0644]
base/event.h [new file with mode: 0644]
base/i2-base.h
base/socket.cpp
base/socket.h
base/tcpclient.cpp
base/tcpclient.h
base/tcpserver.cpp
base/tcpserver.h
base/tlsclient.cpp
base/tlsclient.h
components/checker/checker.vcxproj
components/configfile/configfile.vcxproj
components/configrpc/configrpc.vcxproj
components/delegation/delegation.vcxproj
components/demo/demo.vcxproj
components/discovery/discovery.vcxproj
dyn/dyn.vcxproj
dyntest/dyntest.vcxproj
icinga-app/icinga-app.vcxproj
icinga/endpointmanager.cpp
icinga/icinga.vcxproj
icinga/jsonrpcendpoint.cpp
icinga/jsonrpcendpoint.h
jsonrpc/jsonrpc.vcxproj
jsonrpc/jsonrpcclient.cpp
jsonrpc/jsonrpcclient.h
jsonrpc/jsonrpcserver.cpp
third-party/cJSON/cJSON.vcxproj
third-party/mmatch/mmatch.vcxproj

index 02fc2be3cefe9b04dba762470f7894b1c7a066b6..5ceedd7306eac85fd6967a34a35f518b538317cb 100644 (file)
@@ -13,6 +13,8 @@ libbase_la_SOURCES =  \
        configobject.h \
        dictionary.cpp \
        dictionary.h \
+       event.cpp \
+       event.h \
        exception.cpp \
        exception.h \
        fifo.cpp \
index 8ecf9348a075539fc461c0ab325d42bbf55c3395..80470aa94953c4c30b07417f109358f162ee3c16 100644 (file)
@@ -96,9 +96,6 @@ Application::Ptr Application::GetInstance(void)
 void Application::RunEventLoop(void)
 {
        while (!m_ShuttingDown) {
-               fd_set readfds, writefds, exceptfds;
-               int nfds = -1;
-
                Object::ClearHeldObjects();
 
                long sleep = Timer::ProcessTimers();
@@ -106,80 +103,13 @@ void Application::RunEventLoop(void)
                if (m_ShuttingDown)
                        break;
 
-               FD_ZERO(&readfds);
-               FD_ZERO(&writefds);
-               FD_ZERO(&exceptfds);
-
-               Socket::CollectionType::iterator prev, i;
-               for (i = Socket::Sockets.begin();
-                   i != Socket::Sockets.end(); ) {
-                       Socket::Ptr socket = i->lock();
-
-                       prev = i;
-                       i++;
-
-                       if (!socket) {
-                               Socket::Sockets.erase(prev);
-                               continue;
-                       }
-
-                       int fd = socket->GetFD();
-
-                       if (socket->WantsToWrite())
-                               FD_SET(fd, &writefds);
-
-                       if (socket->WantsToRead())
-                               FD_SET(fd, &readfds);
-
-                       FD_SET(fd, &exceptfds);
-
-                       if (fd > nfds)
-                               nfds = fd;
-               }
-
-               timeval tv;
-               tv.tv_sec = sleep;
-               tv.tv_usec = 0;
-
-               int ready;
-
-               if (nfds == -1) {
-                       Sleep(tv.tv_sec * 1000 + tv.tv_usec);
-                       ready = 0;
-               } else
-                       ready = select(nfds + 1, &readfds, &writefds,
-                           &exceptfds, &tv);
-
-               if (ready < 0)
-                       break;
-               else if (ready == 0)
-                       continue;
-
-               for (i = Socket::Sockets.begin();
-                   i != Socket::Sockets.end(); ) {
-                       Socket::Ptr socket = i->lock();
-
-                       prev = i;
-                       i++;
-
-                       if (!socket) {
-                               Socket::Sockets.erase(prev);
-                               continue;
-                       }
-
-                       int fd;
-
-                       fd = socket->GetFD();
-                       if (fd != INVALID_SOCKET && FD_ISSET(fd, &writefds))
-                               socket->OnWritable(socket);
-
-                       fd = socket->GetFD();
-                       if (fd != INVALID_SOCKET && FD_ISSET(fd, &readfds))
-                               socket->OnReadable(socket);
+               vector<Event::Ptr> events;
+               
+               Event::Wait(&events, boost::get_system_time() + boost::posix_time::seconds(sleep));
 
-                       fd = socket->GetFD();
-                       if (fd != INVALID_SOCKET && FD_ISSET(fd, &exceptfds))
-                               socket->OnException(socket);
+               for (vector<Event::Ptr>::iterator it = events.begin(); it != events.end(); it++) {
+                       Event::Ptr ev = *it;
+                       ev->OnEventDelivered();
                }
        }
 }
@@ -296,7 +226,7 @@ void Application::Log(LogSeverity severity, const string& facility, const string
        char timestamp[100];
 
        // TODO: make this configurable
-       if (!IsDebugging() && severity < LogInformation)
+       if (/*!IsDebugging() && */severity < LogInformation)
                return;
 
        string severityStr;
index 53ae7c6d0df376212f4003d99e467472346e8f35..d22ca8f05d7cceff3640c700a492f6b0ad9a4ceb 100644 (file)
@@ -15,6 +15,7 @@
     <ClCompile Include="component.cpp" />
     <ClCompile Include="configobject.cpp" />
     <ClCompile Include="dictionary.cpp" />
+    <ClCompile Include="event.cpp" />
     <ClCompile Include="exception.cpp" />
     <ClCompile Include="fifo.cpp" />
     <ClCompile Include="object.cpp" />
@@ -37,6 +38,7 @@
     <ClInclude Include="component.h" />
     <ClInclude Include="configobject.h" />
     <ClInclude Include="dictionary.h" />
+    <ClInclude Include="event.h" />
     <ClInclude Include="objectmap.h" />
     <ClInclude Include="objectset.h" />
     <ClInclude Include="exception.h" />
@@ -97,6 +99,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
       <PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
diff --git a/base/event.cpp b/base/event.cpp
new file mode 100644 (file)
index 0000000..284f727
--- /dev/null
@@ -0,0 +1,49 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#include "i2-base.h"
+
+using namespace icinga;
+
+deque<Event::Ptr> Event::m_Events;
+condition_variable Event::m_EventAvailable;
+mutex Event::m_Mutex;
+
+bool Event::Wait(vector<Event::Ptr> *events, const system_time& wait_until)
+{
+       mutex::scoped_lock lock(m_Mutex);
+
+       while (m_Events.empty()) {
+               if (!m_EventAvailable.timed_wait(lock, wait_until))
+                       return false;
+       }
+       
+       vector<Event::Ptr> result;
+       std::copy(m_Events.begin(), m_Events.end(), back_inserter(*events));
+       m_Events.clear();
+
+       return true;
+}
+
+void Event::Post(const Event::Ptr& ev)
+{
+       mutex::scoped_lock lock(m_Mutex);
+       m_Events.push_back(ev);
+       m_EventAvailable.notify_all();
+}
diff --git a/base/event.h b/base/event.h
new file mode 100644 (file)
index 0000000..bb586ef
--- /dev/null
@@ -0,0 +1,45 @@
+/******************************************************************************
+ * Icinga 2                                                                   *
+ * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/)        *
+ *                                                                            *
+ * This program is free software; you can redistribute it and/or              *
+ * modify it under the terms of the GNU General Public License                *
+ * as published by the Free Software Foundation; either version 2             *
+ * of the License, or (at your option) any later version.                     *
+ *                                                                            *
+ * This program is distributed in the hope that it will be useful,            *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
+ * GNU General Public License for more details.                               *
+ *                                                                            *
+ * You should have received a copy of the GNU General Public License          *
+ * along with this program; if not, write to the Free Software Foundation     *
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
+ ******************************************************************************/
+
+#ifndef EVENT_H
+#define EVENT_H
+
+namespace icinga
+{
+
+class Event : public Object
+{
+public:
+       typedef shared_ptr<Event> Ptr;
+       typedef weak_ptr<Event> WeakPtr;
+
+       static bool Wait(vector<Event::Ptr> *events, const system_time& wait_until);
+       static void Post(const Event::Ptr& ev);
+
+       boost::signal<void ()> OnEventDelivered;
+
+private:
+       static deque<Event::Ptr> m_Events;
+       static condition_variable m_EventAvailable;
+       static mutex m_Mutex;
+};
+
+}
+
+#endif /* EVENT_H */
index bdfb75729284cea410a9cd768bbcd4934a082e4a..f795e507ef44a74ef9e35f2e86ab15ed253fc976 100644 (file)
@@ -128,6 +128,7 @@ using boost::thread;
 using boost::thread_group;
 using boost::mutex;
 using boost::condition_variable;
+using boost::system_time;
 
 #if defined(__APPLE__) && defined(__MACH__)
 #      pragma GCC diagnostic ignored "-Wdeprecated-declarations" 
@@ -150,7 +151,7 @@ using boost::condition_variable;
 #include "utility.h"
 #include "object.h"
 #include "exception.h"
-#include "memory.h"
+#include "event.h"
 #include "variant.h"
 #include "dictionary.h"
 #include "timer.h"
index fc90011ceac97a34d0d8340de970baf6a9747a18..b27459cb21520e0c3dc9f0d93de3366dc05c0136 100644 (file)
 
 using namespace icinga;
 
-/**
- * A collection of weak pointers to Socket objects which have been
- * registered with the socket sub-system.
- */
-Socket::CollectionType Socket::Sockets;
-
 /**
  * Constructor for the Socket class.
  */
@@ -40,27 +34,23 @@ Socket::Socket(void)
  */
 Socket::~Socket(void)
 {
-       CloseInternal(true);
+       {
+               mutex::scoped_lock lock(m_Mutex);
+
+               CloseInternal(true);
+       }
 }
 
-/**
- * Registers the socket and starts handling events for it.
- */
 void Socket::Start(void)
 {
-       assert(m_FD != INVALID_SOCKET);
+       assert(!m_ReadThread.joinable() && !m_WriteThread.joinable());
+       assert(GetFD() != INVALID_SOCKET);
 
-       OnException.connect(boost::bind(&Socket::ExceptionEventHandler, this));
+       m_ReadThread = thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
+       m_ReadThread.detach();
 
-       Sockets.push_back(GetSelf());
-}
-
-/**
- * Unregisters the sockets and stops handling events for it.
- */
-void Socket::Stop(void)
-{
-       Sockets.remove_if(WeakPtrEqual<Socket>(this));
+       m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
+       m_WriteThread.detach();
 }
 
 /**
@@ -70,8 +60,6 @@ void Socket::Stop(void)
  */
 void Socket::SetFD(SOCKET fd)
 {
-       unsigned long lTrue = 1;
-
        /* mark the socket as non-blocking */
        if (fd != INVALID_SOCKET) {
 #ifdef F_GETFL
@@ -83,6 +71,7 @@ void Socket::SetFD(SOCKET fd)
                if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
                        throw PosixException("fcntl failed", errno);
 #else /* F_GETFL */
+               unsigned long lTrue = 1;
                ioctlsocket(fd, FIONBIO, &lTrue);
 #endif /* F_GETFL */
        }
@@ -105,6 +94,8 @@ SOCKET Socket::GetFD(void) const
  */
 void Socket::Close(void)
 {
+       mutex::scoped_lock lock(m_Mutex);
+
        CloseInternal(false);
 }
 
@@ -124,9 +115,9 @@ void Socket::CloseInternal(bool from_dtor)
        /* nobody can possibly have a valid event subscription when the
                destructor has been called */
        if (!from_dtor) {
-               Stop();
-
-               OnClosed(GetSelf());
+               Event::Ptr ev = boost::make_shared<Event>();
+               ev->OnEventDelivered.connect(boost::bind(boost::ref(OnClosed), GetSelf()));
+               Event::Post(ev);
        }
 }
 
@@ -171,9 +162,11 @@ int Socket::GetLastSocketError(void)
 void Socket::HandleSocketError(const std::exception& ex)
 {
        if (!OnError.empty()) {
-               OnError(GetSelf(), ex);
+               Event::Ptr ev = boost::make_shared<Event>();
+               ev->OnEventDelivered.connect(boost::bind(boost::ref(OnError), GetSelf(), ex));
+               Event::Post(ev);
 
-               Close();
+               CloseInternal(false);
        } else {
                throw ex;
        }
@@ -181,10 +174,8 @@ void Socket::HandleSocketError(const std::exception& ex)
 
 /**
  * Processes errors that have occured for the socket.
- *
- * @param - Event arguments for the socket error.
  */
-void Socket::ExceptionEventHandler(void)
+void Socket::HandleException(void)
 {
        HandleSocketError(SocketException(
            "select() returned fd in except fdset", GetError()));
@@ -200,6 +191,9 @@ bool Socket::WantsToRead(void) const
        return false;
 }
 
+void Socket::HandleReadable(void)
+{ }
+
 /**
  * Checks whether data should be written for this socket object.
  *
@@ -210,6 +204,9 @@ bool Socket::WantsToWrite(void) const
        return false;
 }
 
+void Socket::HandleWritable(void)
+{ }
+
 /**
  * Formats a sockaddr in a human-readable way.
  *
@@ -236,6 +233,8 @@ string Socket::GetAddressFromSockaddr(sockaddr *address, socklen_t len)
  */
 string Socket::GetClientAddress(void)
 {
+       mutex::scoped_lock lock(m_Mutex);
+
        sockaddr_storage sin;
        socklen_t len = sizeof(sin);
 
@@ -256,6 +255,8 @@ string Socket::GetClientAddress(void)
  */
 string Socket::GetPeerAddress(void)
 {
+       mutex::scoped_lock lock(m_Mutex);
+
        sockaddr_storage sin;
        socklen_t len = sizeof(sin);
 
@@ -286,3 +287,91 @@ SocketException::SocketException(const string& message, int errorCode)
        string msg = message + ": " + details;
        SetMessage(msg.c_str());
 }
+
+void Socket::ReadThreadProc(void)
+{
+       mutex::scoped_lock lock(m_Mutex);
+
+       for (;;) {
+               fd_set readfds, exceptfds;
+
+               FD_ZERO(&readfds);
+               FD_ZERO(&exceptfds);
+
+               int fd = GetFD();
+
+               if (fd == INVALID_SOCKET)
+                       return;
+
+               if (WantsToRead())
+                       FD_SET(fd, &readfds);
+
+               FD_SET(fd, &exceptfds);
+
+               lock.unlock();
+
+               timeval tv;
+               tv.tv_sec = 5;
+               tv.tv_usec = 0;
+               int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
+
+               lock.lock();
+
+               if (rc < 0) {
+                       HandleSocketError(SocketException("select() failed", GetError()));
+                       return;
+               }
+
+               if (FD_ISSET(fd, &readfds))
+                       HandleReadable();
+
+               if (FD_ISSET(fd, &exceptfds))
+                       HandleException();
+
+               if (WantsToWrite())
+                       ; /* notify Write thread */
+       }
+}
+
+void Socket::WriteThreadProc(void)
+{
+       mutex::scoped_lock lock(m_Mutex);
+
+       for (;;) {
+               fd_set writefds;
+
+               FD_ZERO(&writefds);
+
+               int fd = GetFD();
+
+               while (!WantsToWrite()) {
+                       if (GetFD() == INVALID_SOCKET)
+                               return;
+
+                       lock.unlock();
+                       Sleep(500);
+                       lock.lock();
+               }
+
+               FD_SET(fd, &writefds);
+
+               lock.unlock();
+
+               int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
+
+               lock.lock();
+
+               if (rc < 0) {
+                       HandleSocketError(SocketException("select() failed", GetError()));
+                       return;
+               }
+
+               if (FD_ISSET(fd, &writefds))
+                       HandleWritable();
+       }
+}
+
+mutex& Socket::GetMutex(void) const
+{
+       return m_Mutex;
+}
index aa920d44d4d77282da0fefef65ab061afda00d7c..4a391e1dd0ecad0ed966e45c900f0c0b7ec2f4dd 100644 (file)
@@ -33,45 +33,59 @@ public:
        typedef shared_ptr<Socket> Ptr;
        typedef weak_ptr<Socket> WeakPtr;
 
-       typedef list<Socket::WeakPtr> CollectionType;
+       //typedef list<Socket::WeakPtr> CollectionType;
 
-       static Socket::CollectionType Sockets;
+       //static Socket::CollectionType Sockets;
 
        ~Socket(void);
 
-       void SetFD(SOCKET fd);
-       SOCKET GetFD(void) const;
-
-       boost::signal<void (const Socket::Ptr&)> OnReadable;
-       boost::signal<void (const Socket::Ptr&)> OnWritable;
-       boost::signal<void (const Socket::Ptr&)> OnException;
+       //boost::signal<void (const Socket::Ptr&)> OnReadable;
+       //boost::signal<void (const Socket::Ptr&)> OnWritable;
+       //boost::signal<void (const Socket::Ptr&)> OnException;
 
        boost::signal<void (const Socket::Ptr&, const std::exception&)> OnError;
        boost::signal<void (const Socket::Ptr&)> OnClosed;
 
-       virtual bool WantsToRead(void) const;
-       virtual bool WantsToWrite(void) const;
-
        virtual void Start(void);
-       virtual void Stop(void);
+       //virtual void Stop(void);
 
        void Close(void);
 
        string GetClientAddress(void);
        string GetPeerAddress(void);
 
+       mutex& GetMutex(void) const;
+
 protected:
        Socket(void);
 
+       void SetFD(SOCKET fd);
+       SOCKET GetFD(void) const;
+
        int GetError(void) const;
        static int GetLastSocketError(void);
        void HandleSocketError(const std::exception& ex);
 
+       virtual bool WantsToRead(void) const;
+       virtual bool WantsToWrite(void) const;
+
+       virtual void HandleReadable(void);
+       virtual void HandleWritable(void);
+       virtual void HandleException(void);
+
        virtual void CloseInternal(bool from_dtor);
 
+       mutable mutex m_Mutex;
+
 private:
        SOCKET m_FD; /**< The socket descriptor. */
 
+       thread m_ReadThread;
+       thread m_WriteThread;
+
+       void ReadThreadProc(void);
+       void WriteThreadProc(void);
+
        void ExceptionEventHandler(void);
 
        static string GetAddressFromSockaddr(sockaddr *address, socklen_t len);
index 70fa814067a91dbfb25b6e04193586a89045a893..2b0d18f52b9ff1e4ad6a01e6e2d3e06b56bb2010 100644 (file)
@@ -44,17 +44,6 @@ TcpClientRole TcpClient::GetRole(void) const
        return m_Role;
 }
 
-/**
- * Registers the socket and starts processing events for it.
- */
-void TcpClient::Start(void)
-{
-       TcpSocket::Start();
-
-       OnReadable.connect(boost::bind(&TcpClient::ReadableEventHandler, this));
-       OnWritable.connect(boost::bind(&TcpClient::WritableEventHandler, this));
-}
-
 /**
  * Creates a socket and connects to the specified node and service.
  *
@@ -124,7 +113,7 @@ FIFO::Ptr TcpClient::GetSendQueue(void)
        return m_SendQueue;
 }
 
-size_t TcpClient::FlushSendQueue(void)
+void TcpClient::HandleWritable(void)
 {
        int rc;
 
@@ -132,7 +121,7 @@ size_t TcpClient::FlushSendQueue(void)
 
        if (rc <= 0) {
                HandleSocketError(SocketException("send() failed", GetError()));
-               return 0;
+               return;
        }
 
        m_SendQueue->Read(NULL, rc);
@@ -148,46 +137,31 @@ FIFO::Ptr TcpClient::GetRecvQueue(void)
        return m_RecvQueue;
 }
 
-size_t TcpClient::FillRecvQueue(void)
+void TcpClient::HandleReadable(void)
 {
-       int rc;
-
-       size_t bufferSize = FIFO::BlockSize / 2;
-       char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
-       rc = recv(GetFD(), buffer, bufferSize, 0);
-
-#ifdef _WIN32
-       if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
-#else /* _WIN32 */
-       if (rc < 0 && errno == EAGAIN)
-#endif /* _WIN32 */
-               return 0;
+       for (;;) {
+               size_t bufferSize = FIFO::BlockSize / 2;
+               char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
+               int rc = recv(GetFD(), buffer, bufferSize, 0);
+
+       #ifdef _WIN32
+               if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
+       #else /* _WIN32 */
+               if (rc < 0 && errno == EAGAIN)
+       #endif /* _WIN32 */
+                       return;
+
+               if (rc <= 0) {
+                       HandleSocketError(SocketException("recv() failed", GetError()));
+                       return;
+               }
 
-       if (rc <= 0) {
-               HandleSocketError(SocketException("recv() failed", GetError()));
-               return 0;
+               m_RecvQueue->Write(NULL, rc);
        }
 
-       m_RecvQueue->Write(NULL, rc);
-
-       return rc;
-}
-
-/**
- * Processes data that is available for this socket.
- */
-void TcpClient::ReadableEventHandler(void)
-{
-       if (FillRecvQueue() > 0)
-               OnDataAvailable(GetSelf());
-}
-
-/**
- * Processes data that can be written for this socket.
- */
-void TcpClient::WritableEventHandler(void)
-{
-       FlushSendQueue();
+       Event::Ptr ev = boost::make_shared<Event>();
+       ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+       Event::Post(ev);
 }
 
 /**
index 048e8dad692765649e958dabfde7b984ca4b97ab..479d61734b0400ebc3c01079775ca8388fcfa377 100644 (file)
@@ -51,29 +51,25 @@ public:
 
        TcpClientRole GetRole(void) const;
 
-       virtual void Start(void);
-
        void Connect(const string& node, const string& service);
 
        FIFO::Ptr GetSendQueue(void);
        FIFO::Ptr GetRecvQueue(void);
 
+       boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
+
+protected:
        virtual bool WantsToRead(void) const;
        virtual bool WantsToWrite(void) const;
 
-       boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
+       virtual void HandleReadable(void);
+       virtual void HandleWritable(void);
 
 private:
        TcpClientRole m_Role;
 
        FIFO::Ptr m_SendQueue;
        FIFO::Ptr m_RecvQueue;
-
-       virtual size_t FillRecvQueue(void);
-       virtual size_t FlushSendQueue(void);
-
-       void ReadableEventHandler(void);
-       void WritableEventHandler(void);
 };
 
 /**
index fb559667b41044e7a13900f09d247d65d4ae28f4..c4b863e54163a2c8d7107b332e4e4ac1f3f7e178 100644 (file)
@@ -34,7 +34,7 @@ TcpServer::TcpServer(void)
  *
  * @param clientFactory The client factory function.
  */
-void TcpServer::SetClientFactory(function<TcpClient::Ptr()> clientFactory)
+void TcpServer::SetClientFactory(function<TcpClient::Ptr(SOCKET)> clientFactory)
 {
        m_ClientFactory = clientFactory;
 }
@@ -44,21 +44,11 @@ void TcpServer::SetClientFactory(function<TcpClient::Ptr()> clientFactory)
  *
  * @returns The client factory function.
  */
-function<TcpClient::Ptr()> TcpServer::GetFactoryFunction(void) const
+function<TcpClient::Ptr(SOCKET)> TcpServer::GetFactoryFunction(void) const
 {
        return m_ClientFactory;
 }
 
-/**
- * Registers the TCP server and starts processing events for it.
- */
-void TcpServer::Start(void)
-{
-       TcpSocket::Start();
-
-       OnReadable.connect(boost::bind(&TcpServer::ReadableEventHandler, this));
-}
-
 /**
  * Starts listening for incoming client connections.
  */
@@ -71,11 +61,21 @@ void TcpServer::Listen(void)
        }
 }
 
+/**
+ * Checks whether the TCP server wants to read (i.e. accept new clients).
+ *
+ * @returns true
+ */
+bool TcpServer::WantsToRead(void) const
+{
+       return true;
+}
+
 /**
  * Accepts a new client and creates a new client object for it
  * using the client factory function.
  */
-void TcpServer::ReadableEventHandler(void)
+void TcpServer::HandleReadable(void)
 {
        int fd;
        sockaddr_storage addr;
@@ -89,19 +89,9 @@ void TcpServer::ReadableEventHandler(void)
                return;
        }
 
-       TcpClient::Ptr client = m_ClientFactory();
-       client->SetFD(fd);
-       client->Start();
+       TcpClient::Ptr client = m_ClientFactory(fd);
 
-       OnNewClient(GetSelf(), client);
-}
-
-/**
- * Checks whether the TCP server wants to read (i.e. accept new clients).
- *
- * @returns true
- */
-bool TcpServer::WantsToRead(void) const
-{
-       return true;
+       Event::Ptr ev = boost::make_shared<Event>();
+       ev->OnEventDelivered.connect(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
+       Event::Post(ev);
 }
index 247f66a787e25f999b52e1ca7fa8c9edf112a527..fa64a34e4d0c1303abf5bc3d4266b04cd901a131 100644 (file)
@@ -37,21 +37,20 @@ public:
 
        TcpServer(void);
 
-       void SetClientFactory(function<TcpClient::Ptr()> function);
-       function<TcpClient::Ptr()> GetFactoryFunction(void) const;
-
-       virtual void Start();
+       void SetClientFactory(function<TcpClient::Ptr(SOCKET)> function);
+       function<TcpClient::Ptr(SOCKET)> GetFactoryFunction(void) const;
 
        void Listen(void);
 
        boost::signal<void (const TcpServer::Ptr&, const TcpClient::Ptr&)> OnNewClient;
 
+protected:
        virtual bool WantsToRead(void) const;
 
-private:
-       void ReadableEventHandler(void);
+       virtual void HandleReadable(void);
 
-       function<TcpClient::Ptr()> m_ClientFactory;
+private:
+       function<TcpClient::Ptr(SOCKET)> m_ClientFactory;
 };
 
 }
index 0c1ab644012ed468b90b2d6f219788527f306903..3ac84a96ec1bc2c69d08dff30944216cd381e44b 100644 (file)
@@ -37,43 +37,8 @@ TlsClient::TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext) : TcpCl
        m_BlockWrite = false;
 }
 
-/**
- * Takes a certificate as an argument. Does nothing.
- *
- * @param certificate An X509 certificate.
- */
-void TlsClient::NullCertificateDeleter(X509 *certificate)
-{
-       /* Nothing to do here. */
-}
-
-/**
- * Retrieves the X509 certficate for this client.
- *
- * @returns The X509 certificate.
- */
-shared_ptr<X509> TlsClient::GetClientCertificate(void) const
-{
-       return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
-}
-
-/**
- * Retrieves the X509 certficate for the peer.
- *
- * @returns The X509 certificate.
- */
-shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
-{
-       return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
-}
-
-/**
- * Registers the TLS socket and starts processing events for it.
- */
 void TlsClient::Start(void)
 {
-       TcpClient::Start();
-
        m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
 
        if (!m_SSL)
@@ -101,12 +66,48 @@ void TlsClient::Start(void)
                SSL_set_connect_state(m_SSL.get());
 
        SSL_do_handshake(m_SSL.get());
+
+       Socket::Start();
+}
+
+/**
+ * Takes a certificate as an argument. Does nothing.
+ *
+ * @param certificate An X509 certificate.
+ */
+void TlsClient::NullCertificateDeleter(X509 *certificate)
+{
+       /* Nothing to do here. */
+}
+
+/**
+ * Retrieves the X509 certficate for this client.
+ *
+ * @returns The X509 certificate.
+ */
+shared_ptr<X509> TlsClient::GetClientCertificate(void) const
+{
+       mutex::scoped_lock lock(GetMutex());
+
+       return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
+}
+
+/**
+ * Retrieves the X509 certficate for the peer.
+ *
+ * @returns The X509 certificate.
+ */
+shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
+{
+       mutex::scoped_lock lock(GetMutex());
+
+       return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
 }
 
 /**
  * Processes data that is available for this socket.
  */
-size_t TlsClient::FillRecvQueue(void)
+void TlsClient::HandleReadable(void)
 {
        int result;
 
@@ -116,10 +117,9 @@ size_t TlsClient::FillRecvQueue(void)
        result = 0;
 
        for (;;) {
-               int rc;
                size_t bufferSize = FIFO::BlockSize / 2;
                char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
-               rc = SSL_read(m_SSL.get(), buffer, bufferSize);
+               int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
 
                if (rc <= 0) {
                        switch (SSL_get_error(m_SSL.get(), rc)) {
@@ -127,36 +127,35 @@ size_t TlsClient::FillRecvQueue(void)
                                        m_BlockRead = true;
                                        /* fall through */
                                case SSL_ERROR_WANT_READ:
-                                       return result;
+                                       goto post_event;
                                case SSL_ERROR_ZERO_RETURN:
-                                       Close();
-                                       return result;
+                                       CloseInternal(false);
+                                       goto post_event;
                                default:
                                        HandleSocketError(OpenSSLException(
                                            "SSL_read failed", ERR_get_error()));
-                                       return result;
+                                       goto post_event;
                        }
                }
 
                GetRecvQueue()->Write(NULL, rc);
-
-               result += rc;
        }
 
-       return result;
+post_event:
+       Event::Ptr ev = boost::make_shared<Event>();
+       ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
+       Event::Post(ev);
 }
 
 /**
  * Processes data that can be written for this socket.
  */
-size_t TlsClient::FlushSendQueue(void)
+void TlsClient::HandleWritable(void)
 {
-       int rc;
-
        m_BlockRead = false;
        m_BlockWrite = false;
 
-       rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
+       int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
 
        if (rc <= 0) {
                switch (SSL_get_error(m_SSL.get(), rc)) {
@@ -164,20 +163,18 @@ size_t TlsClient::FlushSendQueue(void)
                                m_BlockWrite = true;
                                /* fall through */
                        case SSL_ERROR_WANT_WRITE:
-                               return 0;
+                               return;
                        case SSL_ERROR_ZERO_RETURN:
-                               Close();
-                               return 0;
+                               CloseInternal(false);
+                               return;
                        default:
                                HandleSocketError(OpenSSLException(
                                    "SSL_write failed", ERR_get_error()));
-                               return 0;
+                               return;
                }
        }
 
        GetSendQueue()->Read(NULL, rc);
-
-       return rc;
 }
 
 /**
@@ -249,12 +246,29 @@ int TlsClient::SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context)
        SSL *ssl = (SSL *)X509_STORE_CTX_get_ex_data(x509Context, SSL_get_ex_data_X509_STORE_CTX_idx());
        TlsClient *client = (TlsClient *)SSL_get_ex_data(ssl, m_SSLIndex);
 
+       assert(client->GetMutex().active_count);
+
        if (client == NULL)
                return 0;
 
-       bool valid = (ok != 0);
+       return client->ValidateCertificateInternal(ok, x509Context);
+}
+
+int TlsClient::ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context)
+{
        shared_ptr<X509> x509Certificate = shared_ptr<X509>(x509Context->cert, &TlsClient::NullCertificateDeleter);
-       client->OnVerifyCertificate(client->GetSelf(), &valid, x509Context, x509Certificate);
+       bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate);
+
+       if (valid) {
+               Event::Ptr ev = boost::make_shared<Event>();
+               ev->OnEventDelivered.connect(boost::bind(boost::ref(OnCertificateValidated), GetSelf()));
+               Event::Post(ev);
+       }
 
        return valid ? 1 : 0;
 }
+
+bool TlsClient::ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate)
+{
+       return ok;
+}
index ff44cc813400bc30dbc804653ff9d633bc39ee21..87de66ef85bf1b6f7aa98d1e67d5b0e655ae40e2 100644 (file)
@@ -33,18 +33,23 @@ class I2_BASE_API TlsClient : public TcpClient
 public:
        TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
 
+       virtual void Start(void);
+
        shared_ptr<X509> GetClientCertificate(void) const;
        shared_ptr<X509> GetPeerCertificate(void) const;
 
-       virtual void Start(void);
+       boost::signal<void (const TlsClient::Ptr&)> OnCertificateValidated;
+
+protected:
+       void HandleSSLError(void);
 
        virtual bool WantsToRead(void) const;
        virtual bool WantsToWrite(void) const;
 
-       boost::signal<void (const TlsClient::Ptr&, bool *, X509_STORE_CTX *, const shared_ptr<X509>&)> OnVerifyCertificate;
+       virtual void HandleReadable(void);
+       virtual void HandleWritable(void);
 
-protected:
-       void HandleSSLError(void);
+       virtual bool ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate);
 
 private:
        shared_ptr<SSL_CTX> m_SSLContext;
@@ -56,14 +61,12 @@ private:
        static int m_SSLIndex;
        static bool m_SSLIndexInitialized;
 
-       virtual size_t FillRecvQueue(void);
-       virtual size_t FlushSendQueue(void);
-
        virtual void CloseInternal(bool from_dtor);
 
        static void NullCertificateDeleter(X509 *certificate);
 
        static int SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context);
+       int ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context);
 };
 
 TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
index 6e894d9de28e7264cbd1241ca7c57908411883ea..67d311df713a0cdce99c6223e32cbbebe73c6086 100644 (file)
@@ -54,6 +54,8 @@
       <WarningLevel>Level3</WarningLevel>
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -70,6 +72,8 @@
       <FunctionLevelLinking>true</FunctionLevelLinking>
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 01e6e01d4813afa966111b5be308ed0c6b056d65..a4061b5a1c35030765d57686b5780cf7ae49d5eb 100644 (file)
@@ -59,6 +59,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -80,6 +82,8 @@
       <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 47805d16a83e000196778cde1ad1a6b1f0528935..eae5056e5c97b078d69ea71937dd588db55d372f 100644 (file)
@@ -61,6 +61,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -78,6 +80,8 @@
       <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index cfe75bea348d7dff8c4a60f538bc16809be33bb0..8cb021043f6ff5c31937290f0988c44f1e9b08c0 100644 (file)
@@ -54,6 +54,8 @@
       <WarningLevel>Level3</WarningLevel>
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -70,6 +72,8 @@
       <FunctionLevelLinking>true</FunctionLevelLinking>
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index db011539a637e5c10684c4f03f26eb3263360732..653970c341571b1cf7d9e25e97f0ff08c524c3ce 100644 (file)
@@ -54,6 +54,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -70,6 +72,8 @@
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 4e6a6ed42d012249f8e7aa69a0775c5f94df0aba..ee8f0c6b80c00645fba8c4191c828f068156e8f4 100644 (file)
@@ -54,6 +54,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -70,6 +72,8 @@
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 4d4f244d34a48beb1dafc76f52b3db880f11e449..f9ae0a29bea4259c0fc53a3db627fda5078dc1cd 100644 (file)
@@ -87,6 +87,8 @@
       <WarningLevel>Level3</WarningLevel>
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
       <FunctionLevelLinking>true</FunctionLevelLinking>
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 4a7d9f799c8cf1b3367a6aabf7e2f126831e96fd..495ff32a8f79590ddfca33f9dc4cdbbee09159fa 100644 (file)
@@ -53,6 +53,8 @@
       </PrecompiledHeader>
       <WarningLevel>Level3</WarningLevel>
       <Optimization>Disabled</Optimization>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
@@ -69,6 +71,8 @@
       <FunctionLevelLinking>true</FunctionLevelLinking>
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
index 16a24dedff496f67b7c4ab3f2c9567f32b74886d..86bc63b18422f7e0abac6e0a994a9d0683e8a19b 100644 (file)
@@ -58,6 +58,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
@@ -75,6 +77,8 @@
       <PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
index b4d5c912331f5231d3cd56c4b50761521179cef7..84cb4463ccede529c214163cb884150810484633 100644 (file)
@@ -136,6 +136,7 @@ void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
 
        JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
        endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
+       client->Start();
        RegisterEndpoint(endpoint);
 }
 
index 63ead4e0bcbf23d146cf46842e0b74d9dc884311..78bb39497b7696668a1bbef0d6382b172d4cc183 100644 (file)
@@ -84,6 +84,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
       <PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Console</SubSystem>
index 6aa1dbd6b345945f856e03d111598a1ad7a98530..8815164519d570b7c6231e0d1ea63f0f7580b0e3 100644 (file)
@@ -53,7 +53,7 @@ void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
        client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
        client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
        client->OnError.connect(boost::bind(&JsonRpcEndpoint::ClientErrorHandler, this, _2));
-       client->OnVerifyCertificate.connect(boost::bind(&JsonRpcEndpoint::VerifyCertificateHandler, this, _2, _4));
+       client->OnCertificateValidated.connect(boost::bind(&JsonRpcEndpoint::CertificateValidatedHandler, this));
 }
 
 bool JsonRpcEndpoint::IsLocal(void) const
@@ -135,15 +135,13 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex)
        Application::Log(LogWarning, "jsonrpc", message.str());
 }
 
-void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
+void JsonRpcEndpoint::CertificateValidatedHandler(void)
 {
-       if (certificate && *valid) {
-               string identity = Utility::GetCertificateCN(certificate);
+       string identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
 
-               if (GetIdentity().empty() && !identity.empty()) {
-                       m_Identity = identity;
-                       GetEndpointManager()->RegisterEndpoint(GetSelf());
-               }
+       if (GetIdentity().empty() && !identity.empty()) {
+               m_Identity = identity;
+               GetEndpointManager()->RegisterEndpoint(GetSelf());
        }
 }
 
index 852dadf86a231ec56b2b9a74a03f141aa174685e..338bd2b45ec9aeb8ae5dbeba873b9484ca80960b 100644 (file)
@@ -64,7 +64,7 @@ private:
        void NewMessageHandler(const MessagePart& message);
        void ClientClosedHandler(void);
        void ClientErrorHandler(const std::exception& ex);
-       void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
+       void CertificateValidatedHandler(void);
 };
 
 }
index de1f80346c01cb7aacb6085d94daddfc24873817..7ea643878437de0ce68863d89a2ba830ec6e49e4 100644 (file)
@@ -69,6 +69,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -90,6 +92,8 @@
       <PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index c95b049b119b40ea6a26b0ce31eac2141050a61d..e457b9617d63e769cd6d9d271400e61c3a624c2e 100644 (file)
@@ -40,6 +40,8 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
  */
 void JsonRpcClient::SendMessage(const MessagePart& message)
 {
+       mutex::scoped_lock lock(GetMutex());
+
        Netstring::WriteStringToFIFO(GetSendQueue(), message.ToJsonString());
 }
 
@@ -53,13 +55,17 @@ void JsonRpcClient::DataAvailableHandler(void)
                        string jsonString;
                        MessagePart message;
 
-                       if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
-                               return;
+                       {
+                               mutex::scoped_lock lock(GetMutex());
+
+                               if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
+                                       return;
+                       }
 
                        message = MessagePart(jsonString);
                        OnNewMessage(GetSelf(), message);
-               } catch (const Exception& ex) {
-                       Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.GetMessage()));
+               } catch (const std::exception& ex) {
+                       Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.what()));
                        Close();
 
                        return;
@@ -70,11 +76,14 @@ void JsonRpcClient::DataAvailableHandler(void)
 /**
  * Factory function for JSON-RPC clients.
  *
+ * @param fd The file descriptor.
  * @param role The role of the underlying TCP client.
  * @param sslContext SSL context for the TLS connection.
  * @returns A new JSON-RPC client.
  */
-JsonRpcClient::Ptr icinga::JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
+JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
 {
-       return boost::make_shared<JsonRpcClient>(role, sslContext);
+       JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(role, sslContext);
+       client->SetFD(fd);
+       return client;
 }
index 00a16ada1de18e2a203ad4289d301ef18ccb64dc..6df837d2f95ff4e8c65fb87c9fb336782555b37a 100644 (file)
@@ -42,9 +42,11 @@ public:
 
 private:
        void DataAvailableHandler(void);
+
+       friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
 };
 
-JsonRpcClient::Ptr JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
+JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
 
 }
 
index a39d2c594a47b641c649b67b5d6d6282326ba3cc..c5f7e504ea1fed18b463a064a4f30c1bceccbc9a 100644 (file)
@@ -28,5 +28,5 @@ using namespace icinga;
  */
 JsonRpcServer::JsonRpcServer(shared_ptr<SSL_CTX> sslContext)
 {
-       SetClientFactory(boost::bind(&JsonRpcClientFactory, RoleInbound, sslContext));
+       SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext));
 }
index 3568a5482db479b1fb755858387eb41fd9d558e2..b045931a1c32a02ee3a2989c67f90d014d4243cd 100644 (file)
@@ -51,6 +51,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -67,6 +69,8 @@
       <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
index 632e28d77a49e70f5af5d7e534bd8194692289f0..b6766c1e9170e0b56cb5397d38499bbc4dc02fc7 100644 (file)
@@ -51,6 +51,8 @@
       <Optimization>Disabled</Optimization>
       <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>
@@ -66,6 +68,8 @@
       <IntrinsicFunctions>true</IntrinsicFunctions>
       <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
       <WarningLevel>Level3</WarningLevel>
+      <MultiProcessorCompilation>true</MultiProcessorCompilation>
+      <MinimalRebuild>false</MinimalRebuild>
     </ClCompile>
     <Link>
       <SubSystem>Windows</SubSystem>