]> granicus.if.org Git - icinga2/commitdiff
Bugfixes for the replication component.
authorGunnar Beutner <gunnar@beutner.name>
Fri, 5 Apr 2013 10:09:26 +0000 (12:09 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Fri, 5 Apr 2013 10:09:26 +0000 (12:09 +0200)
components/replication/replicationcomponent.cpp
lib/base/bufferedstream.cpp
lib/base/bufferedstream.h
lib/base/tlsstream.cpp
lib/base/tlsutility.cpp
lib/remoting/endpoint.cpp
lib/remoting/endpoint.h
lib/remoting/endpointmanager.cpp

index b8d2525930285e81edee9d5a2c90ec5a95b49d9d..a347fc6628484a2f0a0bb9a8b32d45aa2c5fe83b 100644 (file)
@@ -119,7 +119,7 @@ RequestMessage ReplicationComponent::MakeObjectMessage(const DynamicObject::Ptr&
        msg.SetParams(params);
 
        params.Set("name", object->GetName());
-       params.Set("type", object->GetType());
+       params.Set("type", object->GetType()->GetName());
 
        String source = object->GetSource();
 
@@ -164,7 +164,7 @@ void ReplicationComponent::TransactionClosingHandler(double tx, const std::set<D
 
        std::ostringstream msgbuf;
        msgbuf << "Sending " << modifiedObjects.size() << " replication updates.";
-       Log(LogDebug, "replication", msgbuf.str());
+       Log(LogInformation, "replication", msgbuf.str());
 
        BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) {
                DynamicObject::Ptr object = wobject.lock();
index 6f3db13746a08555bbbed622c9c169f3633601e3..4f9bc7bcdeb3795b0df69b20badf61b0725210fa 100644 (file)
@@ -27,7 +27,8 @@
 using namespace icinga;
 
 BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
-       : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>())
+       : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
+         m_Exception(), m_Blocking(true)
 {
        boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
        readThread.detach();
@@ -52,11 +53,14 @@ void BufferedStream::ReadThreadProc(void)
                        m_ReadCV.notify_all();
                }
        } catch (const std::exception& ex) {
-               std::ostringstream msgbuf;
-               msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex);
-               Log(LogWarning, "base", msgbuf.str());
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       if (!m_Exception)
+                               m_Exception = boost::current_exception();
 
-               Close();
+                       m_ReadCV.notify_all();
+               }
        }
 }
 
@@ -80,11 +84,14 @@ void BufferedStream::WriteThreadProc(void)
                        m_InnerStream->Write(buffer, rc);
                }
        } catch (const std::exception& ex) {
-               std::ostringstream msgbuf;
-               msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex);
-               Log(LogWarning, "base", msgbuf.str());
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       if (!m_Exception)
+                               m_Exception = boost::current_exception();
 
-               Close();
+                       m_WriteCV.notify_all();
+               }
        }
 }
 
@@ -104,6 +111,13 @@ void BufferedStream::Close(void)
 size_t BufferedStream::Read(void *buffer, size_t count)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
+
+       if (m_Blocking)
+               InternalWaitReadable(count, lock);
+
+       if (m_Exception)
+               boost::rethrow_exception(m_Exception);
+
        return m_RecvQ->Read(buffer, count);
 }
 
@@ -117,6 +131,10 @@ size_t BufferedStream::Read(void *buffer, size_t count)
 void BufferedStream::Write(const void *buffer, size_t count)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
+
+       if (m_Exception)
+               boost::rethrow_exception(m_Exception);
+
        m_SendQ->Write(buffer, count);
        m_WriteCV.notify_all(); 
 }
@@ -125,9 +143,22 @@ void BufferedStream::WaitReadable(size_t count)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
-       while (m_RecvQ->GetAvailableBytes() < count)
+       InternalWaitReadable(count, lock);
+}
+
+void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock)
+{
+       while (m_RecvQ->GetAvailableBytes() < count && !m_Exception)
                m_ReadCV.wait(lock);
 }
 
 void BufferedStream::WaitWritable(size_t count)
 { /* Nothing to do here. */ }
+
+void BufferedStream::MakeNonBlocking(void)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       m_Blocking = false;
+}
+
index d747b93cddc9b5d4c0b1878a41730e28fc8f78e0..08efb94c3588bf0447835699af06899de1bb4148 100644 (file)
@@ -48,11 +48,15 @@ public:
        void WaitReadable(size_t count);
        void WaitWritable(size_t count);
 
+       void MakeNonBlocking(void);
+
 private:
        Stream::Ptr m_InnerStream;
        
        FIFO::Ptr m_RecvQ;
        FIFO::Ptr m_SendQ;
+
+       bool m_Blocking;
        
        boost::exception_ptr m_Exception;
        
@@ -62,6 +66,8 @@ private:
        
        void ReadThreadProc(void);
        void WriteThreadProc(void);
+
+       void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
 };
 
 }
index 05a9866f0a3b2d47e66f3fb6a159f9f47d6ce5f9..5e727bb436080469d3e1b34320ec9e11c8a2d47a 100644 (file)
@@ -43,6 +43,8 @@ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SS
        
        if (!m_InnerStream)
                m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
+
+       m_InnerStream->MakeNonBlocking();
        
        m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
 
index 4c14eee5c9d580df17f4ef3d70b6a7a88ff1243f..29aaae233ec21d5881105d616137ff4755d31c8a 100644 (file)
@@ -52,7 +52,7 @@ shared_ptr<SSL_CTX> MakeSSLContext(const String& pubkey, const String& privkey,
 
        shared_ptr<SSL_CTX> sslContext = shared_ptr<SSL_CTX>(SSL_CTX_new(TLSv1_method()), SSL_CTX_free);
 
-       SSL_CTX_set_mode(sslContext.get(), SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
+       SSL_CTX_set_mode(sslContext.get(), 0);
 
        if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) {
                BOOST_THROW_EXCEPTION(openssl_error()
index f77d7949c57fde60a56cdfb0ee9ac399a34f770f..713aad0640f2db70a9584801b8015d31c8205067 100644 (file)
@@ -33,7 +33,6 @@ using namespace icinga;
 REGISTER_TYPE(Endpoint);
 
 boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
-boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
 
 /**
  * Constructor for the Endpoint class.
@@ -229,15 +228,6 @@ void Endpoint::RegisterTopicHandler(const String& topic, const boost::function<E
        RegisterSubscription(topic);
 }
 
-void Endpoint::UnregisterTopicHandler(const String&, const boost::function<Endpoint::Callback>&)
-{
-       // TODO: implement
-       //m_TopicHandlers[method] -= callback;
-       //UnregisterSubscription(method);
-
-       BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
-}
-
 void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
 {
        if (!IsConnected()) {
@@ -260,7 +250,15 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
 
                Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
        } else {
-               JsonRpc::SendMessage(GetClient(), request);
+               try {
+                       JsonRpc::SendMessage(GetClient(), request);
+               } catch (const std::exception& ex) {
+                       std::ostringstream msgbuf;
+                       msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
+                       Log(LogWarning, "remoting", msgbuf.str());
+
+                       m_Client.reset();
+               }
        }
 }
 
@@ -272,51 +270,53 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag
        if (IsLocalEndpoint())
                EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
        else {
-               JsonRpc::SendMessage(GetClient(), response);
+               try {
+                       JsonRpc::SendMessage(GetClient(), response);
+               } catch (const std::exception& ex) {
+                       std::ostringstream msgbuf;
+                       msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
+                       Log(LogWarning, "remoting", msgbuf.str());
+
+                       m_Client.reset();
+               }
        }
 }
 
 void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
 {
-       try {
-               for (;;) {
-                       MessagePart message = JsonRpc::ReadMessage(stream);
-                       Endpoint::Ptr sender = GetSelf();
-
-                       if (ResponseMessage::IsResponseMessage(message)) {
-                               /* rather than routing the message to the right virtual
-                                * endpoint we just process it here right away. */
-                               EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
-                               return;
-                       }
-
-                       RequestMessage request = message;
-
-                       String method;
-                       if (!request.GetMethod(&method))
-                               return;
-
-                       String id;
-                       if (request.GetID(&id))
-                               EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
-                       else
-                               EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
-               }
-       } catch (const std::exception& ex) {
-               Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
+       for (;;) {
+               MessagePart message;
 
-               {
-                       ObjectLock olock(this);
+               try {
+                       message = JsonRpc::ReadMessage(stream);
+               } catch (const std::exception& ex) {
+                       Log(LogWarning, "jsonrpc", "Error while reading JSON-RPC message for endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
 
-                       // TODO: _only_ clear non-persistent subscriptions
-                       // unregister ourselves if no persistent subscriptions are left (use a
-                       // timer for that, once we have a TTL property for the topics)
-                       ClearSubscriptions();
+                       GetClient()->Close();
 
                        m_Client.reset();
                }
 
-               OnDisconnected(GetSelf());
+               Endpoint::Ptr sender = GetSelf();
+
+               if (ResponseMessage::IsResponseMessage(message)) {
+                       /* rather than routing the message to the right virtual
+                        * endpoint we just process it here right away. */
+                       EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
+                       return;
+               }
+
+               RequestMessage request = message;
+
+               String method;
+               if (!request.GetMethod(&method))
+                       return;
+
+               String id;
+               if (request.GetID(&id))
+                       EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
+               else
+                       EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
        }
 }
 
index 272dd66d4e3bc41f3a8bf9aece75efa6d272668a..c8e7eee6460da4d07a5a246d362324a07e8b088b 100644 (file)
@@ -69,7 +69,6 @@ public:
        void ClearSubscriptions(void);
 
        void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
-       void UnregisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
 
        String GetNode(void) const;
        String GetService(void) const;
@@ -77,7 +76,6 @@ public:
        static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
 
        static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
-       static boost::signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
 
 private:
        Attribute<bool> m_Local;
index af517bfd9eba44e6a673a768ffc74f84a60994d6..ff5759f37f43fb1d0088d625f2419aaecfdf5399 100644 (file)
@@ -203,7 +203,9 @@ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
        if (!endpoint)
                endpoint = Endpoint::MakeEndpoint(identity, true);
 
-       endpoint->SetClient(tlsStream);
+       BufferedStream::Ptr bufferedStream = boost::make_shared<BufferedStream>(tlsStream);
+
+       endpoint->SetClient(bufferedStream);
 }
 
 /**