]> granicus.if.org Git - icinga2/blob - lib/base/socketevents-poll.cpp
Merge pull request #7185 from Icinga/bugfix/gelfwriter-wrong-log-facility
[icinga2] / lib / base / socketevents-poll.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "base/socketevents.hpp"
4 #include "base/exception.hpp"
5 #include "base/logger.hpp"
6 #include "base/utility.hpp"
7 #include <boost/thread/once.hpp>
8 #include <map>
9
10 using namespace icinga;
11
12 void SocketEventEnginePoll::InitializeThread(int tid)
13 {
14         SocketEventDescriptor sed;
15         sed.Events = POLLIN;
16
17         m_Sockets[tid][m_EventFDs[tid][0]] = sed;
18         m_FDChanged[tid] = true;
19 }
20
21 void SocketEventEnginePoll::ThreadProc(int tid)
22 {
23         Utility::SetThreadName("SocketIO");
24
25         std::vector<pollfd> pfds;
26         std::vector<SocketEventDescriptor> descriptors;
27
28         for (;;) {
29                 {
30                         boost::mutex::scoped_lock lock(m_EventMutex[tid]);
31
32                         if (m_FDChanged[tid]) {
33                                 pfds.resize(m_Sockets[tid].size());
34                                 descriptors.resize(m_Sockets[tid].size());
35
36                                 int i = 0;
37
38                                 typedef std::map<SOCKET, SocketEventDescriptor>::value_type kv_pair;
39
40                                 for (const kv_pair& desc : m_Sockets[tid]) {
41                                         if (desc.second.Events == 0)
42                                                 continue;
43
44                                         int events = desc.second.Events;
45
46                                         if (desc.second.EventInterface) {
47                                                 desc.second.EventInterface->m_EnginePrivate = &pfds[i];
48
49                                                 if (!desc.second.EventInterface->m_Events)
50                                                         events = 0;
51                                         }
52
53                                         pfds[i].fd = desc.first;
54                                         pfds[i].events = events;
55                                         descriptors[i] = desc.second;
56
57                                         i++;
58                                 }
59
60                                 pfds.resize(i);
61
62                                 m_FDChanged[tid] = false;
63                                 m_CV[tid].notify_all();
64                         }
65                 }
66
67                 ASSERT(!pfds.empty());
68
69 #ifdef _WIN32
70                 (void) WSAPoll(&pfds[0], pfds.size(), -1);
71 #else /* _WIN32 */
72                 (void) poll(&pfds[0], pfds.size(), -1);
73 #endif /* _WIN32 */
74
75                 std::vector<EventDescription> events;
76
77                 {
78                         boost::mutex::scoped_lock lock(m_EventMutex[tid]);
79
80                         if (m_FDChanged[tid])
81                                 continue;
82
83                         for (std::vector<pollfd>::size_type i = 0; i < pfds.size(); i++) {
84                                 if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
85                                         continue;
86
87                                 if (pfds[i].fd == m_EventFDs[tid][0]) {
88                                         char buffer[512];
89                                         if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
90                                                 Log(LogCritical, "SocketEvents", "Read from event FD failed.");
91
92                                         continue;
93                                 }
94
95                                 EventDescription event;
96                                 event.REvents = pfds[i].revents;
97                                 event.Descriptor = descriptors[i];
98
99                                 events.emplace_back(std::move(event));
100                         }
101                 }
102
103                 for (const EventDescription& event : events) {
104                         try {
105                                 event.Descriptor.EventInterface->OnEvent(event.REvents);
106                         } catch (const std::exception& ex) {
107                                 Log(LogCritical, "SocketEvents")
108                                         << "Exception thrown in socket I/O handler:\n"
109                                         << DiagnosticInformation(ex);
110                         } catch (...) {
111                                 Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
112                         }
113                 }
114         }
115 }
116
117 void SocketEventEnginePoll::Register(SocketEvents *se)
118 {
119         int tid = se->m_ID % SOCKET_IOTHREADS;
120
121         {
122                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
123
124                 VERIFY(se->m_FD != INVALID_SOCKET);
125
126                 SocketEventDescriptor desc;
127                 desc.Events = 0;
128                 desc.EventInterface = se;
129
130                 VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
131
132                 m_Sockets[tid][se->m_FD] = desc;
133
134                 m_FDChanged[tid] = true;
135
136                 se->m_Events = true;
137         }
138
139         WakeUpThread(tid, true);
140 }
141
142 void SocketEventEnginePoll::Unregister(SocketEvents *se)
143 {
144         int tid = se->m_ID % SOCKET_IOTHREADS;
145
146         {
147                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
148
149                 if (se->m_FD == INVALID_SOCKET)
150                         return;
151
152                 m_Sockets[tid].erase(se->m_FD);
153                 m_FDChanged[tid] = true;
154
155                 se->m_FD = INVALID_SOCKET;
156                 se->m_Events = false;
157         }
158
159         WakeUpThread(tid, true);
160 }
161
162 void SocketEventEnginePoll::ChangeEvents(SocketEvents *se, int events)
163 {
164         if (se->m_FD == INVALID_SOCKET)
165                 BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
166
167         int tid = se->m_ID % SOCKET_IOTHREADS;
168
169         {
170                 boost::mutex::scoped_lock lock(m_EventMutex[tid]);
171
172                 auto it = m_Sockets[tid].find(se->m_FD);
173
174                 if (it == m_Sockets[tid].end())
175                         return;
176
177                 if (it->second.Events == events)
178                         return;
179
180                 it->second.Events = events;
181
182                 if (se->m_EnginePrivate && std::this_thread::get_id() == m_Threads[tid].get_id())
183                         ((pollfd *)se->m_EnginePrivate)->events = events;
184                 else
185                         m_FDChanged[tid] = true;
186         }
187
188         WakeUpThread(tid, false);
189 }
190