1 /******************************************************************************
3 * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "base/socketevents.hpp"
21 #include "base/exception.hpp"
22 #include "base/logger.hpp"
23 #include <boost/thread/once.hpp>
24 #include <boost/foreach.hpp>
31 using namespace icinga;
33 struct SocketEventDescriptor
36 SocketEvents *EventInterface;
37 Object *LifesupportObject;
39 SocketEventDescriptor(void)
40 : Events(0), EventInterface(NULL), LifesupportObject(NULL)
44 static boost::thread l_SocketIOThread;
45 static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
46 static SOCKET l_SocketIOEventFDs[2];
47 static boost::mutex l_SocketIOMutex;
48 static boost::condition_variable l_SocketIOCV;
49 static bool l_SocketIOFDChanged;
50 static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
52 void SocketEvents::InitializeThread(void)
54 Socket::SocketPair(l_SocketIOEventFDs);
56 Utility::SetNonBlockingSocket(l_SocketIOEventFDs[0]);
57 Utility::SetNonBlockingSocket(l_SocketIOEventFDs[1]);
60 Utility::SetCloExec(l_SocketIOEventFDs[0]);
61 Utility::SetCloExec(l_SocketIOEventFDs[1]);
64 SocketEventDescriptor sed;
67 l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
69 l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc);
72 void SocketEvents::ThreadProc(void)
74 Utility::SetThreadName("SocketIO");
80 typedef std::map<SOCKET, SocketEventDescriptor>::value_type SocketDesc;
83 boost::mutex::scoped_lock lock(l_SocketIOMutex);
85 pfdcount = l_SocketIOSockets.size();
86 pfds = new pollfd[pfdcount];
90 BOOST_FOREACH(const SocketDesc& desc, l_SocketIOSockets) {
91 pfds[i].fd = desc.first;
92 pfds[i].events = desc.second.Events;
100 (void) WSAPoll(pfds, pfdcount, -1);
102 (void) poll(pfds, pfdcount, -1);
106 boost::mutex::scoped_lock lock(l_SocketIOMutex);
108 if (l_SocketIOFDChanged) {
109 l_SocketIOFDChanged = false;
110 l_SocketIOCV.notify_all();
116 for (int i = 0; i < pfdcount; i++) {
117 if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
120 if (pfds[i].fd == l_SocketIOEventFDs[0]) {
122 if (recv(l_SocketIOEventFDs[0], buffer, sizeof(buffer), 0) < 0)
123 Log(LogCritical, "SocketEvents", "Read from event FD failed.");
128 SocketEventDescriptor desc;
132 boost::mutex::scoped_lock lock(l_SocketIOMutex);
134 std::map<SOCKET, SocketEventDescriptor>::const_iterator it = l_SocketIOSockets.find(pfds[i].fd);
136 if (it == l_SocketIOSockets.end())
141 /* We must hold a ref-counted reference to the event object to keep it alive. */
142 ltref = desc.LifesupportObject;
147 desc.EventInterface->OnEvent(pfds[i].revents);
148 } catch (const std::exception& ex) {
149 Log(LogCritical, "SocketEvents")
150 << "Exception thrown in socket I/O handler:\n"
151 << DiagnosticInformation(ex);
153 Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
161 void SocketEvents::WakeUpThread(bool wait)
164 if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) {
165 boost::mutex::scoped_lock lock(l_SocketIOMutex);
167 l_SocketIOFDChanged = true;
169 (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
171 while (l_SocketIOFDChanged)
172 l_SocketIOCV.wait(lock);
175 (void) send(l_SocketIOEventFDs[1], "T", 1, 0);
180 * Constructor for the SocketEvents class.
182 SocketEvents::SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject)
183 : m_FD(socket->GetFD())
185 boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread);
187 Register(lifesupportObject);
190 SocketEvents::~SocketEvents(void)
192 VERIFY(m_FD == INVALID_SOCKET);
195 void SocketEvents::Register(Object *lifesupportObject)
197 boost::mutex::scoped_lock lock(l_SocketIOMutex);
199 VERIFY(m_FD != INVALID_SOCKET);
201 SocketEventDescriptor desc;
203 desc.EventInterface = this;
204 desc.LifesupportObject = lifesupportObject;
206 VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
208 l_SocketIOSockets[m_FD] = desc;
212 /* There's no need to wake up the I/O thread here. */
215 void SocketEvents::Unregister(void)
218 boost::mutex::scoped_lock lock(l_SocketIOMutex);
220 if (m_FD == INVALID_SOCKET)
223 l_SocketIOSockets.erase(m_FD);
224 m_FD = INVALID_SOCKET;
232 void SocketEvents::ChangeEvents(int events)
235 boost::mutex::scoped_lock lock(l_SocketIOMutex);
237 if (m_FD == INVALID_SOCKET)
238 BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
240 std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
242 if (it == l_SocketIOSockets.end())
245 it->second.Events = events;
251 bool SocketEvents::IsHandlingEvents(void) const
253 boost::mutex::scoped_lock lock(l_SocketIOMutex);
257 void SocketEvents::OnEvent(int revents)