]> granicus.if.org Git - icinga2/blob - lib/base/socketevents-epoll.cpp
Merge pull request #7185 from Icinga/bugfix/gelfwriter-wrong-log-facility
[icinga2] / lib / base / socketevents-epoll.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "base/socketevents.hpp"
4 #include "base/exception.hpp"
5 #include "base/logger.hpp"
6 #include "base/utility.hpp"
7 #include <boost/thread/once.hpp>
8 #include <map>
9 #ifdef __linux__
10 #       include <sys/epoll.h>
11
12 using namespace icinga;
13
14 void SocketEventEngineEpoll::InitializeThread(int tid)
15 {
16         m_PollFDs[tid] = epoll_create(128);
17         Utility::SetCloExec(m_PollFDs[tid]);
18
19         SocketEventDescriptor sed;
20
21         m_Sockets[tid][m_EventFDs[tid][0]] = sed;
22         m_FDChanged[tid] = true;
23
24         epoll_event event;
25         memset(&event, 0, sizeof(event));
26         event.data.fd = m_EventFDs[tid][0];
27         event.events = EPOLLIN;
28         epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event);
29 }
30
31 int SocketEventEngineEpoll::PollToEpoll(int events)
32 {
33         int result = 0;
34
35         if (events & POLLIN)
36                 result |= EPOLLIN;
37
38         if (events & POLLOUT)
39                 result |= EPOLLOUT;
40
41         return events;
42 }
43
44 int SocketEventEngineEpoll::EpollToPoll(int events)
45 {
46         int result = 0;
47
48         if (events & EPOLLIN)
49                 result |= POLLIN;
50
51         if (events & EPOLLOUT)
52                 result |= POLLOUT;
53
54         return events;
55 }
56
57 void SocketEventEngineEpoll::ThreadProc(int tid)
58 {
59         Utility::SetThreadName("SocketIO");
60
61         for (;;) {
62                 {
63                         boost::mutex::scoped_lock lock(m_EventMutex[tid]);
64
65                         if (m_FDChanged[tid]) {
66                                 m_FDChanged[tid] = false;
67                                 m_CV[tid].notify_all();
68                         }
69                 }
70
71                 epoll_event pevents[64];
72                 int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1);
73
74                 std::vector<EventDescription> events;
75
76                 {
77                         boost::mutex::scoped_lock lock(m_EventMutex[tid]);
78
79                         if (m_FDChanged[tid]) {
80                                 m_FDChanged[tid] = false;
81
82                                 continue;
83                         }
84
85                         for (int i = 0; i < ready; i++) {
86                                 if (pevents[i].data.fd == m_EventFDs[tid][0]) {
87                                         char buffer[512];
88                                         if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
89                                                 Log(LogCritical, "SocketEvents", "Read from event FD failed.");
90
91                                         continue;
92                                 }
93
94                                 if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0)
95                                         continue;
96
97                                 EventDescription event;
98                                 event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events);
99                                 event.Descriptor = m_Sockets[tid][pevents[i].data.fd];
100
101                                 events.emplace_back(std::move(event));
102                         }
103                 }
104
105                 for (const EventDescription& event : events) {
106                         try {
107                                 event.Descriptor.EventInterface->OnEvent(event.REvents);
108                         } catch (const std::exception& ex) {
109                                 Log(LogCritical, "SocketEvents")
110                                         << "Exception thrown in socket I/O handler:\n"
111                                         << DiagnosticInformation(ex);
112                         } catch (...) {
113                                 Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
114                         }
115                 }
116         }
117 }
118
119 void SocketEventEngineEpoll::Register(SocketEvents *se)
120 {
121         int tid = se->m_ID % SOCKET_IOTHREADS;
122
123         {
124                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
125
126                 VERIFY(se->m_FD != INVALID_SOCKET);
127
128                 SocketEventDescriptor desc;
129                 desc.EventInterface = se;
130
131                 VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
132
133                 m_Sockets[tid][se->m_FD] = desc;
134
135                 epoll_event event;
136                 memset(&event, 0, sizeof(event));
137                 event.data.fd = se->m_FD;
138                 event.events = 0;
139                 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event);
140
141                 se->m_Events = true;
142         }
143 }
144
145 void SocketEventEngineEpoll::Unregister(SocketEvents *se)
146 {
147         int tid = se->m_ID % SOCKET_IOTHREADS;
148
149         {
150                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
151
152                 if (se->m_FD == INVALID_SOCKET)
153                         return;
154
155                 m_Sockets[tid].erase(se->m_FD);
156                 m_FDChanged[tid] = true;
157
158                 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr);
159
160                 se->m_FD = INVALID_SOCKET;
161                 se->m_Events = false;
162         }
163
164         WakeUpThread(tid, true);
165 }
166
167 void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events)
168 {
169         if (se->m_FD == INVALID_SOCKET)
170                 BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
171
172         int tid = se->m_ID % SOCKET_IOTHREADS;
173
174         {
175                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
176
177                 auto it = m_Sockets[tid].find(se->m_FD);
178
179                 if (it == m_Sockets[tid].end())
180                         return;
181
182                 epoll_event event;
183                 memset(&event, 0, sizeof(event));
184                 event.data.fd = se->m_FD;
185                 event.events = SocketEventEngineEpoll::PollToEpoll(events);
186                 epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event);
187         }
188 }
189 #endif /* __linux__ */