]> granicus.if.org Git - icinga2/commitdiff
Ensure that SetCorked() works properly 6146/head
authorGunnar Beutner <gunnar.beutner@icinga.com>
Tue, 6 Mar 2018 07:49:43 +0000 (08:49 +0100)
committerGunnar Beutner <gunnar.beutner@icinga.com>
Tue, 6 Mar 2018 08:03:35 +0000 (09:03 +0100)
lib/base/socketevents-poll.cpp
lib/base/tlsstream.cpp
lib/remote/httpserverconnection.cpp
lib/remote/jsonrpcconnection.cpp

index 5f3b103d4d5363443c6ee01b42b0ed1417e426a4..19748434e5c6e3eed8325213f47832e3a6ca1eef 100644 (file)
@@ -57,11 +57,17 @@ void SocketEventEnginePoll::ThreadProc(int tid)
                                        if (desc.second.Events == 0)
                                                continue;
 
-                                       if (desc.second.EventInterface)
+                                       int events = desc.second.Events;
+
+                                       if (desc.second.EventInterface) {
                                                desc.second.EventInterface->m_EnginePrivate = &pfds[i];
 
+                                               if (!desc.second.EventInterface->m_Events)
+                                                       events = 0;
+                                       }
+
                                        pfds[i].fd = desc.first;
-                                       pfds[i].events = desc.second.Events;
+                                       pfds[i].events = events;
                                        descriptors[i] = desc.second;
 
                                        i++;
index e93f119d8010f057f77b929c4b7f8bec8e0d019f..a05a3d7d56e5c2e05acb8f8a52c36af5a35f8f35 100644 (file)
@@ -173,6 +173,8 @@ void TlsStream::OnEvent(int revents)
         */
        ERR_clear_error();
 
+       size_t readTotal = 0;
+
        switch (m_CurrentAction) {
                case TlsActionRead:
                        do {
@@ -181,8 +183,10 @@ void TlsStream::OnEvent(int revents)
                                if (rc > 0) {
                                        m_RecvQ->Write(buffer, rc);
                                        success = true;
+
+                                       readTotal += rc;
                                }
-                       } while (rc > 0);
+                       } while (rc > 0 && readTotal < 64 * 1024);
 
                        if (success)
                                m_CV.notify_all();
@@ -264,7 +268,7 @@ void TlsStream::OnEvent(int revents)
 
                lock.unlock();
 
-               while (m_RecvQ->IsDataAvailable() && IsHandlingEvents())
+               while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents())
                        SignalDataAvailable();
        }
 
index 53f287a87295688061f22aa8b384e2a6b22df22d..d409c78bfbd80b387f31d13ab6ea33148ccacd07 100644 (file)
@@ -173,8 +173,6 @@ bool HttpServerConnection::ProcessMessage()
                return res;
        }
 
-       m_Stream->SetCorked(true);
-
        m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync,
                HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser));
 
@@ -347,6 +345,8 @@ void HttpServerConnection::DataAvailableHandler()
        if (!m_Stream->IsEof()) {
                boost::mutex::scoped_lock lock(m_DataHandlerMutex);
 
+               m_Stream->SetCorked(true);
+
                try {
                        while (ProcessMessage())
                                ; /* empty loop body */
@@ -356,6 +356,8 @@ void HttpServerConnection::DataAvailableHandler()
 
                        close = true;
                }
+
+               m_RequestQueue.Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
        } else
                close = true;
 
index bbb6fe1803b6f82084b5262e8eeef6c094a1bbc6..259679edc18690bb37f5b3eac0e3c16766ff54a0 100644 (file)
@@ -155,8 +155,6 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
 
        try {
                MessageHandler(jsonString);
-
-               m_Stream->SetCorked(false);
        } catch (const std::exception& ex) {
                Log(LogWarning, "JsonRpcConnection")
                        << "Error while reading JSON-RPC message for identity '" << m_Identity
@@ -262,8 +260,6 @@ bool JsonRpcConnection::ProcessMessage()
        if (srs != StatusNewItem)
                return false;
 
-       m_Stream->SetCorked(true);
-
        l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
 
        return true;
@@ -279,6 +275,8 @@ void JsonRpcConnection::DataAvailableHandler()
        if (!m_Stream->IsEof()) {
                boost::mutex::scoped_lock lock(m_DataHandlerMutex);
 
+               m_Stream->SetCorked(true);
+
                try {
                        while (ProcessMessage())
                                ; /* empty loop body */
@@ -291,6 +289,8 @@ void JsonRpcConnection::DataAvailableHandler()
 
                        return;
                }
+
+               l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
        } else
                close = true;