import fcntl
except ImportError:
pass
-try:
- import multiprocessing
-except ImportError:
- multiprocessing = None
class TestBase(unittest.TestCase):
self._box = self._factory(self._path)
@unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
- @unittest.skipUnless(multiprocessing, "Test needs multiprocessing.")
+ @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
def test_lock_conflict(self):
# Fork off a child process that will lock the mailbox temporarily,
# unlock it and exit.
- ready = multiprocessing.Event()
- done = multiprocessing.Event()
+ c, p = socket.socketpair()
+ self.addCleanup(c.close)
+ self.addCleanup(p.close)
pid = os.fork()
if pid == 0:
try:
# lock the mailbox, and signal the parent it can proceed
self._box.lock()
- ready.set()
+ c.send(b'c')
# wait until the parent is done, and unlock the mailbox
- done.wait(5)
+ c.recv(1)
self._box.unlock()
finally:
os._exit(0)
# In the parent, wait until the child signals it locked the mailbox.
- ready.wait(5)
+ p.recv(1)
try:
self.assertRaises(mailbox.ExternalClashError,
self._box.lock)
finally:
# Signal the child it can now release the lock and exit.
- done.set()
+ p.send(b'p')
# Wait for child to exit. Locking should now succeed.
exited_pid, status = os.waitpid(pid, 0)