def setUp(self):
super().setUp()
- self.t1 = time.time()
+ self.t1 = time.monotonic()
if hasattr(self, "ctx"):
self.executor = self.executor_type(
max_workers=self.worker_count,
self.executor.shutdown(wait=True)
self.executor = None
- dt = time.time() - self.t1
+ dt = time.monotonic() - self.t1
if test.support.verbose:
print("%.2fs" % dt, end=' ')
- self.assertLess(dt, 60, "synchronization issue: test lasted too long")
+ self.assertLess(dt, 300, "synchronization issue: test lasted too long")
super().tearDown()
with self.assertRaises(BrokenExecutor):
future.result()
# At some point, the executor should break
- t1 = time.time()
+ t1 = time.monotonic()
while not self.executor._broken:
- if time.time() - t1 > 5:
+ if time.monotonic() - t1 > 5:
self.fail("executor not broken after 5 s.")
time.sleep(0.01)
# ... and from this point submit() is guaranteed to fail