void Process::WorkerThreadProc(int taskFd)
{
map<int, Process::Ptr> tasks;
+ pollfd *pfds;
for (;;) {
map<int, Process::Ptr>::iterator it, prev;
#ifndef _MSC_VER
- fd_set readfds;
- int nfds = 0;
+ pfds = (pollfd *)realloc(pfds, (1 + tasks.size()) * sizeof(pollfd));
- FD_ZERO(&readfds);
+ if (pfds == NULL)
+ BOOST_THROW_EXCEPTION(PosixException("realloc() failed.", errno));
- if (tasks.size() < MaxTasksPerThread)
- FD_SET(taskFd, &readfds);
+ int idx = 0;
- if (taskFd > nfds)
- nfds = taskFd;
+ if (tasks.size() < MaxTasksPerThread) {
+ pfds[idx].fd = taskFd;
+ pfds[idx].events = POLLIN;
+ idx++;
+ }
int fd;
BOOST_FOREACH(tie(fd, tuples::ignore), tasks) {
- if (fd > nfds)
- nfds = fd;
-
- FD_SET(fd, &readfds);
+ pfds[idx].fd = fd;
+ pfds[idx].events = POLLIN;
+ idx++;
}
- int rc = select(nfds + 1, &readfds, NULL, NULL, NULL);
+ int rc = poll(pfds, idx, -1);
if (rc < 0 && errno != EINTR)
- BOOST_THROW_EXCEPTION(PosixException("select() failed.", errno));
+ BOOST_THROW_EXCEPTION(PosixException("poll() failed.", errno));
if (rc == 0)
continue;
#endif /* _MSC_VER */
#ifndef _MSC_VER
- if (FD_ISSET(taskFd, &readfds)) {
+ for (int i = 0; i < idx; i++) {
+ if ((pfds[i].revents & POLLIN) == 0)
+ continue;
+
+ if (pfds[i].fd == taskFd) {
#endif /* _MSC_VER */
- while (tasks.size() < MaxTasksPerThread) {
- Process::Ptr task;
+ while (tasks.size() < MaxTasksPerThread) {
+ Process::Ptr task;
- {
- boost::mutex::scoped_lock lock(m_Mutex);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
- /* Read one byte for every task we take from the pending tasks list. */
- char buffer;
- int rc = read(taskFd, &buffer, sizeof(buffer));
+#ifndef _MSC_VER
+ /* Read one byte for every task we take from the pending tasks list. */
+ char buffer;
+ int rc = read(taskFd, &buffer, sizeof(buffer));
- if (rc < 0) {
- if (errno == EAGAIN)
- break; /* Someone else was faster and took our task. */
+ if (rc < 0) {
+ if (errno == EAGAIN)
+ break; /* Someone else was faster and took our task. */
- BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
- }
+ BOOST_THROW_EXCEPTION(PosixException("read() failed.", errno));
+ }
- assert(!m_Tasks.empty());
+ assert(!m_Tasks.empty());
+#else /* _MSC_VER */
+ if (m_Tasks.empty())
+ break;
+#endif /* _MSC_VER */
- task = m_Tasks.front();
- m_Tasks.pop_front();
- }
+ task = m_Tasks.front();
+ m_Tasks.pop_front();
+ }
- try {
- task->InitTask();
+ try {
+ task->InitTask();
- int fd = fileno(task->m_FP);
- if (fd >= 0)
- tasks[fd] = task;
- } catch (...) {
- Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+ int fd = fileno(task->m_FP);
+ if (fd >= 0)
+ tasks[fd] = task;
+ } catch (...) {
+ Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+ }
}
- }
#ifndef _MSC_VER
- }
-#endif /* _MSC_VER */
- for (it = tasks.begin(); it != tasks.end(); ) {
- int fd = it->first;
- Process::Ptr task = it->second;
-
-#ifndef _MSC_VER
- if (!FD_ISSET(fd, &readfds)) {
- it++;
continue;
}
+
+ it = tasks.find(pfds[i].fd);
+
+ if (it == tasks.end())
+ continue;
+#else /* _MSC_VER */
+ for (it = tasks.begin(); it != tasks.end(); ) {
+ int fd = it->first;
#endif /* _MSC_VER */
+ Process::Ptr task = it->second;
- if (!task->RunTask()) {
- prev = it;
- it++;
- tasks.erase(prev);
+ if (!task->RunTask()) {
+ prev = it;
+#ifdef _MSC_VER
+ it++;
+#endif /* _MSC_VER */
+ tasks.erase(prev);
- Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
- } else {
- it++;
+ Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result));
+#ifdef _MSC_VER
+ } else {
+ it++;
+#endif /* _MSC_VER */
+ }
+#ifdef _MSC_VER
}
+#else /* _MSC_VER */
}
+#endif /* _MSC_VER */
}
}