]> granicus.if.org Git - python/commitdiff
Resolves issues 5155, 5313, 5331 - bad file descriptor error with processes in processes
authorJesse Noller <jnoller@gmail.com>
Tue, 30 Jun 2009 17:11:52 +0000 (17:11 +0000)
committerJesse Noller <jnoller@gmail.com>
Tue, 30 Jun 2009 17:11:52 +0000 (17:11 +0000)
Doc/library/multiprocessing.rst
Lib/multiprocessing/process.py
Lib/test/test_multiprocessing.py
Misc/ACKS
Misc/NEWS

index 7aaa8e38168a63ec9218b6246df7f0f538313152..04828bb44fd1f7699fb691df5e2477cddc3b7434 100644 (file)
@@ -2101,6 +2101,38 @@ Explicitly pass resources to child processes
            for i in range(10):
                 Process(target=f, args=(lock,)).start()
 
+Beware of replacing :data:`sys.stdin` with a "file-like object"
+
+    :mod:`multiprocessing` originally unconditionally called::
+
+        os.close(sys.stdin.fileno())
+
+    in the :meth:`multiprocessing.Process._bootstrap` method --- this resulted
+    in issues with processes-in-processes. This has been changed to::
+
+        sys.stdin.close()
+        sys.stdin = open(os.devnull)
+
+    This solves the fundamental issue of processes colliding with each other
+    and raising a bad file descriptor error, but introduces a potential danger
+    to applications which replace :data:`sys.stdin` with a "file-like object"
+    with output buffering. The danger is that if multiple processes call
+    ``close()`` on this file-like object, the same buffered data could be
+    flushed to the object multiple times, resulting in corruption.
+
+    If you write a file-like object and implement your own caching, you can
+    make it fork-safe by storing the pid whenever you append to the cache,
+    and discarding the cache when the pid changes. For example::
+
+       @property
+       def cache(self):
+           pid = os.getpid()
+           if pid != self._pid:
+               self._pid = pid
+               self._cache = []
+           return self._cache
+
+    For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`.
 
 Windows
 ~~~~~~~
index b034317df025780ac950573e42bbf2086fc0f7eb..0b04e36300fd6c83018d8d1ee0574ffb5cc5bc79 100644 (file)
@@ -220,7 +220,8 @@ class Process(object):
             self._children = set()
             self._counter = itertools.count(1)
             try:
-                os.close(sys.stdin.fileno())
+                sys.stdin.close()
+                sys.stdin = open(os.devnull)
             except (OSError, ValueError):
                 pass
             _current_process = self
index baaece8afd36958bb6b7177c764c951d3f477737..1e4a98e4f24b9d3e24343a088e976003a195db46 100644 (file)
@@ -18,6 +18,7 @@ import socket
 import random
 import logging
 import test_support
+from StringIO import StringIO
 
 
 _multiprocessing = test_support.import_module('_multiprocessing')
@@ -1861,7 +1862,74 @@ class TestInitializers(unittest.TestCase):
         p.join()
         self.assertEqual(self.ns.test, 1)
 
-testcases_other = [OtherTest, TestInvalidHandle, TestInitializers]
+#
+# Issue 5155, 5313, 5331: Test process in processes
+# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
+#
+
+def _ThisSubProcess(q):
+    try:
+        item = q.get(block=False)
+    except Queue.Empty:
+        pass
+
+def _TestProcess(q):
+    queue = multiprocessing.Queue()
+    subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
+    subProc.start()
+    subProc.join()
+
+def _afunc(x):
+    return x*x
+
+def pool_in_process():
+    pool = multiprocessing.Pool(processes=4)
+    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
+
+class _file_like(object):
+    def __init__(self, delegate):
+        self._delegate = delegate
+        self._pid = None
+
+    @property
+    def cache(self):
+        pid = os.getpid()
+        # There are no race conditions since fork keeps only the running thread
+        if pid != self._pid:
+            self._pid = pid
+            self._cache = []
+        return self._cache
+
+    def write(self, data):
+        self.cache.append(data)
+
+    def flush(self):
+        self._delegate.write(''.join(self.cache))
+        self._cache = []
+
+class TestStdinBadfiledescriptor(unittest.TestCase):
+
+    def test_queue_in_process(self):
+        queue = multiprocessing.Queue()
+        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
+        proc.start()
+        proc.join()
+
+    def test_pool_in_process(self):
+        p = multiprocessing.Process(target=pool_in_process)
+        p.start()
+        p.join()
+
+    def test_flushing(self):
+        sio = StringIO()
+        flike = _file_like(sio)
+        flike.write('foo')
+        proc = multiprocessing.Process(target=lambda: flike.flush())
+        flike.flush()
+        assert sio.getvalue() == 'foo'
+
+testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
+                   TestStdinBadfiledescriptor]
 
 #
 #
index 5cc07e57192b3edb39c18ff789dc5599e7c44100..9159b6fac5d5ffa4bdfe9ae02725de1c953d9219 100644 (file)
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -46,6 +46,7 @@ Des Barry
 Ulf Bartelt
 Nick Bastin
 Jeff Bauer
+Mike Bayer
 Michael R Bax
 Anthony Baxter
 Samuel L. Bayer
@@ -183,6 +184,7 @@ Cesar Douady
 Dean Draayer
 John DuBois
 Paul Dubois
+Graham Dumpleton
 Quinn Dunkan
 Robin Dunn
 Luke Dunstan
@@ -553,6 +555,7 @@ Steven Pemberton
 Santiago Peresón
 Mark Perrego
 Trevor Perrin
+Gabriel de Perthuis
 Tim Peters
 Benjamin Peterson
 Chris Petrilli
index 1e244c9518045f9c2be7389f92eed170a8a85323..03cf45c644550cefc52ac4130c3df3487150440d 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -341,6 +341,10 @@ Core and Builtins
 Library
 -------
 
+- Issues #5155, #5313, #5331: multiprocessing.Process._bootstrap was
+  unconditionally calling "os.close(sys.stdin.fileno())" resulting in file
+  descriptor errors.
+
 - Issue #6365: Distutils build_ext inplace mode was copying the compiled 
   extension in a subdirectory if the extension name had dots.