/******************************************************************************
 * Icinga 2                                                                   *
 * Copyright (C) 2012-2018 Icinga Development Team (https://icinga.com/)      *
 *                                                                            *
 * This program is free software; you can redistribute it and/or              *
 * modify it under the terms of the GNU General Public License                *
 * as published by the Free Software Foundation; either version 2             *
 * of the License, or (at your option) any later version.                     *
 *                                                                            *
 * This program is distributed in the hope that it will be useful,            *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with this program; if not, write to the Free Software Foundation     *
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
 ******************************************************************************/

#include "base/exception.hpp"
#include "base/io-engine.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
#include <exception>
#include <memory>
#include <thread>
#include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/system/error_code.hpp>

using namespace icinga;

/**
 * Returns the number of hardware threads reported by the standard library,
 * but never less than one.
 *
 * std::thread::hardware_concurrency() is allowed to return 0 when the value
 * cannot be determined. Sizing the thread pool and the CPU-bound semaphore
 * from a raw 0 would yield no I/O threads and no CPU-bound slots at all.
 */
static unsigned int DetectHardwareConcurrency()
{
	auto cores (std::thread::hardware_concurrency());

	return cores > 0u ? cores : 1u;
}

/**
 * Acquires one slot of the engine-wide CPU-bound semaphore.
 *
 * The semaphore is decremented optimistically. If no slot was available
 * (the previous value was below 1), the decrement is undone and the coroutine
 * waits on IoEngine's m_AlreadyExpiredTimer before trying again. That timer's
 * expiry is set to negative infinity in the IoEngine constructor.
 *
 * @param yc Yield context of the calling coroutine, used for the wait.
 */
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
	: m_Done(false)
{
	auto& ioEngine (IoEngine::Get());

	for (;;) {
		auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));

		if (availableSlots < 1) {
			// No slot was free: give back what we just took and retry later.
			ioEngine.m_CpuBoundSemaphore.fetch_add(1);
			ioEngine.m_AlreadyExpiredTimer.async_wait(yc);
			continue;
		}

		break;
	}
}

/**
 * Releases the slot unless Done() has released it already.
 */
CpuBoundWork::~CpuBoundWork()
{
	if (!m_Done) {
		IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
	}
}

/**
 * Releases the slot early. Calling it more than once has no further effect,
 * and the destructor will not release the slot a second time.
 */
void CpuBoundWork::Done()
{
	if (!m_Done) {
		IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);

		m_Done = true;
	}
}

/**
 * Hands one CPU-bound slot back to the semaphore for the lifetime of this
 * object; the destructor takes a slot again.
 *
 * @param yc Yield context of the calling coroutine; stored for the destructor.
 */
IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc)
	: yc(yc)
{
	IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
}

/**
 * Re-acquires a CPU-bound slot using the same decrement/undo/wait/retry loop
 * as the CpuBoundWork constructor.
 */
IoBoundWorkSlot::~IoBoundWorkSlot()
{
	auto& ioEngine (IoEngine::Get());

	for (;;) {
		auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));

		if (availableSlots < 1) {
			// No slot was free: give back what we just took and retry later.
			ioEngine.m_CpuBoundSemaphore.fetch_add(1);
			ioEngine.m_AlreadyExpiredTimer.async_wait(yc);
			continue;
		}

		break;
	}
}

// Lazily constructed process-wide engine instance.
LazyInit<std::unique_ptr<IoEngine>> IoEngine::m_Instance ([]() { return std::unique_ptr<IoEngine>(new IoEngine()); });

/**
 * @returns The engine instance held by m_Instance.
 */
IoEngine& IoEngine::Get()
{
	return *m_Instance.Get();
}

/**
 * @returns The io_service owned by this engine.
 */
boost::asio::io_service& IoEngine::GetIoService()
{
	return m_IoService;
}

/**
 * Sets up the io_service, the always-expired timer, the CPU-bound semaphore
 * and the worker threads.
 *
 * With N = detected hardware concurrency (at least 1), 2 * N threads run
 * RunEventLoop() and the semaphore starts at N * 3 / 2 slots.
 */
IoEngine::IoEngine()
	: m_IoService(),
	m_KeepAlive(m_IoService),
	m_Threads(decltype(m_Threads)::size_type(DetectHardwareConcurrency() * 2u)),
	m_AlreadyExpiredTimer(m_IoService)
{
	m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
	m_CpuBoundSemaphore.store(DetectHardwareConcurrency() * 3u / 2u);

	for (auto& thread : m_Threads) {
		thread = std::thread(&IoEngine::RunEventLoop, this);
	}
}

/**
 * Stops and joins all worker threads.
 *
 * One handler throwing TerminateIoThread is posted per thread; RunEventLoop()
 * leaves its loop when it catches that exception.
 */
IoEngine::~IoEngine()
{
	for (auto& thread : m_Threads) {
		m_IoService.post([]() {
			throw TerminateIoThread();
		});
	}

	for (auto& thread : m_Threads) {
		thread.join();
	}
}

/**
 * Thread body: runs the io_service until run() returns or a
 * TerminateIoThread is thrown by a handler.
 *
 * Any other std::exception escaping a handler is logged and run() is
 * entered again.
 */
void IoEngine::RunEventLoop()
{
	for (;;) {
		try {
			m_IoService.run();

			break;
		} catch (const TerminateIoThread&) {
			break;
		} catch (const std::exception& e) {
			Log(LogCritical, "IoEngine", "Exception during I/O operation!");
			Log(LogDebug, "IoEngine") << "Exception during I/O operation: " << DiagnosticInformation(e);
		}
	}
}

/**
 * Creates a condition variable backed by a deadline timer.
 *
 * @param io   io_service the timer is bound to.
 * @param init Initial state: true places the timer's expiry at negative
 *             infinity (same as Set()), false at positive infinity
 *             (same as Clear()).
 */
AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init)
	: m_Timer(io)
{
	m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
}

/**
 * Moves the timer's expiry to negative infinity.
 */
void AsioConditionVariable::Set()
{
	m_Timer.expires_at(boost::posix_time::neg_infin);
}

/**
 * Moves the timer's expiry to positive infinity.
 */
void AsioConditionVariable::Clear()
{
	m_Timer.expires_at(boost::posix_time::pos_infin);
}

/**
 * Suspends the calling coroutine on the timer.
 *
 * The wait result is captured in a local error_code and discarded, so a
 * failed or aborted wait is not reported to the caller.
 *
 * @param yc Yield context of the calling coroutine.
 */
void AsioConditionVariable::Wait(boost::asio::yield_context yc)
{
	boost::system::error_code ec;
	m_Timer.async_wait(yc[ec]);
}