From: Gunnar Beutner Date: Tue, 6 Mar 2018 07:49:43 +0000 (+0100) Subject: Ensure that SetCorked() works properly X-Git-Tag: v2.8.2~7 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=6670024f625b554a1d5a91b2f39af50d46db4348;p=icinga2 Ensure that SetCorked() works properly --- diff --git a/lib/base/socketevents-poll.cpp b/lib/base/socketevents-poll.cpp index 76c13cfc6..705ebed21 100644 --- a/lib/base/socketevents-poll.cpp +++ b/lib/base/socketevents-poll.cpp @@ -57,11 +57,17 @@ void SocketEventEnginePoll::ThreadProc(int tid) if (desc.second.Events == 0) continue; - if (desc.second.EventInterface) + int events = desc.second.Events; + + if (desc.second.EventInterface) { desc.second.EventInterface->m_EnginePrivate = &pfds[i]; + if (!desc.second.EventInterface->m_Events) + events = 0; + } + pfds[i].fd = desc.first; - pfds[i].events = desc.second.Events; + pfds[i].events = events; descriptors[i] = desc.second; i++; diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index 403daf336..3d310c0bf 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -174,6 +174,8 @@ void TlsStream::OnEvent(int revents) */ ERR_clear_error(); + size_t readTotal = 0; + switch (m_CurrentAction) { case TlsActionRead: do { @@ -182,8 +184,10 @@ void TlsStream::OnEvent(int revents) if (rc > 0) { m_RecvQ->Write(buffer, rc); success = true; + + readTotal += rc; } - } while (rc > 0); + } while (rc > 0 && readTotal < 64 * 1024); if (success) m_CV.notify_all(); @@ -265,7 +269,7 @@ void TlsStream::OnEvent(int revents) lock.unlock(); - while (m_RecvQ->IsDataAvailable() && IsHandlingEvents()) + while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents()) SignalDataAvailable(); } diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index fdb87097d..69d57343d 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -174,9 +174,7 @@ bool HttpServerConnection::ProcessMessage(void) return res; } - m_Stream->SetCorked(true); - - m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync, + m_RequestQueue.Enqueue(boost::bind(&HttpServerConnection::ProcessMessageAsync, HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser)); m_Seen = Utility::GetTime(); @@ -348,6 +346,8 @@ void HttpServerConnection::DataAvailableHandler(void) if (!m_Stream->IsEof()) { boost::mutex::scoped_lock lock(m_DataHandlerMutex); + m_Stream->SetCorked(true); + try { while (ProcessMessage()) ; /* empty loop body */ @@ -357,6 +357,8 @@ void HttpServerConnection::DataAvailableHandler(void) close = true; } + + m_RequestQueue.Enqueue(boost::bind(&Stream::SetCorked, m_Stream, false)); } else close = true; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index ac23a1060..add6e1a70 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -144,8 +144,6 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString) try { MessageHandler(jsonString); - - m_Stream->SetCorked(false); } catch (const std::exception& ex) { Log(LogWarning, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -249,8 +247,6 @@ bool JsonRpcConnection::ProcessMessage(void) if (srs != StatusNewItem) return false; - m_Stream->SetCorked(true); - l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); return true; @@ -266,6 +262,8 @@ void JsonRpcConnection::DataAvailableHandler(void) if (!m_Stream->IsEof()) { boost::mutex::scoped_lock lock(m_DataHandlerMutex); + m_Stream->SetCorked(true); + try { while (ProcessMessage()) ; /* empty loop body */ @@ -278,6 +276,8 @@ void JsonRpcConnection::DataAvailableHandler(void) return; } + + l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&Stream::SetCorked, m_Stream, false)); } else close = true;