]> granicus.if.org Git - icinga2/commitdiff
Fix: Make sure we're continuously reading from child processes' pipes
authorGunnar Beutner <gunnar@beutner.name>
Mon, 10 Aug 2015 11:33:32 +0000 (13:33 +0200)
committerGunnar Beutner <gunnar@beutner.name>
Wed, 12 Aug 2015 08:37:09 +0000 (10:37 +0200)
fixes #9867

lib/base/process.cpp
lib/base/process.hpp

index 5f66940a74111ce1dd2368d63a578b2a45b9e0b0..d192044a89fe42b3710f3e65837b65138caf69ec 100644 (file)
@@ -61,7 +61,21 @@ INITIALIZE_ONCE(&Process::StaticInitialize);
 
 Process::Process(const Process::Arguments& arguments, const Dictionary::Ptr& extraEnvironment)
        : m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment), m_Timeout(600)
-{ }
+#ifdef _WIN32
+       , m_ReadPending(false), m_ReadFailed(false), m_Overlapped()
+#endif /* _WIN32 */
+{
+#ifdef _WIN32
+       m_Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+#endif /* _WIN32 */
+}
+
+Process::~Process(void)
+{
+#ifdef _WIN32
+       CloseHandle(m_Overlapped.hEvent);
+#endif /* _WIN32 */
+}
 
 void Process::StaticInitialize(void)
 {
@@ -152,6 +166,7 @@ void Process::IOThreadProc(int tid)
 {
 #ifdef _WIN32
        HANDLE *handles = NULL;
+       HANDLE *fhandles = NULL;
 #else /* _WIN32 */
        pollfd *pfds = NULL;
 #endif /* _WIN32 */
@@ -171,8 +186,9 @@ void Process::IOThreadProc(int tid)
                        count = 1 + l_Processes[tid].size();
 #ifdef _WIN32
                        handles = reinterpret_cast<HANDLE *>(realloc(handles, sizeof(HANDLE) * count));
+                       fhandles = reinterpret_cast<HANDLE *>(realloc(fhandles, sizeof(HANDLE) * count));
 
-                       handles[0] = l_Events[tid];
+                       fhandles[0] = l_Events[tid];
 
 #else /* _WIN32 */
                        pfds = reinterpret_cast<pollfd *>(realloc(pfds, sizeof(pollfd) * count));
@@ -185,16 +201,29 @@ void Process::IOThreadProc(int tid)
                        int i = 1;
                        std::pair<ProcessHandle, Process::Ptr> kv;
                        BOOST_FOREACH(kv, l_Processes[tid]) {
+                               const Process::Ptr& process = kv.second;
 #ifdef _WIN32
                                handles[i] = kv.first;
+
+                               if (!process->m_ReadPending) {
+                                       process->m_ReadPending = true;
+
+                                       BOOL res = ReadFile(process->m_FD, process->m_ReadBuffer, sizeof(process->m_ReadBuffer), 0, &process->m_Overlapped);
+                                       if (res || GetLastError() != ERROR_IO_PENDING) {
+                                               process->m_ReadFailed = !res;
+                                               SetEvent(process->m_Overlapped.hEvent);
+                                       }
+                               }
+
+                               fhandles[i] = process->m_Overlapped.hEvent;
 #else /* _WIN32 */
-                               pfds[i].fd = kv.second->m_FD;
+                               pfds[i].fd = process->m_FD;
                                pfds[i].events = POLLIN;
                                pfds[i].revents = 0;
 #endif /* _WIN32 */
 
-                               if (kv.second->m_Timeout != 0) {
-                                       double delta = kv.second->m_Timeout - (now - kv.second->m_Result.ExecutionStart);
+                               if (process->m_Timeout != 0) {
+                                       double delta = process->m_Timeout - (now - process->m_Result.ExecutionStart);
 
                                        if (timeout == -1 || delta < timeout)
                                                timeout = delta;
@@ -210,7 +239,7 @@ void Process::IOThreadProc(int tid)
                timeout *= 1000;
 
 #ifdef _WIN32
-               DWORD rc = WaitForMultipleObjects(count, handles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
+               DWORD rc = WaitForMultipleObjects(count, fhandles, FALSE, timeout == -1 ? INFINITE : static_cast<DWORD>(timeout));
 #else /* _WIN32 */
                int rc = poll(pfds, count, timeout);
 
@@ -290,6 +319,39 @@ String Process::PrettyPrintArguments(const Process::Arguments& arguments)
 #endif /* _WIN32 */
 }
 
+#ifdef _WIN32
+static BOOL CreatePipeOverlapped(HANDLE *outReadPipe, HANDLE *outWritePipe,
+    SECURITY_ATTRIBUTES *securityAttributes, DWORD size, DWORD readMode, DWORD writeMode)
+{
+       static int pipeIndex = 0;
+
+       if (size == 0)
+               size = 8192;
+
+       pipeIndex++;
+
+       char pipeName[128];
+       sprintf(pipeName, "\\\\.\\Pipe\\OverlappedPipe.%d.%d", (int)GetCurrentProcessId(), pipeIndex);
+
+       *outReadPipe = CreateNamedPipe(pipeName, PIPE_ACCESS_INBOUND | readMode,
+           PIPE_TYPE_BYTE | PIPE_WAIT, 1, size, size, 60 * 1000, securityAttributes);
+       
+       if (!*outReadPipe)
+               return FALSE;
+
+       *outWritePipe = CreateFile(pipeName, GENERIC_WRITE, 0, securityAttributes, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | writeMode, NULL);
+
+       if (*outWritePipe == INVALID_HANDLE_VALUE) {
+               DWORD error = GetLastError();
+               CloseHandle(*outReadPipe);
+               SetLastError(error);
+               return FALSE;
+       }
+
+       return TRUE;
+}
+#endif /* _WIN32 */
+
 void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
 {
        boost::call_once(l_OnceFlag, &Process::ThreadInitialize);
@@ -302,7 +364,7 @@ void Process::Run(const boost::function<void(const ProcessResult&)>& callback)
        sa.bInheritHandle = TRUE;
 
        HANDLE outReadPipe, outWritePipe;
-       if (!CreatePipe(&outReadPipe, &outWritePipe, &sa, 0))
+       if (!CreatePipeOverlapped(&outReadPipe, &outWritePipe, &sa, 0, FILE_FLAG_OVERLAPPED, 0))
                BOOST_THROW_EXCEPTION(win32_error()
                        << boost::errinfo_api_function("CreatePipe")
                        << errinfo_win32_error(GetLastError()));
@@ -631,30 +693,30 @@ bool Process::DoEvents(void)
        }
 
        if (!is_timeout) {
-               char buffer[512];
-               for (;;) {
 #ifdef _WIN32
-                       DWORD rc;
-                       if (!ReadFile(m_FD, buffer, sizeof(buffer), &rc, NULL) || rc == 0)
-                               break;
+               m_ReadPending = false;
+
+               DWORD rc;
+               if (!m_ReadFailed && GetOverlappedResult(m_FD, &m_Overlapped, &rc, TRUE) && rc > 0) {
+                       m_OutputStream.write(m_ReadBuffer, rc);
+                       return true;
+               }
 #else /* _WIN32 */
+               char buffer[512];
+               for (;;) {
                        int rc = read(m_FD, buffer, sizeof(buffer));
 
                        if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
                                return true;
 
                        if (rc > 0) {
-#endif /* _WIN32 */
                                m_OutputStream.write(buffer, rc);
-#ifdef _WIN32
-                               return true;
-#else /* _WIN32 */
                                continue;
                        }
-#endif /* _WIN32 */
 
                        break;
                }
+#endif /* _WIN32 */
        }
 
        String output = m_OutputStream.str();
index d5c1f67105754aea7c2d49e0270c16c9ff4fa5a6..b79b56fe2844dcc5fd23fae8be8502b95af8cc43 100644 (file)
@@ -68,6 +68,7 @@ public:
        static const std::deque<Process::Ptr>::size_type MaxTasksPerThread = 512;
 
        Process(const Arguments& arguments, const Dictionary::Ptr& extraEnvironment = Dictionary::Ptr());
+       ~Process(void);
 
        void SetTimeout(double timeout);
        double GetTimeout(void) const;
@@ -93,6 +94,13 @@ private:
        pid_t m_PID;
        ConsoleHandle m_FD;
 
+#ifdef _WIN32
+       bool m_ReadPending;
+       bool m_ReadFailed;
+       OVERLAPPED m_Overlapped;
+       char m_ReadBuffer[1024];
+#endif /* _WIN32 */
+
        std::ostringstream m_OutputStream;
        boost::function<void (const ProcessResult&)> m_Callback;
        ProcessResult m_Result;