/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

#include "base/socketevents.hpp"
#include "base/exception.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
/* NOTE(review): the names of the next two headers were missing from this copy of the file (only
 * bare "#include" tokens remained). <cstring> and <vector> are what the code below uses directly
 * (memset, std::vector) - confirm against version control before merging. */
#include <cstring>
#include <vector>
#ifdef __linux__
/* NOTE(review): header name was missing as well; <sys/epoll.h> declares the epoll_* API used below. */
#	include <sys/epoll.h>

using namespace icinga;

/**
 * Sets up the epoll instance for one I/O thread.
 *
 * Creates the epoll descriptor, marks it close-on-exec and registers the
 * thread's event FD (m_EventFDs[tid][0]) for readability. ThreadProc() drains
 * that FD without dispatching an event for it.
 *
 * @param tid Index of the I/O thread to initialize.
 */
void SocketEventEngineEpoll::InitializeThread(int tid)
{
	m_PollFDs[tid] = epoll_create(128);
	Utility::SetCloExec(m_PollFDs[tid]);

	/* The event FD gets a default-constructed descriptor (no handler is assigned to it here). */
	SocketEventDescriptor sed;

	m_Sockets[tid][m_EventFDs[tid][0]] = sed;
	m_FDChanged[tid] = true;

	epoll_event event;
	memset(&event, 0, sizeof(event));
	event.data.fd = m_EventFDs[tid][0];
	event.events = EPOLLIN;
	epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event);
}

/**
 * Translates a poll(2) event mask into the equivalent epoll event mask.
 *
 * Earlier revisions computed the translation but returned the input mask
 * unchanged. The error/hang-up/priority bits are mapped in addition to
 * POLLIN/POLLOUT so that every standard bit which used to pass through is
 * still honoured. Bits without a mapping below are dropped.
 *
 * @param events Bitmask of POLL* flags.
 * @returns Bitmask of the corresponding EPOLL* flags.
 */
int SocketEventEngineEpoll::PollToEpoll(int events)
{
	int result = 0;

	if (events & POLLIN)
		result |= EPOLLIN;

	if (events & POLLPRI)
		result |= EPOLLPRI;

	if (events & POLLOUT)
		result |= EPOLLOUT;

	if (events & POLLERR)
		result |= EPOLLERR;

	if (events & POLLHUP)
		result |= EPOLLHUP;

	return result;
}

/**
 * Translates an epoll event mask into the equivalent poll(2) event mask.
 *
 * Earlier revisions computed the translation but returned the input mask
 * unchanged. ThreadProc() also dispatches events that only carry
 * EPOLLERR/EPOLLHUP, so those bits must be translated too; otherwise handlers
 * would be invoked with an empty mask. Bits without a mapping below are dropped.
 *
 * @param events Bitmask of EPOLL* flags.
 * @returns Bitmask of the corresponding POLL* flags.
 */
int SocketEventEngineEpoll::EpollToPoll(int events)
{
	int result = 0;

	if (events & EPOLLIN)
		result |= POLLIN;

	if (events & EPOLLPRI)
		result |= POLLPRI;

	if (events & EPOLLOUT)
		result |= POLLOUT;

	if (events & EPOLLERR)
		result |= POLLERR;

	if (events & EPOLLHUP)
		result |= POLLHUP;

	return result;
}

/**
 * Main loop of one I/O thread: waits for epoll events and dispatches them to
 * the registered handlers. Never returns.
 *
 * @param tid Index of the I/O thread.
 */
void SocketEventEngineEpoll::ThreadProc(int tid)
{
	Utility::SetThreadName("SocketIO");

	for (;;) {
		{
			boost::mutex::scoped_lock lock(m_EventMutex[tid]);

			/* Acknowledge a pending change of the socket set and wake anybody waiting on the
			 * condition variable (presumably WakeUpThread() callers - not visible in this file). */
			if (m_FDChanged[tid]) {
				m_FDChanged[tid] = false;
				m_CV[tid].notify_all();
			}
		}

		epoll_event pevents[64];
		/* Timeout -1: block until at least one descriptor is ready. A negative return value
		 * simply results in no events being processed below. */
		int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1);

		std::vector<EventDescription> events;

		{
			boost::mutex::scoped_lock lock(m_EventMutex[tid]);

			/* The socket set changed while we were waiting; this batch is discarded and the
			 * wait is restarted (the batch may refer to sockets that are gone by now). */
			if (m_FDChanged[tid]) {
				m_FDChanged[tid] = false;

				continue;
			}

			for (int i = 0; i < ready; i++) {
				/* Data on the event FD is only drained, never dispatched. */
				if (pevents[i].data.fd == m_EventFDs[tid][0]) {
					char buffer[512];
					if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
						Log(LogCritical, "SocketEvents", "Read from event FD failed.");

					continue;
				}

				if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0)
					continue;

				EventDescription event;
				event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events);
				event.Descriptor = m_Sockets[tid][pevents[i].data.fd];

				events.emplace_back(std::move(event));
			}
		}

		/* Handlers run after the mutex has been released. */
		for (const EventDescription& event : events) {
			try {
				event.Descriptor.EventInterface->OnEvent(event.REvents);
			} catch (const std::exception& ex) {
				Log(LogCritical, "SocketEvents")
					<< "Exception thrown in socket I/O handler:\n"
					<< DiagnosticInformation(ex);
			} catch (...) {
				Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
			}
		}
	}
}

/**
 * Registers a socket with the I/O thread selected by its ID.
 *
 * The descriptor is added to epoll with an empty event mask; the events of
 * interest are set later through ChangeEvents(). The socket must have a valid
 * FD and must not be registered already (both enforced with VERIFY).
 *
 * @param se The socket events object to register.
 */
void SocketEventEngineEpoll::Register(SocketEvents *se)
{
	int tid = se->m_ID % SOCKET_IOTHREADS;

	{
		boost::mutex::scoped_lock lock(m_EventMutex[tid]);

		VERIFY(se->m_FD != INVALID_SOCKET);

		SocketEventDescriptor desc;
		desc.EventInterface = se;

		VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());

		m_Sockets[tid][se->m_FD] = desc;

		epoll_event event;
		memset(&event, 0, sizeof(event));
		event.data.fd = se->m_FD;
		event.events = 0;
		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event);

		se->m_Events = true;
	}
}

/**
 * Removes a socket from its I/O thread. Does nothing if the socket's FD is
 * already INVALID_SOCKET.
 *
 * Afterwards the socket's FD is reset to INVALID_SOCKET and the I/O thread is
 * woken up via WakeUpThread() once the mutex has been released.
 *
 * @param se The socket events object to unregister.
 */
void SocketEventEngineEpoll::Unregister(SocketEvents *se)
{
	int tid = se->m_ID % SOCKET_IOTHREADS;

	{
		boost::mutex::scoped_lock lock(m_EventMutex[tid]);

		if (se->m_FD == INVALID_SOCKET)
			return;

		m_Sockets[tid].erase(se->m_FD);
		/* Makes ThreadProc() discard any event batch it collected for the old socket set. */
		m_FDChanged[tid] = true;

		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr);

		se->m_FD = INVALID_SOCKET;
		se->m_Events = false;
	}

	/* NOTE(review): the second argument presumably requests waiting for the I/O thread's
	 * acknowledgement - WakeUpThread() is defined elsewhere, confirm there. */
	WakeUpThread(tid, true);
}

/**
 * Changes the set of events a registered socket is polled for.
 *
 * Returns silently if the socket is not registered with its I/O thread.
 *
 * @param se The socket events object.
 * @param events Bitmask of POLL* flags to wait for.
 * @throws std::runtime_error (via BOOST_THROW_EXCEPTION) if the socket's FD is INVALID_SOCKET.
 */
void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events)
{
	if (se->m_FD == INVALID_SOCKET)
		BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));

	int tid = se->m_ID % SOCKET_IOTHREADS;

	{
		boost::mutex::scoped_lock lock(m_EventMutex[tid]);

		auto it = m_Sockets[tid].find(se->m_FD);

		if (it == m_Sockets[tid].end())
			return;

		epoll_event event;
		memset(&event, 0, sizeof(event));
		event.data.fd = se->m_FD;
		event.events = SocketEventEngineEpoll::PollToEpoll(events);
		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event);
	}
}
#endif /* __linux__ */