self.remove_signal_handler(sig)
else:
if self._signal_handlers:
- warinigs.warn(f"Closing the loop {self!r} "
+ warnings.warn(f"Closing the loop {self!r} "
f"on interpreter shutdown "
- f"stage, signal unsubsription is disabled",
+ f"stage, skipping signal handlers removal",
ResourceWarning,
source=self)
self._signal_handlers.clear()
self.assertEqual(len(self.loop._signal_handlers), 0)
m_signal.set_wakeup_fd.assert_called_once_with(-1)
+ @mock.patch('asyncio.unix_events.sys')
+ @mock.patch('asyncio.unix_events.signal')
+ def test_close_on_finalizing(self, m_signal, m_sys):
+ m_signal.NSIG = signal.NSIG
+ self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
+
+ self.assertEqual(len(self.loop._signal_handlers), 1)
+ m_sys.is_finalizing.return_value = True
+ m_signal.signal.reset_mock()
+
+ with self.assertWarnsRegex(ResourceWarning,
+ "skipping signal handlers removal"):
+ self.loop.close()
+
+ self.assertEqual(len(self.loop._signal_handlers), 0)
+ self.assertFalse(m_signal.signal.called)
+
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
'UNIX Sockets are not supported')