import logging.handlers
try:
- import _thread
+ import _thread, threading
except ImportError:
_thread = None
+ threading = None
+try:
+ import multiprocessing.process
+except ImportError:
+ multiprocessing = None
+
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
def threading_setup():
if _thread:
- return _thread._count(),
+ return _thread._count(), threading._dangling.copy()
else:
- return 1,
+ return 1, ()
-def threading_cleanup(nb_threads):
+def threading_cleanup(*original_values):
if not _thread:
return
_MAX_COUNT = 10
for count in range(_MAX_COUNT):
- n = _thread._count()
- if n == nb_threads:
+ values = _thread._count(), threading._dangling
+ if values == original_values:
break
time.sleep(0.1)
+ gc_collect()
# XXX print a warning in case of failure?
def reap_threads(func):
Tests
-----
+- Try harder to reap dangling threads in test.support.reap_threads().
+
- Issue #12573: Add resource checks for dangling Thread and Process objects.
- Issue #12549: Correct test_platform to not fail when OS X returns 'x86_64'