p.join(5)
self.assertTrue(success.value)
+ @classmethod
+ def _test_wait_result(cls, c, pid):
+ with c:
+ c.notify()
+ time.sleep(1)
+ if pid is not None:
+ os.kill(pid, signal.SIGINT)
+
+ def test_wait_result(self):
+ if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
+ pid = os.getpid()
+ else:
+ pid = None
+
+ c = self.Condition()
+ with c:
+ self.assertFalse(c.wait(0))
+ self.assertFalse(c.wait(0.1))
+
+ p = self.Process(target=self._test_wait_result, args=(c, pid))
+ p.start()
+
+ self.assertTrue(c.wait(10))
+ if pid is not None:
+ self.assertRaises(KeyboardInterrupt, c.wait, 10)
+
+ p.join()
+
class _TestEvent(BaseTestCase):