@unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
@unittest.skipUnless(threading, 'Threading required for this test.')
+ @support.reap_threads
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
"""
self.close()
self._thread.join(timeout)
+ alive = self._thread.is_alive()
self._thread = None
+ if alive:
+ self.fail("join() timed out")
class ControlMixin(object):
"""
self.shutdown()
if self._thread is not None:
self._thread.join(timeout)
+ alive = self._thread.is_alive()
self._thread = None
+ if alive:
+ self.fail("join() timed out")
self.server_close()
self.ready.clear()
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPHandlerTest(BaseTest):
TIMEOUT = 8.0
+
+ @support.reap_threads
def test_basic(self):
sockmap = {}
server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
request.end_headers()
self.handled.set()
+ @support.reap_threads
def test_output(self):
# The log message sent to the HTTPHandler is properly received.
logger = logging.getLogger("http")
t.ready.wait(2.0)
logging.config.stopListening()
t.join(2.0)
+ if t.is_alive():
+ self.fail("join() timed out")
@unittest.skipUnless(threading, 'Threading required for this test.')
+ @support.reap_threads
def test_listen_config_10_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(json.dumps(self.config10))
], stream=output)
@unittest.skipUnless(threading, 'Threading required for this test.')
+ @support.reap_threads
def test_listen_config_1_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
self.assert_log_lines([])
@unittest.skipUnless(threading, 'Threading required for this test.')
+ @support.reap_threads
def test_listen_verify(self):
def verify_fail(stuff):