#define environ (*_NSGetEnviron())
#endif /* __APPLE__ */
-static boost::mutex l_ProcessMutex;
-static std::map<int, Process::Ptr> l_Processes;
-static int l_EventFDs[2];
+#define IOTHREADS 8
+
+static boost::mutex l_ProcessMutex[IOTHREADS];
+static std::map<int, Process::Ptr> l_Processes[IOTHREADS];
+static int l_EventFDs[IOTHREADS][2];
static boost::once_flag l_OnceFlag = BOOST_ONCE_INIT;
INITIALIZE_ONCE(&Process::StaticInitialize);
void Process::StaticInitialize(void)
{
-#ifdef HAVE_PIPE2
- if (pipe2(l_EventFDs, O_CLOEXEC) < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("pipe2")
- << boost::errinfo_errno(errno));
- }
+ for (int tid = 0; tid < IOTHREADS; tid++) {
+ #ifdef HAVE_PIPE2
+ if (pipe2(l_EventFDs[tid], O_CLOEXEC) < 0) {
+ BOOST_THROW_EXCEPTION(posix_error()
+ << boost::errinfo_api_function("pipe2")
+ << boost::errinfo_errno(errno));
+ }
#else /* HAVE_PIPE2 */
- if (pipe(l_EventFDs) < 0) {
- BOOST_THROW_EXCEPTION(posix_error()
- << boost::errinfo_api_function("pipe")
- << boost::errinfo_errno(errno));
- }
+ if (pipe(l_EventFDs[tid]) < 0) {
+ BOOST_THROW_EXCEPTION(posix_error()
+ << boost::errinfo_api_function("pipe")
+ << boost::errinfo_errno(errno));
+ }
- Utility::SetCloExec(l_EventFDs[0]);
- Utility::SetCloExec(l_EventFDs[1]);
+ Utility::SetCloExec(l_EventFDs[tid][0]);
+ Utility::SetCloExec(l_EventFDs[tid][1]);
#endif /* HAVE_PIPE2 */
- Utility::SetNonBlocking(l_EventFDs[0]);
- Utility::SetNonBlocking(l_EventFDs[1]);
+ Utility::SetNonBlocking(l_EventFDs[tid][0]);
+ Utility::SetNonBlocking(l_EventFDs[tid][1]);
+ }
}
void Process::ThreadInitialize(void)
{
/* Note to self: Make sure this runs _after_ we've daemonized. */
- boost::thread t(&Process::IOThreadProc);
- t.detach();
+ for (int tid = 0; tid < IOTHREADS; tid++) {
+ boost::thread t(boost::bind(&Process::IOThreadProc, tid));
+ t.detach();
+ }
}
-void Process::IOThreadProc(void)
+void Process::IOThreadProc(int tid)
{
pollfd *pfds = NULL;
int count = 0;
+ Utility::SetThreadName("ProcessIO");
+
for (;;) {
double now, timeout = -1;
now = Utility::GetTime();
{
- boost::mutex::scoped_lock lock(l_ProcessMutex);
+ boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
- count = 1 + l_Processes.size();
+ count = 1 + l_Processes[tid].size();
pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
- pfds[0].fd = l_EventFDs[0];
+ pfds[0].fd = l_EventFDs[tid][0];
pfds[0].events = POLLIN;
pfds[0].revents = 0;
int i = 1;
std::pair<int, Process::Ptr> kv;
- BOOST_FOREACH(kv, l_Processes) {
+ BOOST_FOREACH(kv, l_Processes[tid]) {
pfds[i].fd = kv.second->m_FD;
pfds[i].events = POLLIN;
pfds[i].revents = 0;
continue;
{
- boost::mutex::scoped_lock lock(l_ProcessMutex);
+ boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
if (pfds[0].revents & (POLLIN|POLLHUP|POLLERR)) {
char buffer[512];
- (void) read(l_EventFDs[0], buffer, sizeof(buffer));
+ (void) read(l_EventFDs[tid][0], buffer, sizeof(buffer));
}
for (int i = 1; i < count; i++) {
if (pfds[i].revents & (POLLIN|POLLHUP|POLLERR)) {
std::map<int, Process::Ptr>::iterator it;
- it = l_Processes.find(pfds[i].fd);
+ it = l_Processes[tid].find(pfds[i].fd);
- if (it == l_Processes.end())
+ if (it == l_Processes[tid].end())
continue; /* This should never happen. */
if (!it->second->DoEvents()) {
(void) close(it->first);
- l_Processes.erase(it);
+ l_Processes[tid].erase(it);
}
}
}
m_FD = fds[0];
m_Callback = callback;
+ int tid = GetTID();
+
{
- boost::mutex::scoped_lock lock(l_ProcessMutex);
- l_Processes[m_FD] = GetSelf();
+ boost::mutex::scoped_lock lock(l_ProcessMutex[tid]);
+ l_Processes[tid][m_FD] = GetSelf();
}
- (void) write(l_EventFDs[1], "T", 1);
+ (void) write(l_EventFDs[tid][1], "T", 1);
}
bool Process::DoEvents(void)
return false;
}
+int Process::GetTID(void) const
+{
+ return (reinterpret_cast<uintptr_t>(this) / sizeof(void *)) % IOTHREADS;
+}
+
#endif /* _WIN32 */