]> granicus.if.org Git - icinga2/blob - lib/base/socketevents.cpp
Fix incorrect variable in Url::ParsePort
[icinga2] / lib / base / socketevents.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "base/socketevents.hpp"
21 #include "base/exception.hpp"
22 #include "base/logger.hpp"
23 #include <boost/thread/once.hpp>
24 #include <boost/foreach.hpp>
25 #include <map>
26
27 #ifndef _WIN32
28 #       include <poll.h>
29 #endif /* _WIN32 */
30
31 using namespace icinga;
32
33 struct SocketEventDescriptor
34 {
35         int Events;
36         SocketEvents *EventInterface;
37         Object *LifesupportObject;
38
39         SocketEventDescriptor(void)
40                 : Events(0), EventInterface(NULL), LifesupportObject(NULL)
41         { }
42 };
43
44 static boost::thread l_SocketIOThread;
45 static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
46 static SOCKET l_SocketIOEventFDs[2];
47 static boost::mutex l_SocketIOMutex;
48 static boost::condition_variable l_SocketIOCV;
49 static bool l_SocketIOFDChanged;
50 static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
51
52 void SocketEvents::InitializeThread(void)
53 {
54         Socket::SocketPair(l_SocketIOEventFDs);
55
56         Utility::SetNonBlockingSocket(l_SocketIOEventFDs[0]);
57         Utility::SetNonBlockingSocket(l_SocketIOEventFDs[1]);
58
59 #ifndef _WIN32
60         Utility::SetCloExec(l_SocketIOEventFDs[0]);
61         Utility::SetCloExec(l_SocketIOEventFDs[1]);
62 #endif /* _WIN32 */
63
64         SocketEventDescriptor sed;
65         sed.Events = POLLIN;
66
67         l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
68
69         l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc);
70 }
71
72 void SocketEvents::ThreadProc(void)
73 {
74         Utility::SetThreadName("SocketIO");
75
76         for (;;) {
77                 pollfd *pfds;
78                 int pfdcount;
79
80                 typedef std::map<SOCKET, SocketEventDescriptor>::value_type SocketDesc;
81
82                 {
83                         boost::mutex::scoped_lock lock(l_SocketIOMutex);
84
85                         pfdcount = l_SocketIOSockets.size();
86                         pfds  = new pollfd[pfdcount];
87
88                         int i = 0;
89
90                         BOOST_FOREACH(const SocketDesc& desc, l_SocketIOSockets) {
91                                 pfds[i].fd = desc.first;
92                                 pfds[i].events = desc.second.Events;
93                                 pfds[i].revents = 0;
94
95                                 i++;
96                         }
97                 }
98
99 #ifdef _WIN32
100                 (void) WSAPoll(pfds, pfdcount, -1);
101 #else /* _WIN32 */
102                 (void) poll(pfds, pfdcount, -1);
103 #endif /* _WIN32 */
104
105                 {
106                         boost::mutex::scoped_lock lock(l_SocketIOMutex);
107
108                         if (l_SocketIOFDChanged) {
109                                 l_SocketIOFDChanged = false;
110                                 l_SocketIOCV.notify_all();
111                                 delete [] pfds;
112                                 continue;
113                         }
114                 }
115
116                 for (int i = 0; i < pfdcount; i++) {
117                         if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
118                                 continue;
119
120                         if (pfds[i].fd == l_SocketIOEventFDs[0]) {
121                                 char buffer[512];
122                                 if (recv(l_SocketIOEventFDs[0], buffer, sizeof(buffer), 0) < 0)
123                                         Log(LogCritical, "SocketEvents", "Read from event FD failed.");
124
125                                 continue;
126                         }
127
128                         SocketEventDescriptor desc;
129                         Object::Ptr ltref;
130
131                         {
132                                 boost::mutex::scoped_lock lock(l_SocketIOMutex);
133
134                                 std::map<SOCKET, SocketEventDescriptor>::const_iterator it = l_SocketIOSockets.find(pfds[i].fd);
135
136                                 if (it == l_SocketIOSockets.end())
137                                         continue;
138
139                                 desc = it->second;
140
141                                 /* We must hold a ref-counted reference to the event object to keep it alive. */
142                                 ltref = desc.LifesupportObject;
143                                 VERIFY(ltref);
144                         }
145
146                         try {
147                                 desc.EventInterface->OnEvent(pfds[i].revents);
148                         } catch (const std::exception& ex) {
149                                 Log(LogCritical, "SocketEvents")
150                                     << "Exception thrown in socket I/O handler:\n"
151                                     << DiagnosticInformation(ex);
152                         } catch (...) {
153                                 Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
154                         }
155                 }
156
157                 delete [] pfds;
158         }
159 }
160
161 void SocketEvents::WakeUpThread(bool wait)
162 {
163         if (wait) {
164                 if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) {
165                         boost::mutex::scoped_lock lock(l_SocketIOMutex);
166
167                         l_SocketIOFDChanged = true;
168
169                         (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
170
171                         while (l_SocketIOFDChanged)
172                                 l_SocketIOCV.wait(lock);
173                 }
174         } else {
175                 (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
176         }
177 }
178
179 /**
180  * Constructor for the SocketEvents class.
181  */
182 SocketEvents::SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject)
183         : m_FD(socket->GetFD())
184 {
185         boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread);
186
187         Register(lifesupportObject);
188 }
189
190 SocketEvents::~SocketEvents(void)
191 {
192         VERIFY(m_FD == INVALID_SOCKET);
193 }
194
195 void SocketEvents::Register(Object *lifesupportObject)
196 {
197         boost::mutex::scoped_lock lock(l_SocketIOMutex);
198
199         VERIFY(m_FD != INVALID_SOCKET);
200
201         SocketEventDescriptor desc;
202         desc.Events = 0;
203         desc.EventInterface = this;
204         desc.LifesupportObject = lifesupportObject;
205
206         VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
207
208         l_SocketIOSockets[m_FD] = desc;
209
210         m_Events = true;
211
212         /* There's no need to wake up the I/O thread here. */
213 }
214
215 void SocketEvents::Unregister(void)
216 {
217         {
218                 boost::mutex::scoped_lock lock(l_SocketIOMutex);
219
220                 if (m_FD == INVALID_SOCKET)
221                         return;
222
223                 l_SocketIOSockets.erase(m_FD);
224                 m_FD = INVALID_SOCKET;
225
226                 m_Events = false;
227         }
228
229         WakeUpThread(true);
230 }
231
232 void SocketEvents::ChangeEvents(int events)
233 {
234         {
235                 boost::mutex::scoped_lock lock(l_SocketIOMutex);
236
237                 if (m_FD == INVALID_SOCKET)
238                         BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
239
240                 std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
241
242                 if (it == l_SocketIOSockets.end())
243                         return;
244
245                 it->second.Events = events;
246         }
247
248         WakeUpThread();
249 }
250
251 bool SocketEvents::IsHandlingEvents(void) const
252 {
253         boost::mutex::scoped_lock lock(l_SocketIOMutex);
254         return m_Events;
255 }
256
257 void SocketEvents::OnEvent(int revents)
258 {
259
260 }
261