From: Michael Friedrich <michael.friedrich@icinga.com>
Date: Fri, 6 Sep 2019 13:11:55 +0000 (+0200)
Subject: Introduce IoEngine::SpawnCoroutine wrapping asio::spawn and Boost exceptions
X-Git-Tag: v2.11.0~1^2~7^2
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=refs%2Fpull%2F7491%2Fhead;p=icinga2

Introduce IoEngine::SpawnCoroutine wrapping asio::spawn and Boost exceptions

This is required to

- catch all exceptions and wrap them into Boost exceptions. They
are the only ones allowed with Boost.Coroutine.
- set a dedicated coroutine stack size for Windows.

refs #7431
---

diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp
index 1c6d46045..b9e4ee6c3 100644
--- a/lib/base/io-engine.hpp
+++ b/lib/base/io-engine.hpp
@@ -9,6 +9,8 @@
 #include <memory>
 #include <thread>
 #include <vector>
+#include <stdexcept>
+#include <boost/exception/all.hpp>
 #include <boost/asio/deadline_timer.hpp>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/spawn.hpp>
@@ -77,6 +79,82 @@ public:
 
 	boost::asio::io_context& GetIoContext();
 
+	/*
+	 * Custom exceptions thrown in a Boost.Coroutine may cause stack corruption.
+	 * Ensure that these are wrapped correctly.
+	 *
+	 * Inspired by https://github.com/niekbouman/commelec-api/blob/master/commelec-api/coroutine-exception.hpp
+	 * Source: http://boost.2283326.n4.nabble.com/coroutine-only-std-exceptions-are-caught-from-coroutines-td4683671.html
+	 */
+	static inline boost::exception_ptr convertExceptionPtr(std::exception_ptr ex) {
+		try {
+			throw boost::enable_current_exception(ex);
+		} catch (...) {
+			return boost::current_exception();
+		}
+	}
+
+	static inline void rethrowBoostExceptionPointer() {
+		std::exception_ptr sep;
+		sep = std::current_exception();
+		boost::exception_ptr bep = convertExceptionPtr(sep);
+		boost::rethrow_exception(bep);
+	}
+
+	static inline size_t GetCoroutineStackSize() {
+#ifdef _WIN32
+		// Increase the stack size for Windows coroutines to prevent exception corruption.
+		// Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
+		return 8 * 1024 * 1024;
+#else /* _WIN32 */
+		return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
+#endif /* _WIN32 */
+	}
+
+	/* With dedicated strand in *Connection classes. */
+	template <typename Handler, typename Function>
+	static void SpawnCoroutine(Handler h, Function f) {
+
+		boost::asio::spawn(std::forward<Handler>(h),
+			[f](boost::asio::yield_context yc) {
+
+				try {
+					f(yc);
+				} catch (const boost::coroutines::detail::forced_unwind &) {
+					// Required for proper stack unwinding when coroutines are destroyed.
+					// https://github.com/boostorg/coroutine/issues/39
+					throw;
+				} catch (...) {
+					// Handle uncaught exceptions outside of the coroutine.
+					rethrowBoostExceptionPointer();
+				}
+			},
+			boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
+		);
+	}
+
+	/* Without strand in the IO executor's context. */
+	template <typename Function>
+	static void SpawnCoroutine(boost::asio::io_context& io, Function f) {
+
+		boost::asio::spawn(io,
+			[f](boost::asio::yield_context yc) {
+
+				try {
+					f(yc);
+				} catch (const boost::coroutines::detail::forced_unwind &) {
+					// Required for proper stack unwinding when coroutines are destroyed.
+					// https://github.com/boostorg/coroutine/issues/39
+					throw;
+				} catch (...) {
+					// Handle uncaught exceptions outside of the coroutine.
+					rethrowBoostExceptionPointer();
+				}
+			},
+			boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
+		);
+	}
+
 private:
 	IoEngine();
 
diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp
index 5d82b6dc8..81385e6da 100644
--- a/lib/remote/apilistener.cpp
+++ b/lib/remote/apilistener.cpp
@@ -416,7 +416,7 @@ bool ApiListener::AddListener(const String& node, const String& service)
 	Log(LogInformation, "ApiListener")
 		<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
 
-	asio::spawn(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
+	IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
 
 	UpdateStatusFile(localEndpoint);
 
@@ -435,7 +435,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std
 
 			server->async_accept(sslConn->lowest_layer(), yc);
 
-			asio::spawn(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
+			IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
 		} catch (const std::exception& ex) {
 			Log(LogCritical, "ApiListener")
 				<< "Cannot accept new connection: " << ex.what();
@@ -462,7 +462,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 
 	auto& io (IoEngine::Get().GetIoContext());
 
-	asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
+	IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
 		String host = endpoint->GetHost();
 		String port = endpoint->GetPort();
 
@@ -664,11 +664,12 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const
 
 			endpoint->AddClient(aclient);
 
-			asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
+			IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
 				CpuBoundWork syncClient (yc);
 
 				SyncClient(aclient, endpoint, needSync);
 			});
+
 		} else if (!AddAnonymousClient(aclient)) {
 			Log(LogNotice, "ApiListener")
 				<< "Ignoring anonymous JSON-RPC connection " << conninfo
diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp
index 556f60857..2589c9d7d 100644
--- a/lib/remote/httpserverconnection.cpp
+++ b/lib/remote/httpserverconnection.cpp
@@ -63,8 +63,8 @@ void HttpServerConnection::Start()
 
 	HttpServerConnection::Ptr keepAlive (this);
 
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
 }
 
 void HttpServerConnection::Disconnect()
@@ -73,7 +73,7 @@ void HttpServerConnection::Disconnect()
 
 	HttpServerConnection::Ptr keepAlive (this);
 
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
 		if (!m_ShuttingDown) {
 			m_ShuttingDown = true;
 
@@ -117,7 +117,7 @@ void HttpServerConnection::StartStreaming()
 
 	HttpServerConnection::Ptr keepAlive (this);
 
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
 		if (!m_ShuttingDown) {
 			char buf[128];
 			asio::mutable_buffer readBuf (buf, 128);
diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp
index eb9a946a4..b6d1d41e6 100644
--- a/lib/remote/jsonrpcconnection.cpp
+++ b/lib/remote/jsonrpcconnection.cpp
@@ -52,10 +52,10 @@ void JsonRpcConnection::Start()
 
 	JsonRpcConnection::Ptr keepAlive (this);
 
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
 }
 
 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
@@ -193,7 +193,7 @@ void JsonRpcConnection::Disconnect()
 
 	JsonRpcConnection::Ptr keepAlive (this);
 
-	asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
 		if (!m_ShuttingDown) {
 			m_ShuttingDown = true;