From: Michael Friedrich Date: Fri, 24 May 2019 08:07:51 +0000 (+0200) Subject: Quality: Remove old SocketEvent functionality X-Git-Tag: v2.11.0-rc1~85^2~6 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5dbb6ad3662397107a4a277c1804e1fc4b8f03f6;p=icinga2 Quality: Remove old SocketEvent functionality --- diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 9260b31ed..fb3de3029 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -61,7 +61,6 @@ set(base_SOURCES serializer.cpp serializer.hpp singleton.hpp socket.cpp socket.hpp - socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp socketevents.hpp stacktrace.cpp stacktrace.hpp statsfunction.hpp stdiostream.cpp stdiostream.hpp diff --git a/lib/base/socketevents-epoll.cpp b/lib/base/socketevents-epoll.cpp deleted file mode 100644 index 0e75ee5e8..000000000 --- a/lib/base/socketevents-epoll.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ - -#include "base/socketevents.hpp" -#include "base/exception.hpp" -#include "base/logger.hpp" -#include "base/utility.hpp" -#include -#include -#ifdef __linux__ -# include - -using namespace icinga; - -void SocketEventEngineEpoll::InitializeThread(int tid) -{ - m_PollFDs[tid] = epoll_create(128); - Utility::SetCloExec(m_PollFDs[tid]); - - SocketEventDescriptor sed; - - m_Sockets[tid][m_EventFDs[tid][0]] = sed; - m_FDChanged[tid] = true; - - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = m_EventFDs[tid][0]; - event.events = EPOLLIN; - epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event); -} - -int SocketEventEngineEpoll::PollToEpoll(int events) -{ - int result = 0; - - if (events & POLLIN) - result |= EPOLLIN; - - if (events & POLLOUT) - result |= EPOLLOUT; - - return events; -} - -int SocketEventEngineEpoll::EpollToPoll(int events) -{ - int result = 0; - - if (events & EPOLLIN) - result |= POLLIN; - - if (events & EPOLLOUT) - result |= POLLOUT; - - return events; -} - -void SocketEventEngineEpoll::ThreadProc(int tid) -{ - Utility::SetThreadName("SocketIO"); - - for (;;) { - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (m_FDChanged[tid]) { - m_FDChanged[tid] = false; - m_CV[tid].notify_all(); - } - } - - epoll_event pevents[64]; - int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1); - - std::vector events; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (m_FDChanged[tid]) { - m_FDChanged[tid] = false; - - continue; - } - - for (int i = 0; i < ready; i++) { - if (pevents[i].data.fd == m_EventFDs[tid][0]) { - char buffer[512]; - if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) - Log(LogCritical, "SocketEvents", "Read from event FD failed."); - - continue; - } - - if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0) - continue; - - EventDescription event; - event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events); - event.Descriptor = m_Sockets[tid][pevents[i].data.fd]; - - events.emplace_back(std::move(event)); - } - } - - for (const EventDescription& event : events) { - try { - event.Descriptor.EventInterface->OnEvent(event.REvents); - } catch (const std::exception& ex) { - Log(LogCritical, "SocketEvents") - << "Exception thrown in socket I/O handler:\n" - << DiagnosticInformation(ex); - } catch (...) { - Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler."); - } - } - } -} - -void SocketEventEngineEpoll::Register(SocketEvents *se) -{ - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - VERIFY(se->m_FD != INVALID_SOCKET); - - SocketEventDescriptor desc; - desc.EventInterface = se; - - VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end()); - - m_Sockets[tid][se->m_FD] = desc; - - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = se->m_FD; - event.events = 0; - epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event); - - se->m_Events = true; - } -} - -void SocketEventEngineEpoll::Unregister(SocketEvents *se) -{ - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (se->m_FD == INVALID_SOCKET) - return; - - m_Sockets[tid].erase(se->m_FD); - m_FDChanged[tid] = true; - - epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr); - - se->m_FD = INVALID_SOCKET; - se->m_Events = false; - } - - WakeUpThread(tid, true); -} - -void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events) -{ - if (se->m_FD == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); - - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - auto it = m_Sockets[tid].find(se->m_FD); - - if (it == m_Sockets[tid].end()) - return; - - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = se->m_FD; - event.events = SocketEventEngineEpoll::PollToEpoll(events); - epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event); - } -} -#endif /* __linux__ */ diff --git a/lib/base/socketevents-poll.cpp b/lib/base/socketevents-poll.cpp deleted file mode 100644 index f8200d435..000000000 --- a/lib/base/socketevents-poll.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ - -#include "base/socketevents.hpp" -#include "base/exception.hpp" -#include "base/logger.hpp" -#include "base/utility.hpp" -#include -#include - -using namespace icinga; - -void SocketEventEnginePoll::InitializeThread(int tid) -{ - SocketEventDescriptor sed; - sed.Events = POLLIN; - - m_Sockets[tid][m_EventFDs[tid][0]] = sed; - m_FDChanged[tid] = true; -} - -void SocketEventEnginePoll::ThreadProc(int tid) -{ - Utility::SetThreadName("SocketIO"); - - std::vector pfds; - std::vector descriptors; - - for (;;) { - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (m_FDChanged[tid]) { - pfds.resize(m_Sockets[tid].size()); - descriptors.resize(m_Sockets[tid].size()); - - int i = 0; - - typedef std::map::value_type kv_pair; - - for (const kv_pair& desc : m_Sockets[tid]) { - if (desc.second.Events == 0) - continue; - - int events = desc.second.Events; - - if (desc.second.EventInterface) { - desc.second.EventInterface->m_EnginePrivate = &pfds[i]; - - if (!desc.second.EventInterface->m_Events) - events = 0; - } - - pfds[i].fd = desc.first; - pfds[i].events = events; - descriptors[i] = desc.second; - - i++; - } - - pfds.resize(i); - - m_FDChanged[tid] = false; - m_CV[tid].notify_all(); - } - } - - ASSERT(!pfds.empty()); - -#ifdef _WIN32 - (void) WSAPoll(&pfds[0], pfds.size(), -1); -#else /* _WIN32 */ - (void) poll(&pfds[0], pfds.size(), -1); -#endif /* _WIN32 */ - - std::vector events; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (m_FDChanged[tid]) - continue; - - for (std::vector::size_type i = 0; i < pfds.size(); i++) { - if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0) - continue; - - if (pfds[i].fd == m_EventFDs[tid][0]) { - char buffer[512]; - if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) - Log(LogCritical, "SocketEvents", "Read from event FD failed."); - - continue; - } - - EventDescription event; - event.REvents = pfds[i].revents; - event.Descriptor = descriptors[i]; - - events.emplace_back(std::move(event)); - } - } - - for (const EventDescription& event : events) { - try { - event.Descriptor.EventInterface->OnEvent(event.REvents); - } catch (const std::exception& ex) { - Log(LogCritical, "SocketEvents") - << "Exception thrown in socket I/O handler:\n" - << DiagnosticInformation(ex); - } catch (...) { - Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler."); - } - } - } -} - -void SocketEventEnginePoll::Register(SocketEvents *se) -{ - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - VERIFY(se->m_FD != INVALID_SOCKET); - - SocketEventDescriptor desc; - desc.Events = 0; - desc.EventInterface = se; - - VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end()); - - m_Sockets[tid][se->m_FD] = desc; - - m_FDChanged[tid] = true; - - se->m_Events = true; - } - - WakeUpThread(tid, true); -} - -void SocketEventEnginePoll::Unregister(SocketEvents *se) -{ - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - if (se->m_FD == INVALID_SOCKET) - return; - - m_Sockets[tid].erase(se->m_FD); - m_FDChanged[tid] = true; - - se->m_FD = INVALID_SOCKET; - se->m_Events = false; - } - - WakeUpThread(tid, true); -} - -void SocketEventEnginePoll::ChangeEvents(SocketEvents *se, int events) -{ - if (se->m_FD == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); - - int tid = se->m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - auto it = m_Sockets[tid].find(se->m_FD); - - if (it == m_Sockets[tid].end()) - return; - - if (it->second.Events == events) - return; - - it->second.Events = events; - - if (se->m_EnginePrivate && std::this_thread::get_id() == m_Threads[tid].get_id()) - ((pollfd *)se->m_EnginePrivate)->events = events; - else - m_FDChanged[tid] = true; - } - - WakeUpThread(tid, false); -} - diff --git a/lib/base/socketevents.cpp b/lib/base/socketevents.cpp deleted file mode 100644 index f79d4218d..000000000 --- a/lib/base/socketevents.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ - -#include "base/socketevents.hpp" -#include "base/exception.hpp" -#include "base/logger.hpp" -#include "base/application.hpp" -#include "base/scriptglobal.hpp" -#include "base/utility.hpp" -#include -#include -#ifdef __linux__ -# include -#endif /* __linux__ */ - -using namespace icinga; - -static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT; -static SocketEventEngine *l_SocketIOEngine; - -int SocketEvents::m_NextID = 0; - -void SocketEventEngine::Start() -{ - for (int tid = 0; tid < SOCKET_IOTHREADS; tid++) { - Socket::SocketPair(m_EventFDs[tid]); - - Utility::SetNonBlockingSocket(m_EventFDs[tid][0]); - Utility::SetNonBlockingSocket(m_EventFDs[tid][1]); - -#ifndef _WIN32 - Utility::SetCloExec(m_EventFDs[tid][0]); - Utility::SetCloExec(m_EventFDs[tid][1]); -#endif /* _WIN32 */ - - InitializeThread(tid); - - m_Threads[tid] = std::thread(std::bind(&SocketEventEngine::ThreadProc, this, tid)); - } -} - -void SocketEventEngine::WakeUpThread(int sid, bool wait) -{ - int tid = sid % SOCKET_IOTHREADS; - - if (std::this_thread::get_id() == m_Threads[tid].get_id()) - return; - - if (wait) { - boost::mutex::scoped_lock lock(m_EventMutex[tid]); - - m_FDChanged[tid] = true; - - while (m_FDChanged[tid]) { - (void) send(m_EventFDs[tid][1], "T", 1, 0); - - boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50); - m_CV[tid].timed_wait(lock, timeout); - } - } else { - (void) send(m_EventFDs[tid][1], "T", 1, 0); - } -} - -void SocketEvents::InitializeEngine() -{ - String eventEngine = Configuration::EventEngine; - - if (eventEngine.IsEmpty()) -#ifdef __linux__ - eventEngine = "epoll"; -#else /* __linux__ */ - eventEngine = "poll"; -#endif /* __linux__ */ - - if (eventEngine == "poll") - l_SocketIOEngine = new SocketEventEnginePoll(); -#ifdef __linux__ - else if (eventEngine == "epoll") - l_SocketIOEngine = new SocketEventEngineEpoll(); -#endif /* __linux__ */ - else { - Log(LogWarning, "SocketEvents") - << "Invalid event engine selected: " << eventEngine << " - Falling back to 'poll'"; - - eventEngine = "poll"; - - l_SocketIOEngine = new SocketEventEnginePoll(); - } - - l_SocketIOEngine->Start(); - - Configuration::EventEngine = eventEngine; -} - -/** - * Constructor for the SocketEvents class. - */ -SocketEvents::SocketEvents(const Socket::Ptr& socket) - : m_ID(m_NextID++), m_FD(socket->GetFD()), m_EnginePrivate(nullptr) -{ - boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeEngine); - - Register(); -} - -SocketEvents::~SocketEvents() -{ - VERIFY(m_FD == INVALID_SOCKET); -} - -void SocketEvents::Register() -{ - l_SocketIOEngine->Register(this); -} - -void SocketEvents::Unregister() -{ - l_SocketIOEngine->Unregister(this); -} - -void SocketEvents::ChangeEvents(int events) -{ - l_SocketIOEngine->ChangeEvents(this, events); -} - -boost::mutex& SocketEventEngine::GetMutex(int tid) -{ - return m_EventMutex[tid]; -} - -bool SocketEvents::IsHandlingEvents() const -{ - int tid = m_ID % SOCKET_IOTHREADS; - boost::mutex::scoped_lock lock(l_SocketIOEngine->GetMutex(tid)); - return m_Events; -} - -void SocketEvents::OnEvent(int revents) -{ - -} - diff --git a/lib/base/socketevents.hpp b/lib/base/socketevents.hpp deleted file mode 100644 index 5b0f39d74..000000000 --- a/lib/base/socketevents.hpp +++ /dev/null @@ -1,137 +0,0 @@ -/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ - -#ifndef SOCKETEVENTS_H -#define SOCKETEVENTS_H - -#include "base/i2-base.hpp" -#include "base/socket.hpp" -#include "base/stream.hpp" -#include -#include - -#ifndef _WIN32 -# include -#endif /* _WIN32 */ - -namespace icinga -{ - -/** - * Socket event interface - * - * @ingroup base - */ -class SocketEvents : public Stream -{ -public: - DECLARE_PTR_TYPEDEFS(SocketEvents); - - ~SocketEvents(); - - virtual void OnEvent(int revents); - - void Unregister(); - - void ChangeEvents(int events); - - bool IsHandlingEvents() const; - - void *GetEnginePrivate() const; - void SetEnginePrivate(void *priv); - -protected: - SocketEvents(const Socket::Ptr& socket); - -private: - int m_ID; - SOCKET m_FD; - bool m_Events; - void *m_EnginePrivate; - - static int m_NextID; - - static void InitializeEngine(); - - void WakeUpThread(bool wait = false); - - void Register(); - - friend class SocketEventEnginePoll; - friend class SocketEventEngineEpoll; -}; - -#define SOCKET_IOTHREADS 8 - -struct SocketEventDescriptor -{ - int Events{POLLIN}; - SocketEvents::Ptr EventInterface; -}; - -struct EventDescription -{ - int REvents; - SocketEventDescriptor Descriptor; -}; - -class SocketEventEngine -{ -public: - void Start(); - - void WakeUpThread(int sid, bool wait); - - boost::mutex& GetMutex(int tid); - -protected: - virtual void InitializeThread(int tid) = 0; - virtual void ThreadProc(int tid) = 0; - virtual void Register(SocketEvents *se) = 0; - virtual void Unregister(SocketEvents *se) = 0; - virtual void ChangeEvents(SocketEvents *se, int events) = 0; - - std::thread m_Threads[SOCKET_IOTHREADS]; - SOCKET m_EventFDs[SOCKET_IOTHREADS][2]; - bool m_FDChanged[SOCKET_IOTHREADS]; - boost::mutex m_EventMutex[SOCKET_IOTHREADS]; - boost::condition_variable m_CV[SOCKET_IOTHREADS]; - std::map m_Sockets[SOCKET_IOTHREADS]; - - friend class SocketEvents; -}; - -class SocketEventEnginePoll final : public SocketEventEngine -{ -public: - void Register(SocketEvents *se) override; - void Unregister(SocketEvents *se) override; - void ChangeEvents(SocketEvents *se, int events) override; - -protected: - void InitializeThread(int tid) override; - void ThreadProc(int tid) override; -}; - -#ifdef __linux__ -class SocketEventEngineEpoll : public SocketEventEngine -{ -public: - virtual void Register(SocketEvents *se); - virtual void Unregister(SocketEvents *se); - virtual void ChangeEvents(SocketEvents *se, int events); - -protected: - virtual void InitializeThread(int tid); - virtual void ThreadProc(int tid); - -private: - SOCKET m_PollFDs[SOCKET_IOTHREADS]; - - static int PollToEpoll(int events); - static int EpollToPoll(int events); -}; -#endif /* __linux__ */ - -} - -#endif /* SOCKETEVENTS_H */ diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index dd3b556df..44fe0b9c5 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -5,7 +5,6 @@ #include "base/i2-base.hpp" #include "base/socket.hpp" -#include "base/socketevents.hpp" #include "base/stream.hpp" #include "base/tlsutility.hpp" #include "base/fifo.hpp"