target=self._test_create_grandchild_process, args=(wconn, ))
p.start()
- if not rconn.poll(timeout=5):
+ if not rconn.poll(timeout=60):
raise AssertionError("Could not communicate with child process")
parent_process_status = rconn.recv()
self.assertEqual(parent_process_status, "alive")
p.terminate()
p.join()
- if not rconn.poll(timeout=5):
+ if not rconn.poll(timeout=60):
raise AssertionError("Could not communicate with child process")
parent_process_status = rconn.recv()
self.assertEqual(parent_process_status, "not alive")
def _test_create_grandchild_process(cls, wconn):
p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
p.start()
- time.sleep(100)
+ time.sleep(300)
@classmethod
def _test_report_parent_status(cls, wconn):