From f541a62a34e1880d3827057ecba9d8f8ab1a8e64 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Mon, 10 Aug 2015 13:33:32 +0200 Subject: [PATCH] Fix: Make sure we're continuously reading from child processes' pipes fixes #9867 --- lib/base/process.cpp | 96 ++++++++++++++++++++++++++++++++++++-------- lib/base/process.hpp | 8 ++++ 2 files changed, 87 insertions(+), 17 deletions(-) diff --git a/lib/base/process.cpp b/lib/base/process.cpp index 5f66940a7..d192044a8 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -61,7 +61,21 @@ INITIALIZE_ONCE(&Process::StaticInitialize); Process::Process(const Process::Arguments& arguments, const Dictionary::Ptr& extraEnvironment) : m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment), m_Timeout(600) -{ } +#ifdef _WIN32 + , m_ReadPending(false), m_ReadFailed(false), m_Overlapped() +#endif /* _WIN32 */ +{ +#ifdef _WIN32 + m_Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); +#endif /* _WIN32 */ +} + +Process::~Process(void) +{ +#ifdef _WIN32 + CloseHandle(m_Overlapped.hEvent); +#endif /* _WIN32 */ +} void Process::StaticInitialize(void) { @@ -152,6 +166,7 @@ void Process::IOThreadProc(int tid) { #ifdef _WIN32 HANDLE *handles = NULL; + HANDLE *fhandles = NULL; #else /* _WIN32 */ pollfd *pfds = NULL; #endif /* _WIN32 */ @@ -171,8 +186,9 @@ void Process::IOThreadProc(int tid) count = 1 + l_Processes[tid].size(); #ifdef _WIN32 handles = reinterpret_cast(realloc(handles, sizeof(HANDLE) * count)); + fhandles = reinterpret_cast(realloc(fhandles, sizeof(HANDLE) * count)); - handles[0] = l_Events[tid]; + fhandles[0] = l_Events[tid]; #else /* _WIN32 */ pfds = reinterpret_cast(realloc(pfds, sizeof(pollfd) * count)); @@ -185,16 +201,29 @@ void Process::IOThreadProc(int tid) int i = 1; std::pair kv; BOOST_FOREACH(kv, l_Processes[tid]) { + const Process::Ptr& process = kv.second; #ifdef _WIN32 handles[i] = kv.first; + + if (!process->m_ReadPending) { + process->m_ReadPending = true; + + BOOL res = ReadFile(process->m_FD, process->m_ReadBuffer, sizeof(process->m_ReadBuffer), 0, &process->m_Overlapped); + if (res || GetLastError() != ERROR_IO_PENDING) { + process->m_ReadFailed = !res; + SetEvent(process->m_Overlapped.hEvent); + } + } + + fhandles[i] = process->m_Overlapped.hEvent; #else /* _WIN32 */ - pfds[i].fd = kv.second->m_FD; + pfds[i].fd = process->m_FD; pfds[i].events = POLLIN; pfds[i].revents = 0; #endif /* _WIN32 */ - if (kv.second->m_Timeout != 0) { - double delta = kv.second->m_Timeout - (now - kv.second->m_Result.ExecutionStart); + if (process->m_Timeout != 0) { + double delta = process->m_Timeout - (now - process->m_Result.ExecutionStart); if (timeout == -1 || delta < timeout) timeout = delta; @@ -210,7 +239,7 @@ void Process::IOThreadProc(int tid) timeout *= 1000; #ifdef _WIN32 - DWORD rc = WaitForMultipleObjects(count, handles, FALSE, timeout == -1 ? INFINITE : static_cast(timeout)); + DWORD rc = WaitForMultipleObjects(count, fhandles, FALSE, timeout == -1 ? INFINITE : static_cast(timeout)); #else /* _WIN32 */ int rc = poll(pfds, count, timeout); @@ -290,6 +319,39 @@ String Process::PrettyPrintArguments(const Process::Arguments& arguments) #endif /* _WIN32 */ } +#ifdef _WIN32 +static BOOL CreatePipeOverlapped(HANDLE *outReadPipe, HANDLE *outWritePipe, + SECURITY_ATTRIBUTES *securityAttributes, DWORD size, DWORD readMode, DWORD writeMode) +{ + static int pipeIndex = 0; + + if (size == 0) + size = 8192; + + pipeIndex++; + + char pipeName[128]; + sprintf(pipeName, "\\\\.\\Pipe\\OverlappedPipe.%d.%d", (int)GetCurrentProcessId(), pipeIndex); + + *outReadPipe = CreateNamedPipe(pipeName, PIPE_ACCESS_INBOUND | readMode, + PIPE_TYPE_BYTE | PIPE_WAIT, 1, size, size, 60 * 1000, securityAttributes); + + if (!*outReadPipe) + return FALSE; + + *outWritePipe = CreateFile(pipeName, GENERIC_WRITE, 0, securityAttributes, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | writeMode, NULL); + + if (*outWritePipe == INVALID_HANDLE_VALUE) { + DWORD error = GetLastError(); + CloseHandle(*outReadPipe); + SetLastError(error); + return FALSE; + } + + return TRUE; +} +#endif /* _WIN32 */ + void Process::Run(const boost::function& callback) { boost::call_once(l_OnceFlag, &Process::ThreadInitialize); @@ -302,7 +364,7 @@ void Process::Run(const boost::function& callback) sa.bInheritHandle = TRUE; HANDLE outReadPipe, outWritePipe; - if (!CreatePipe(&outReadPipe, &outWritePipe, &sa, 0)) + if (!CreatePipeOverlapped(&outReadPipe, &outWritePipe, &sa, 0, FILE_FLAG_OVERLAPPED, 0)) BOOST_THROW_EXCEPTION(win32_error() << boost::errinfo_api_function("CreatePipe") << errinfo_win32_error(GetLastError())); @@ -631,30 +693,30 @@ bool Process::DoEvents(void) } if (!is_timeout) { - char buffer[512]; - for (;;) { #ifdef _WIN32 - DWORD rc; - if (!ReadFile(m_FD, buffer, sizeof(buffer), &rc, NULL) || rc == 0) - break; + m_ReadPending = false; + + DWORD rc; + if (!m_ReadFailed && GetOverlappedResult(m_FD, &m_Overlapped, &rc, TRUE) && rc > 0) { + m_OutputStream.write(m_ReadBuffer, rc); + return true; + } #else /* _WIN32 */ + char buffer[512]; + for (;;) { int rc = read(m_FD, buffer, sizeof(buffer)); if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return true; if (rc > 0) { -#endif /* _WIN32 */ m_OutputStream.write(buffer, rc); -#ifdef _WIN32 - return true; -#else /* _WIN32 */ continue; } -#endif /* _WIN32 */ break; } +#endif /* _WIN32 */ } String output = m_OutputStream.str(); diff --git a/lib/base/process.hpp b/lib/base/process.hpp index d5c1f6710..b79b56fe2 100644 --- a/lib/base/process.hpp +++ b/lib/base/process.hpp @@ -68,6 +68,7 @@ public: static const std::deque::size_type MaxTasksPerThread = 512; Process(const Arguments& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr()); + ~Process(void); void SetTimeout(double timeout); double GetTimeout(void) const; @@ -93,6 +94,13 @@ private: pid_t m_PID; ConsoleHandle m_FD; +#ifdef _WIN32 + bool m_ReadPending; + bool m_ReadFailed; + OVERLAPPED m_Overlapped; + char m_ReadBuffer[1024]; +#endif /* _WIN32 */ + std::ostringstream m_OutputStream; boost::function m_Callback; ProcessResult m_Result; -- 2.40.0