phase_num = 0
def f():
cond.acquire()
- cond.wait()
+ result = cond.wait()
cond.release()
- results1.append(phase_num)
+ results1.append((result, phase_num))
cond.acquire()
- cond.wait()
+ result = cond.wait()
cond.release()
- results2.append(phase_num)
+ results2.append((result, phase_num))
b = Bunch(f, N)
b.wait_for_started()
_wait()
cond.release()
while len(results1) < 3:
_wait()
- self.assertEqual(results1, [1] * 3)
+ self.assertEqual(results1, [(True, 1)] * 3)
self.assertEqual(results2, [])
# Notify 5 threads: they might be in their first or second wait
cond.acquire()
cond.release()
while len(results1) + len(results2) < 8:
_wait()
- self.assertEqual(results1, [1] * 3 + [2] * 2)
- self.assertEqual(results2, [2] * 3)
+ self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
+ self.assertEqual(results2, [(True, 2)] * 3)
# Notify all threads: they are all in their second wait
cond.acquire()
cond.notify_all()
cond.release()
while len(results2) < 5:
_wait()
- self.assertEqual(results1, [1] * 3 + [2] * 2)
- self.assertEqual(results2, [2] * 3 + [3] * 2)
+ self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
+ self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
b.wait_for_finished()
def test_notify(self):
def f():
cond.acquire()
t1 = time.time()
- cond.wait(0.5)
+ result = cond.wait(0.5)
t2 = time.time()
cond.release()
- results.append(t2 - t1)
+ results.append((t2 - t1, result))
Bunch(f, N).wait_for_finished()
- self.assertEqual(len(results), 5)
- for dt in results:
+ self.assertEqual(len(results), N)
+ for dt, result in results:
self.assertTimeout(dt, 0.5)
+ # Note that conceptually (that"s the condition variable protocol)
+ # a wait() may succeed even if no one notifies us and before any
+ # timeout occurs. Spurious wakeups can occur.
+ # This makes it hard to verify the result value.
+ # In practice, this implementation has no spurious wakeups.
+ self.assertFalse(result)
class BaseSemaphoreTests(BaseTestCase):