def setUp(self):
"""Setup the default logging stream to an internal StringIO instance,
so that we can examine log output as we want."""
+ self._threading_key = support.threading_setup()
+
logger_dict = logging.getLogger().manager.loggerDict
logging._acquireLock()
try:
finally:
logging._releaseLock()
+ self.doCleanups()
+ support.threading_cleanup(*self._threading_key)
+
def assert_log_lines(self, expected_values, stream=None, pat=None):
"""Match the collected log lines against the regular expression
self.expected_log_pat, and compare the extracted group values to
@unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
@unittest.skipUnless(threading, 'Threading required for this test.')
- @support.reap_threads
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
class SMTPHandlerTest(BaseTest):
TIMEOUT = 8.0
- @support.reap_threads
def test_basic(self):
sockmap = {}
server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
self.assertFalse(logger.disabled)
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
os.remove(fn)
return fn
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSocketHandlerTest(SocketHandlerTest):
SocketHandlerTest.tearDown(self)
support.unlink(self.address)
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(threading, 'Threading required for this test.')
class DatagramHandlerTest(BaseTest):
self.handled.wait()
self.assertEqual(self.log_output, "spam\neggs\n")
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixDatagramHandlerTest(DatagramHandlerTest):
self.handled.wait()
self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSysLogHandlerTest(SysLogHandlerTest):
SysLogHandlerTest.tearDown(self)
support.unlink(self.address)
+@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(support.IPV6_ENABLED,
'IPv6 support required for this test.')
@unittest.skipUnless(threading, 'Threading required for this test.')
request.end_headers()
self.handled.set()
- @support.reap_threads
def test_output(self):
# The log message sent to the HTTPHandler is properly received.
logger = logging.getLogger("http")
logging.warning('Exclamation')
self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
+ # listen() uses ConfigSocketReceiver which is based
+ # on socketserver.ThreadingTCPServer
+ @unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(threading, 'listen() needs threading to work')
def setup_via_listener(self, text, verify=None):
text = text.encode("utf-8")
self.fail("join() timed out")
@unittest.skipUnless(threading, 'Threading required for this test.')
- @support.reap_threads
def test_listen_config_10_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(json.dumps(self.config10))
], stream=output)
@unittest.skipUnless(threading, 'Threading required for this test.')
- @support.reap_threads
def test_listen_config_1_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
self.assert_log_lines([])
@unittest.skipUnless(threading, 'Threading required for this test.')
- @support.reap_threads
def test_listen_verify(self):
def verify_fail(stuff):
handler.close()
@patch.object(logging.handlers.QueueListener, 'handle')
- @support.reap_threads
def test_handle_called_with_queue_queue(self, mock_handle):
for i in range(self.repeat):
log_queue = queue.Queue()
@support.requires_multiprocessing_queue
@patch.object(logging.handlers.QueueListener, 'handle')
- @support.reap_threads
def test_handle_called_with_mp_queue(self, mock_handle):
for i in range(self.repeat):
log_queue = multiprocessing.Queue()
return []
@support.requires_multiprocessing_queue
- @support.reap_threads
def test_no_messages_in_queue_after_stop(self):
"""
Five messages are logged then the QueueListener is stopped. This