From d9730f5b83b3fb38a58f8e1e598f860b060720ed Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Fri, 5 Apr 2013 12:09:26 +0200 Subject: [PATCH] Bugfixes for the replication component. --- .../replication/replicationcomponent.cpp | 4 +- lib/base/bufferedstream.cpp | 51 ++++++++--- lib/base/bufferedstream.h | 6 ++ lib/base/tlsstream.cpp | 2 + lib/base/tlsutility.cpp | 2 +- lib/remoting/endpoint.cpp | 90 +++++++++---------- lib/remoting/endpoint.h | 2 - lib/remoting/endpointmanager.cpp | 4 +- 8 files changed, 100 insertions(+), 61 deletions(-) diff --git a/components/replication/replicationcomponent.cpp b/components/replication/replicationcomponent.cpp index b8d252593..a347fc662 100644 --- a/components/replication/replicationcomponent.cpp +++ b/components/replication/replicationcomponent.cpp @@ -119,7 +119,7 @@ RequestMessage ReplicationComponent::MakeObjectMessage(const DynamicObject::Ptr& msg.SetParams(params); params.Set("name", object->GetName()); - params.Set("type", object->GetType()); + params.Set("type", object->GetType()->GetName()); String source = object->GetSource(); @@ -164,7 +164,7 @@ void ReplicationComponent::TransactionClosingHandler(double tx, const std::set()), m_SendQ(boost::make_shared()) + : m_InnerStream(innerStream), m_RecvQ(boost::make_shared()), m_SendQ(boost::make_shared()), + m_Exception(), m_Blocking(true) { boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this)); readThread.detach(); @@ -52,11 +53,14 @@ void BufferedStream::ReadThreadProc(void) m_ReadCV.notify_all(); } } catch (const std::exception& ex) { - std::ostringstream msgbuf; - msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex); - Log(LogWarning, "base", msgbuf.str()); + { + boost::mutex::scoped_lock lock(m_Mutex); + + if (!m_Exception) + m_Exception = boost::current_exception(); - Close(); + m_ReadCV.notify_all(); + } } } @@ -80,11 +84,14 @@ void BufferedStream::WriteThreadProc(void) m_InnerStream->Write(buffer, rc); } } catch (const std::exception& ex) { - std::ostringstream msgbuf; - msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex); - Log(LogWarning, "base", msgbuf.str()); + { + boost::mutex::scoped_lock lock(m_Mutex); + + if (!m_Exception) + m_Exception = boost::current_exception(); - Close(); + m_WriteCV.notify_all(); + } } } @@ -104,6 +111,13 @@ void BufferedStream::Close(void) size_t BufferedStream::Read(void *buffer, size_t count) { boost::mutex::scoped_lock lock(m_Mutex); + + if (m_Blocking) + InternalWaitReadable(count, lock); + + if (m_Exception) + boost::rethrow_exception(m_Exception); + return m_RecvQ->Read(buffer, count); } @@ -117,6 +131,10 @@ size_t BufferedStream::Read(void *buffer, size_t count) void BufferedStream::Write(const void *buffer, size_t count) { boost::mutex::scoped_lock lock(m_Mutex); + + if (m_Exception) + boost::rethrow_exception(m_Exception); + m_SendQ->Write(buffer, count); m_WriteCV.notify_all(); } @@ -125,9 +143,22 @@ void BufferedStream::WaitReadable(size_t count) { boost::mutex::scoped_lock lock(m_Mutex); - while (m_RecvQ->GetAvailableBytes() < count) + InternalWaitReadable(count, lock); +} + +void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock) +{ + while (m_RecvQ->GetAvailableBytes() < count && !m_Exception) m_ReadCV.wait(lock); } void BufferedStream::WaitWritable(size_t count) { /* Nothing to do here. */ } + +void BufferedStream::MakeNonBlocking(void) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + m_Blocking = false; +} + diff --git a/lib/base/bufferedstream.h b/lib/base/bufferedstream.h index d747b93cd..08efb94c3 100644 --- a/lib/base/bufferedstream.h +++ b/lib/base/bufferedstream.h @@ -48,11 +48,15 @@ public: void WaitReadable(size_t count); void WaitWritable(size_t count); + void MakeNonBlocking(void); + private: Stream::Ptr m_InnerStream; FIFO::Ptr m_RecvQ; FIFO::Ptr m_SendQ; + + bool m_Blocking; boost::exception_ptr m_Exception; @@ -62,6 +66,8 @@ private: void ReadThreadProc(void); void WriteThreadProc(void); + + void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock); }; } diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index 05a9866f0..5e727bb43 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -43,6 +43,8 @@ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr(innerStream); + + m_InnerStream->MakeNonBlocking(); m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); diff --git a/lib/base/tlsutility.cpp b/lib/base/tlsutility.cpp index 4c14eee5c..29aaae233 100644 --- a/lib/base/tlsutility.cpp +++ b/lib/base/tlsutility.cpp @@ -52,7 +52,7 @@ shared_ptr MakeSSLContext(const String& pubkey, const String& privkey, shared_ptr sslContext = shared_ptr(SSL_CTX_new(TLSv1_method()), SSL_CTX_free); - SSL_CTX_set_mode(sslContext.get(), SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); + SSL_CTX_set_mode(sslContext.get(), 0); if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) { BOOST_THROW_EXCEPTION(openssl_error() diff --git a/lib/remoting/endpoint.cpp b/lib/remoting/endpoint.cpp index f77d7949c..713aad064 100644 --- a/lib/remoting/endpoint.cpp +++ b/lib/remoting/endpoint.cpp @@ -33,7 +33,6 @@ using namespace icinga; REGISTER_TYPE(Endpoint); boost::signals2::signal Endpoint::OnConnected; -boost::signals2::signal Endpoint::OnDisconnected; /** * Constructor for the Endpoint class. @@ -229,15 +228,6 @@ void Endpoint::RegisterTopicHandler(const String& topic, const boost::function&) -{ - // TODO: implement - //m_TopicHandlers[method] -= callback; - //UnregisterSubscription(method); - - BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented.")); -} - void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request) { if (!IsConnected()) { @@ -260,7 +250,15 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request)); } else { - JsonRpc::SendMessage(GetClient(), request); + try { + JsonRpc::SendMessage(GetClient(), request); + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); + Log(LogWarning, "remoting", msgbuf.str()); + + m_Client.reset(); + } } } @@ -272,51 +270,53 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag if (IsLocalEndpoint()) EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); else { - JsonRpc::SendMessage(GetClient(), response); + try { + JsonRpc::SendMessage(GetClient(), response); + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex); + Log(LogWarning, "remoting", msgbuf.str()); + + m_Client.reset(); + } } } void Endpoint::MessageThreadProc(const Stream::Ptr& stream) { - try { - for (;;) { - MessagePart message = JsonRpc::ReadMessage(stream); - Endpoint::Ptr sender = GetSelf(); - - if (ResponseMessage::IsResponseMessage(message)) { - /* rather than routing the message to the right virtual - * endpoint we just process it here right away. */ - EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); - return; - } - - RequestMessage request = message; - - String method; - if (!request.GetMethod(&method)) - return; - - String id; - if (request.GetID(&id)) - EndpointManager::GetInstance()->SendAnycastMessage(sender, request); - else - EndpointManager::GetInstance()->SendMulticastMessage(sender, request); - } - } catch (const std::exception& ex) { - Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex)); + for (;;) { + MessagePart message; - { - ObjectLock olock(this); + try { + message = JsonRpc::ReadMessage(stream); + } catch (const std::exception& ex) { + Log(LogWarning, "jsonrpc", "Error while reading JSON-RPC message for endpoint '" + GetName() + "': " + boost::diagnostic_information(ex)); - // TODO: _only_ clear non-persistent subscriptions - // unregister ourselves if no persistent subscriptions are left (use a - // timer for that, once we have a TTL property for the topics) - ClearSubscriptions(); + GetClient()->Close(); m_Client.reset(); } - OnDisconnected(GetSelf()); + Endpoint::Ptr sender = GetSelf(); + + if (ResponseMessage::IsResponseMessage(message)) { + /* rather than routing the message to the right virtual + * endpoint we just process it here right away. */ + EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); + return; + } + + RequestMessage request = message; + + String method; + if (!request.GetMethod(&method)) + return; + + String id; + if (request.GetID(&id)) + EndpointManager::GetInstance()->SendAnycastMessage(sender, request); + else + EndpointManager::GetInstance()->SendMulticastMessage(sender, request); } } diff --git a/lib/remoting/endpoint.h b/lib/remoting/endpoint.h index 272dd66d4..c8e7eee64 100644 --- a/lib/remoting/endpoint.h +++ b/lib/remoting/endpoint.h @@ -69,7 +69,6 @@ public: void ClearSubscriptions(void); void RegisterTopicHandler(const String& topic, const boost::function& callback); - void UnregisterTopicHandler(const String& topic, const boost::function& callback); String GetNode(void) const; String GetService(void) const; @@ -77,7 +76,6 @@ public: static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true); static boost::signals2::signal OnConnected; - static boost::signals2::signal OnDisconnected; private: Attribute m_Local; diff --git a/lib/remoting/endpointmanager.cpp b/lib/remoting/endpointmanager.cpp index af517bfd9..ff5759f37 100644 --- a/lib/remoting/endpointmanager.cpp +++ b/lib/remoting/endpointmanager.cpp @@ -203,7 +203,9 @@ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) if (!endpoint) endpoint = Endpoint::MakeEndpoint(identity, true); - endpoint->SetClient(tlsStream); + BufferedStream::Ptr bufferedStream = boost::make_shared(tlsStream); + + endpoint->SetClient(bufferedStream); } /** -- 2.40.0