#include <memory>
#include <thread>
#include <vector>
+#include <stdexcept>
+#include <boost/exception/all.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
boost::asio::io_context& GetIoContext();
+ /*
+ * Custom exceptions thrown in a Boost.Coroutine may cause stack corruption.
+ * Ensure that these are wrapped correctly.
+ *
+ * Inspired by https://github.com/niekbouman/commelec-api/blob/master/commelec-api/coroutine-exception.hpp
+ * Source: http://boost.2283326.n4.nabble.com/coroutine-only-std-exceptions-are-caught-from-coroutines-td4683671.html
+ */
+ static inline boost::exception_ptr convertExceptionPtr(std::exception_ptr ex) {
+ try {
+ throw boost::enable_current_exception(ex);
+ } catch (...) {
+ return boost::current_exception();
+ }
+ }
+
+ static inline void rethrowBoostExceptionPointer() {
+ std::exception_ptr sep;
+ sep = std::current_exception();
+ boost::exception_ptr bep = convertExceptionPtr(sep);
+ boost::rethrow_exception(bep);
+ }
+
+ static inline size_t GetCoroutineStackSize() {
+#ifdef _WIN32
+ // Increase the stack size for Windows coroutines to prevent exception corruption.
+ // Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
+ return 8 * 1024 * 1024;
+#else /* _WIN32 */
+ return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
+#endif /* _WIN32 */
+ }
+
+ /* With dedicated strand in *Connection classes. */
+ template <typename Handler, typename Function>
+ static void SpawnCoroutine(Handler h, Function f) {
+
+ boost::asio::spawn(std::forward<Handler>(h),
+ [f](boost::asio::yield_context yc) {
+
+ try {
+ f(yc);
+ } catch (const boost::coroutines::detail::forced_unwind &) {
+ // Required for proper stack unwinding when coroutines are destroyed.
+ // https://github.com/boostorg/coroutine/issues/39
+ throw;
+ } catch (...) {
+ // Handle uncaught exceptions outside of the coroutine.
+ rethrowBoostExceptionPointer();
+ }
+ },
+ boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
+ );
+ }
+
+ /* Without strand in the IO executor's context. */
+ template <typename Function>
+ static void SpawnCoroutine(boost::asio::io_context& io, Function f) {
+
+ boost::asio::spawn(io,
+ [f](boost::asio::yield_context yc) {
+
+ try {
+ f(yc);
+ } catch (const boost::coroutines::detail::forced_unwind &) {
+ // Required for proper stack unwinding when coroutines are destroyed.
+ // https://github.com/boostorg/coroutine/issues/39
+ throw;
+ } catch (...) {
+ // Handle uncaught exceptions outside of the coroutine.
+ rethrowBoostExceptionPointer();
+ }
+ },
+ boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
+ );
+ }
+
private:
IoEngine();
Log(LogInformation, "ApiListener")
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
- asio::spawn(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
+ IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
UpdateStatusFile(localEndpoint);
server->async_accept(sslConn->lowest_layer(), yc);
- asio::spawn(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
+ IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Cannot accept new connection: " << ex.what();
auto& io (IoEngine::Get().GetIoContext());
- asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
+ IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
String host = endpoint->GetHost();
String port = endpoint->GetPort();
endpoint->AddClient(aclient);
- asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
+ IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
CpuBoundWork syncClient (yc);
SyncClient(aclient, endpoint, needSync);
});
+
} else if (!AddAnonymousClient(aclient)) {
Log(LogNotice, "ApiListener")
<< "Ignoring anonymous JSON-RPC connection " << conninfo
HttpServerConnection::Ptr keepAlive (this);
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
}
void HttpServerConnection::Disconnect()
HttpServerConnection::Ptr keepAlive (this);
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;
HttpServerConnection::Ptr keepAlive (this);
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
char buf[128];
asio::mutable_buffer readBuf (buf, 128);
JsonRpcConnection::Ptr keepAlive (this);
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
JsonRpcConnection::Ptr keepAlive (this);
- asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
+ IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;