]> granicus.if.org Git - python/commitdiff
Attempt to speed up some subprocess tests (and hopefully keep them reliable).
authorRoss Lagerwall <rosslagerwall@gmail.com>
Sun, 12 Feb 2012 07:01:30 +0000 (09:01 +0200)
committerRoss Lagerwall <rosslagerwall@gmail.com>
Sun, 12 Feb 2012 07:01:30 +0000 (09:01 +0200)
Lib/test/test_subprocess.py

index f2cdc333c9d09e635f89eef4e3f4cec6124df27b..b00e780ea1377c48e0b78c48d9a121648dcb9451 100644 (file)
@@ -686,26 +686,19 @@ class ProcessTestCase(BaseTestCase):
         self.assertEqual(subprocess.list2cmdline(['ab', '']),
                          'ab ""')
 
-
     def test_poll(self):
-        p = subprocess.Popen([sys.executable,
-                          "-c", "import time; time.sleep(1)"])
-        count = 0
-        while p.poll() is None:
-            time.sleep(0.1)
-            count += 1
-        # We expect that the poll loop probably went around about 10 times,
-        # but, based on system scheduling we can't control, it's possible
-        # poll() never returned None.  It "should be" very rare that it
-        # didn't go around at least twice.
-        self.assertGreaterEqual(count, 2)
+        p = subprocess.Popen([sys.executable, "-c",
+                              "import os",
+                              "os.read(1)"], stdin=subprocess.PIPE)
+        self.addCleanup(p.stdin.close)
+        self.assertIsNone(p.poll())
+        os.write(p.stdin.fileno(), b'A')
+        p.wait()
         # Subsequent invocations should just return the returncode
         self.assertEqual(p.poll(), 0)
 
-
     def test_wait(self):
-        p = subprocess.Popen([sys.executable,
-                          "-c", "import time; time.sleep(2)"])
+        p = subprocess.Popen([sys.executable, "-c", "pass"])
         self.assertEqual(p.wait(), 0)
         # Subsequent invocations should just return the returncode
         self.assertEqual(p.wait(), 0)
@@ -797,25 +790,29 @@ class ProcessTestCase(BaseTestCase):
         p = subprocess.Popen([sys.executable, "-c", 'pass'],
                              stdin=subprocess.PIPE)
         self.addCleanup(p.stdin.close)
-        time.sleep(2)
+        p.wait()
         p.communicate(b"x" * 2**20)
 
-    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
-                         "Requires signal.SIGALRM")
+    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
+                         "Requires signal.SIGUSR1")
+    @unittest.skipUnless(hasattr(os, 'kill'),
+                         "Requires os.kill")
+    @unittest.skipUnless(hasattr(os, 'getppid'),
+                         "Requires os.getppid")
     def test_communicate_eintr(self):
         # Issue #12493: communicate() should handle EINTR
         def handler(signum, frame):
             pass
-        old_handler = signal.signal(signal.SIGALRM, handler)
-        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+        old_handler = signal.signal(signal.SIGUSR1, handler)
+        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
 
-        # the process is running for 2 seconds
-        args = [sys.executable, "-c", 'import time; time.sleep(2)']
+        args = [sys.executable, "-c",
+                'import os, signal;'
+                'os.kill(os.getppid(), signal.SIGUSR1)']
         for stream in ('stdout', 'stderr'):
             kw = {stream: subprocess.PIPE}
             with subprocess.Popen(args, **kw) as process:
-                signal.alarm(1)
-                # communicate() will be interrupted by SIGALRM
+                # communicate() will be interrupted by SIGUSR1
                 process.communicate()