]> granicus.if.org Git - python/commitdiff
Fixes #27930: improved QueueListener behaviour.
authorVinay Sajip <vinay_sajip@yahoo.co.uk>
Thu, 8 Sep 2016 00:13:39 +0000 (01:13 +0100)
committerVinay Sajip <vinay_sajip@yahoo.co.uk>
Thu, 8 Sep 2016 00:13:39 +0000 (01:13 +0100)
Lib/logging/handlers.py
Lib/test/test_logging.py
Misc/NEWS

index c9f8217b06b4a937d3177c39bee0b6ace8bbf1bc..c39a56ff066d22512253b6717767ab7b6328ba4d 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2001-2015 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
 #
 # Permission to use, copy, modify, and distribute this software and its
 # documentation for any purpose and without fee is hereby granted,
@@ -18,7 +18,7 @@
 Additional handlers for the logging package for Python. The core package is
 based on PEP 282 and comments thereto in comp.lang.python.
 
-Copyright (C) 2001-2015 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
 
 To use, simply 'import logging.handlers' and log away!
 """
@@ -1366,7 +1366,6 @@ if threading:
             """
             self.queue = queue
             self.handlers = handlers
-            self._stop = threading.Event()
             self._thread = None
             self.respect_handler_level = respect_handler_level
 
@@ -1387,7 +1386,7 @@ if threading:
             LogRecords to process.
             """
             self._thread = t = threading.Thread(target=self._monitor)
-            t.setDaemon(True)
+            t.daemon = True
             t.start()
 
         def prepare(self , record):
@@ -1426,20 +1425,9 @@ if threading:
             """
             q = self.queue
             has_task_done = hasattr(q, 'task_done')
-            while not self._stop.isSet():
-                try:
-                    record = self.dequeue(True)
-                    if record is self._sentinel:
-                        break
-                    self.handle(record)
-                    if has_task_done:
-                        q.task_done()
-                except queue.Empty:
-                    pass
-            # There might still be records in the queue.
             while True:
                 try:
-                    record = self.dequeue(False)
+                    record = self.dequeue(True)
                     if record is self._sentinel:
                         break
                     self.handle(record)
@@ -1466,7 +1454,6 @@ if threading:
             Note that if you don't call this before your application exits, there
             may be some records still left on the queue, which won't be processed.
             """
-            self._stop.set()
             self.enqueue_sentinel()
             self._thread.join()
             self._thread = None
index cb6a6282b13a3e173198b0a12252081c77497a04..85344de5735c4b4368cdc03aa6c9b0e675d0ab0b 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2001-2014 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
 #
 # Permission to use, copy, modify, and distribute this software and its
 # documentation for any purpose and without fee is hereby granted,
@@ -16,7 +16,7 @@
 
 """Test harness for the logging module. Run all tests.
 
-Copyright (C) 2001-2014 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
 """
 
 import logging
@@ -3022,6 +3022,84 @@ class QueueHandlerTest(BaseTest):
         self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
         self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
 
+if hasattr(logging.handlers, 'QueueListener'):
+    import multiprocessing
+    from unittest.mock import patch
+
+    class QueueListenerTest(BaseTest):
+        """
+        Tests based on patch submitted for issue #27930. Ensure that
+        QueueListener handles all log messages.
+        """
+
+        repeat = 20
+
+        @staticmethod
+        def setup_and_log(log_queue, ident):
+            """
+            Creates a logger with a QueueHandler that logs to a queue read by a
+            QueueListener. Starts the listener, logs five messages, and stops
+            the listener.
+            """
+            logger = logging.getLogger('test_logger_with_id_%s' % ident)
+            logger.setLevel(logging.DEBUG)
+            handler = logging.handlers.QueueHandler(log_queue)
+            logger.addHandler(handler)
+            listener = logging.handlers.QueueListener(log_queue)
+            listener.start()
+
+            logger.info('one')
+            logger.info('two')
+            logger.info('three')
+            logger.info('four')
+            logger.info('five')
+
+            listener.stop()
+            logger.removeHandler(handler)
+            handler.close()
+
+        @patch.object(logging.handlers.QueueListener, 'handle')
+        def test_handle_called_with_queue_queue(self, mock_handle):
+            for i in range(self.repeat):
+                log_queue = queue.Queue()
+                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
+            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
+                             'correct number of handled log messages')
+
+        @patch.object(logging.handlers.QueueListener, 'handle')
+        def test_handle_called_with_mp_queue(self, mock_handle):
+            for i in range(self.repeat):
+                log_queue = multiprocessing.Queue()
+                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
+            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
+                             'correct number of handled log messages')
+
+        @staticmethod
+        def get_all_from_queue(log_queue):
+            try:
+                while True:
+                    yield log_queue.get_nowait()
+            except queue.Empty:
+                return []
+
+        def test_no_messages_in_queue_after_stop(self):
+            """
+            Five messages are logged then the QueueListener is stopped. This
+            test then gets everything off the queue. Failure of this test
+            indicates that messages were not registered on the queue until
+            _after_ the QueueListener stopped.
+            """
+            for i in range(self.repeat):
+                queue = multiprocessing.Queue()
+                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
+                # time.sleep(1)
+                items = list(self.get_all_from_queue(queue))
+                expected = [[], [logging.handlers.QueueListener._sentinel]]
+                self.assertIn(items, expected,
+                              'Found unexpected messages in queue: %s' % (
+                                    [m.msg if isinstance(m, logging.LogRecord)
+                                     else m for m in items]))
+
 
 ZERO = datetime.timedelta(0)
 
@@ -4166,7 +4244,7 @@ class NTEventLogHandlerTest(BaseTest):
 # first and restore it at the end.
 @support.run_with_locale('LC_ALL', '')
 def test_main():
-    support.run_unittest(
+    tests = [
         BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
         HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
         DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
@@ -4177,7 +4255,11 @@ def test_main():
         RotatingFileHandlerTest,  LastResortTest, LogRecordTest,
         ExceptionTest, SysLogHandlerTest, HTTPHandlerTest,
         NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
-        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest)
+        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
+    ]
+    if hasattr(logging.handlers, 'QueueListener'):
+        tests.append(QueueListenerTest)
+    support.run_unittest(*tests)
 
 if __name__ == "__main__":
     test_main()
index d142675eaf59db2325d3746d93c48f074ac20dae..9398b8d440b5f82547bb290f8145328f2546bffc 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -218,6 +218,9 @@ Library
 - Issue #27392: Add loop.connect_accepted_socket().
   Patch by Jim Fulton.
 
+- Issue #27930: Improved behaviour of logging.handlers.QueueListener.
+  Thanks to Paulo Andrade and Petr Viktorin for the analysis and patch.
+
 IDLE
 ----