1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
3 #include "base/socketevents.hpp"
4 #include "base/exception.hpp"
5 #include "base/logger.hpp"
6 #include "base/utility.hpp"
7 #include <boost/thread/once.hpp>
10 # include <sys/epoll.h>
12 using namespace icinga;
// Prepares the epoll state for I/O thread `tid`: creates the epoll instance,
// records the thread's event FD in the socket table and adds that FD to the
// epoll set so the thread can be woken up through it.
// NOTE(review): the return values of epoll_create() and epoll_ctl() are not
// checked in the lines visible here.
14 void SocketEventEngineEpoll::InitializeThread(int tid)
// 128 is only a size hint for epoll_create(); it is not a limit on the number
// of descriptors that can be added later.
16 m_PollFDs[tid] = epoll_create(128);
// Presumably marks the epoll FD close-on-exec (judging by the helper's name).
17 Utility::SetCloExec(m_PollFDs[tid]);
// A default-constructed descriptor is stored for the event FD, so the FD has
// an entry in m_Sockets[tid] like any registered socket.
19 SocketEventDescriptor sed;
21 m_Sockets[tid][m_EventFDs[tid][0]] = sed;
22 m_FDChanged[tid] = true;
// `event` is declared on a line that is not visible in this view; it is
// zeroed before use and set up to watch the event FD for readability.
25 memset(&event, 0, sizeof(event));
26 event.data.fd = m_EventFDs[tid][0];
27 event.events = EPOLLIN;
28 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event);
// Converts an event mask for use with epoll; ChangeEvents() passes the result
// straight into epoll_event::events.
// NOTE(review): the body is not visible in this view. Presumably it maps
// POLL* flags to the matching EPOLL* flags - confirm against the full file.
31 int SocketEventEngineEpoll::PollToEpoll(int events)
// Converts an event mask reported by epoll_wait(); ThreadProc() stores the
// result in EventDescription::REvents.
// NOTE(review): most of the body is not visible in this view. Presumably it
// maps EPOLL* flags back to the matching POLL* flags - confirm against the
// full file.
44 int SocketEventEngineEpoll::EpollToPoll(int events)
// Tests whether the descriptor was reported writable; the statement guarded
// by this condition is not visible here.
51 if (events & EPOLLOUT)
// Body of I/O thread `tid`: waits on the thread's epoll instance, gathers the
// ready descriptors while holding m_EventMutex[tid], and then invokes the
// OnEvent() handler of each one.
// NOTE(review): the loop header, scope braces and several control-flow
// statements are not visible in this view; comments below only describe the
// lines that are shown.
57 void SocketEventEngineEpoll::ThreadProc(int tid)
59 Utility::SetThreadName("SocketIO");
// Before waiting: acknowledge a pending change to the FD set and notify every
// thread waiting on m_CV[tid].
63 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
65 if (m_FDChanged[tid]) {
66 m_FDChanged[tid] = false;
67 m_CV[tid].notify_all();
// Wait without a timeout (-1) for up to 64 ready descriptors per call.
71 epoll_event pevents[64];
72 int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1);
// Ready events are first collected here and dispatched afterwards.
74 std::vector<EventDescription> events;
77 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
// The FD set changed while waiting. The flag is cleared; what follows is not
// visible here - presumably this batch of results is discarded (TODO confirm).
79 if (m_FDChanged[tid]) {
80 m_FDChanged[tid] = false;
// If epoll_wait() failed, `ready` is negative and the loop body never runs.
85 for (int i = 0; i < ready; i++) {
// The thread's own event FD became readable: read from it so the pending
// wake-up data is consumed. `buffer` is declared on a line not visible here.
86 if (pevents[i].data.fd == m_EventFDs[tid][0]) {
88 if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
89 Log(LogCritical, "SocketEvents", "Read from event FD failed.");
// Entries carrying none of the flags of interest; the guarded statement is
// not visible - presumably they are skipped (TODO confirm).
94 if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0)
// Snapshot the translated event mask and the registered descriptor for the FD.
97 EventDescription event;
98 event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events);
99 event.Descriptor = m_Sockets[tid][pevents[i].data.fd];
101 events.emplace_back(std::move(event));
// Dispatch phase. NOTE(review): presumably this runs after the lock scope
// above has ended - the closing braces are not visible, verify.
105 for (const EventDescription& event : events) {
107 event.Descriptor.EventInterface->OnEvent(event.REvents);
// Exceptions from a handler are logged instead of being propagated out of
// this catch block.
108 } catch (const std::exception& ex) {
109 Log(LogCritical, "SocketEvents")
110 << "Exception thrown in socket I/O handler:\n"
111 << DiagnosticInformation(ex);
// Logged for exceptions not derived from std::exception (the enclosing catch
// clause is not visible here).
113 Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
// Registers `se` with the engine: records a descriptor for its FD in the
// socket table of the owning I/O thread and adds the FD to that thread's
// epoll set.
119 void SocketEventEngineEpoll::Register(SocketEvents *se)
// The owning I/O thread is selected from the object's ID.
121 int tid = se->m_ID % SOCKET_IOTHREADS;
124 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
// The socket must be open ...
126 VERIFY(se->m_FD != INVALID_SOCKET);
128 SocketEventDescriptor desc;
129 desc.EventInterface = se;
// ... and must not already be registered with this thread.
131 VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
133 m_Sockets[tid][se->m_FD] = desc;
// `event` is declared on a line not visible here. Apart from the memset, no
// assignment to event.events is visible, so as far as this view shows the FD
// is added with an empty event mask (NOTE(review): confirm in the full file).
136 memset(&event, 0, sizeof(event));
137 event.data.fd = se->m_FD;
// NOTE(review): the return value of epoll_ctl() is not checked here.
139 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event);
// Removes `se` from the engine: drops its entry from the owning thread's
// socket table, deletes its FD from the epoll set, resets the object's
// m_FD/m_Events members and finally wakes up the I/O thread.
145 void SocketEventEngineEpoll::Unregister(SocketEvents *se)
147 int tid = se->m_ID % SOCKET_IOTHREADS;
150 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
// Checks whether the object still has a valid FD; the guarded statement is not
// visible here - presumably an early return (TODO confirm).
152 if (se->m_FD == INVALID_SOCKET)
155 m_Sockets[tid].erase(se->m_FD);
// Tell the I/O thread that its FD set was modified.
156 m_FDChanged[tid] = true;
158 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr);
160 se->m_FD = INVALID_SOCKET;
161 se->m_Events = false;
// NOTE(review): presumably called after the lock scope has ended, and the
// `true` argument presumably requests waiting for the thread - neither is
// visible in this view, verify against WakeUpThread().
164 WakeUpThread(tid, true);
// Changes the set of events monitored for `se` by modifying its entry in the
// owning thread's epoll set.
// Throws std::runtime_error (via BOOST_THROW_EXCEPTION) if the socket has
// already been closed.
167 void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events)
// This check runs before m_EventMutex[tid] is acquired below.
169 if (se->m_FD == INVALID_SOCKET)
170 BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
172 int tid = se->m_ID % SOCKET_IOTHREADS;
175 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
177 auto it = m_Sockets[tid].find(se->m_FD);
// The socket is not registered with this thread; the guarded statement is not
// visible here - presumably an early return (TODO confirm).
179 if (it == m_Sockets[tid].end())
// `event` is declared on a line not visible here; the caller's mask is
// translated by PollToEpoll() before being handed to epoll.
183 memset(&event, 0, sizeof(event));
184 event.data.fd = se->m_FD;
185 event.events = SocketEventEngineEpoll::PollToEpoll(events);
// NOTE(review): the return value of epoll_ctl() is not checked here.
186 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event);
189 #endif /* __linux__ */