From: Michael Foord Date: Fri, 2 Apr 2010 21:42:47 +0000 (+0000) Subject: Addition of -b command line option to unittest for buffering stdout and stderr during... X-Git-Tag: v2.7b1~132 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5637f04a94242b6b696cf699625f708ce4ecb186;p=python Addition of -b command line option to unittest for buffering stdout and stderr during test runs. --- diff --git a/Lib/unittest/main.py b/Lib/unittest/main.py index e2703ce727..7c1f48c2a7 100644 --- a/Lib/unittest/main.py +++ b/Lib/unittest/main.py @@ -68,12 +68,12 @@ class TestProgram(object): USAGE = USAGE_FROM_MODULE # defaults for testing - failfast = catchbreak = None + failfast = catchbreak = buffer = None def __init__(self, module='__main__', defaultTest=None, argv=None, testRunner=None, testLoader=loader.defaultTestLoader, exit=True, - verbosity=1, failfast=None, catchbreak=None): + verbosity=1, failfast=None, catchbreak=None, buffer=None): if isinstance(module, basestring): self.module = __import__(module) for part in module.split('.')[1:]: @@ -87,6 +87,7 @@ class TestProgram(object): self.failfast = failfast self.catchbreak = catchbreak self.verbosity = verbosity + self.buffer = buffer self.defaultTest = defaultTest self.testRunner = testRunner self.testLoader = testLoader @@ -111,9 +112,9 @@ class TestProgram(object): return import getopt - long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch'] + long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch', 'buffer'] try: - options, args = getopt.getopt(argv[1:], 'hHvqfc', long_opts) + options, args = getopt.getopt(argv[1:], 'hHvqfcb', long_opts) for opt, value in options: if opt in ('-h','-H','--help'): self.usageExit() @@ -129,6 +130,10 @@ class TestProgram(object): if self.catchbreak is None: self.catchbreak = True # Should this raise an exception if -c is not valid? + if opt in ('-b','--buffer'): + if self.buffer is None: + self.buffer = True + # Should this raise an exception if -b is not valid? if len(args) == 0 and self.defaultTest is None: # createTests will load tests from self.module self.testNames = None @@ -164,6 +169,10 @@ class TestProgram(object): parser.add_option('-c', '--catch', dest='catchbreak', default=False, help='Catch ctrl-C and display results so far', action='store_true') + if self.buffer != False: + parser.add_option('-b', '--buffer', dest='buffer', default=False, + help='Buffer stdout and stderr during tests', + action='store_true') parser.add_option('-s', '--start-directory', dest='start', default='.', help="Directory to start discovery ('.' default)") parser.add_option('-p', '--pattern', dest='pattern', default='test*.py', @@ -184,6 +193,8 @@ class TestProgram(object): self.failfast = options.failfast if self.catchbreak is None: self.catchbreak = options.catchbreak + if self.buffer is None: + self.buffer = options.buffer if options.verbose: self.verbosity = 2 @@ -203,9 +214,10 @@ class TestProgram(object): if isinstance(self.testRunner, (type, types.ClassType)): try: testRunner = self.testRunner(verbosity=self.verbosity, - failfast=self.failfast) + failfast=self.failfast, + buffer=self.buffer) except TypeError: - # didn't accept the verbosity or failfast arguments + # didn't accept the verbosity, buffer or failfast arguments testRunner = self.testRunner() else: # it is assumed to be a TestRunner instance diff --git a/Lib/unittest/result.py b/Lib/unittest/result.py index ec80e8eea5..46eba0401b 100644 --- a/Lib/unittest/result.py +++ b/Lib/unittest/result.py @@ -1,7 +1,11 @@ """Test result object""" +import os +import sys import traceback +from cStringIO import StringIO + from . import util from functools import wraps @@ -15,6 +19,15 @@ def failfast(method): return method(self, *args, **kw) return inner + +_std_out = sys.stdout +_std_err = sys.stderr + +NEWLINE = os.linesep +STDOUT_LINE = '%sStdout:%s%%s' % (NEWLINE, NEWLINE) +STDERR_LINE = '%sStderr:%s%%s' % (NEWLINE, NEWLINE) + + class TestResult(object): """Holder for test result information. @@ -37,6 +50,10 @@ class TestResult(object): self.expectedFailures = [] self.unexpectedSuccesses = [] self.shouldStop = False + self.buffer = False + self._stdout_buffer = StringIO() + self._stderr_buffer = StringIO() + self._mirrorOutput = False def printErrors(self): "Called by TestRunner after test run" @@ -44,6 +61,10 @@ class TestResult(object): def startTest(self, test): "Called when the given test is about to be run" self.testsRun += 1 + self._mirrorOutput = False + if self.buffer: + sys.stdout = self._stdout_buffer + sys.stderr = self._stderr_buffer def startTestRun(self): """Called once before any tests are executed. @@ -53,6 +74,26 @@ class TestResult(object): def stopTest(self, test): """Called when the given test has been run""" + if self.buffer: + if self._mirrorOutput: + output = sys.stdout.getvalue() + error = sys.stderr.getvalue() + if output: + if not output.endswith(NEWLINE): + output += NEWLINE + sys.__stdout__.write(STDOUT_LINE % output) + if error: + if not error.endswith(NEWLINE): + error += NEWLINE + sys.__stderr__.write(STDERR_LINE % error) + + sys.stdout = _std_out + sys.stderr = _std_err + self._stdout_buffer.seek(0) + self._stdout_buffer.truncate() + self._stderr_buffer.seek(0) + self._stderr_buffer.truncate() + self._mirrorOutput = False def stopTestRun(self): """Called once after all tests are executed. @@ -66,12 +107,14 @@ class TestResult(object): returned by sys.exc_info(). """ self.errors.append((test, self._exc_info_to_string(err, test))) + self._mirrorOutput = True @failfast def addFailure(self, test, err): """Called when an error has occurred. 'err' is a tuple of values as returned by sys.exc_info().""" self.failures.append((test, self._exc_info_to_string(err, test))) + self._mirrorOutput = True def addSuccess(self, test): "Called when a test has completed successfully" @@ -105,11 +148,27 @@ class TestResult(object): # Skip test runner traceback levels while tb and self._is_relevant_tb_level(tb): tb = tb.tb_next + if exctype is test.failureException: # Skip assert*() traceback levels length = self._count_relevant_tb_levels(tb) - return ''.join(traceback.format_exception(exctype, value, tb, length)) - return ''.join(traceback.format_exception(exctype, value, tb)) + msgLines = traceback.format_exception(exctype, value, tb, length) + else: + msgLines = traceback.format_exception(exctype, value, tb) + + if self.buffer: + output = sys.stdout.getvalue() + error = sys.stderr.getvalue() + if output: + if not output.endswith(NEWLINE): + output += NEWLINE + msgLines.append(STDOUT_LINE % output) + if error: + if not error.endswith(NEWLINE): + error += NEWLINE + msgLines.append(STDERR_LINE % error) + return ''.join(msgLines) + def _is_relevant_tb_level(self, tb): return '__unittest' in tb.tb_frame.f_globals diff --git a/Lib/unittest/runner.py b/Lib/unittest/runner.py index 5169d20ece..068aa5bf43 100644 --- a/Lib/unittest/runner.py +++ b/Lib/unittest/runner.py @@ -125,11 +125,12 @@ class TextTestRunner(object): resultclass = TextTestResult def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1, - failfast=False, resultclass=None): + failfast=False, buffer=False, resultclass=None): self.stream = _WritelnDecorator(stream) self.descriptions = descriptions self.verbosity = verbosity self.failfast = failfast + self.buffer = buffer if resultclass is not None: self.resultclass = resultclass @@ -141,6 +142,7 @@ class TextTestRunner(object): result = self._makeResult() registerResult(result) result.failfast = self.failfast + result.buffer = self.buffer startTime = time.time() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: diff --git a/Lib/unittest/test/test_break.py b/Lib/unittest/test/test_break.py index 0de31cd864..1cd75b8d56 100644 --- a/Lib/unittest/test/test_break.py +++ b/Lib/unittest/test/test_break.py @@ -205,8 +205,9 @@ class TestBreak(unittest.TestCase): p = Program(False) p.runTests() - self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, - 'failfast': failfast})]) + self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, + 'verbosity': verbosity, + 'failfast': failfast})]) self.assertEqual(FakeRunner.runArgs, [test]) self.assertEqual(p.result, result) @@ -217,8 +218,9 @@ class TestBreak(unittest.TestCase): p = Program(True) p.runTests() - self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, - 'failfast': failfast})]) + self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, + 'verbosity': verbosity, + 'failfast': failfast})]) self.assertEqual(FakeRunner.runArgs, [test]) self.assertEqual(p.result, result) diff --git a/Lib/unittest/test/test_result.py b/Lib/unittest/test/test_result.py index c29fd9c00e..86376798ee 100644 --- a/Lib/unittest/test/test_result.py +++ b/Lib/unittest/test/test_result.py @@ -1,5 +1,6 @@ import sys -from cStringIO import StringIO +import textwrap +from cStringIO import StringIO, OutputType from test import test_support import unittest @@ -301,6 +302,8 @@ def __init__(self, stream=None, descriptions=None, verbosity=None): self.errors = [] self.testsRun = 0 self.shouldStop = False + self.buffer = False + classDict['__init__'] = __init__ OldResult = type('OldResult', (object,), classDict) @@ -356,5 +359,130 @@ class Test_OldTestResult(unittest.TestCase): runner.run(Test('testFoo')) +class TestOutputBuffering(unittest.TestCase): + + def setUp(self): + self._real_out = sys.__stdout__ + self._real_err = sys.__stderr__ + + def tearDown(self): + sys.stdout = sys.__stdout__ = self._real_out + sys.stderr = sys.__stderr__ = self._real_err + + def testBufferOutputOff(self): + real_out = self._real_out + real_err = self._real_err + + result = unittest.TestResult() + self.assertFalse(result.buffer) + + self.assertIs(real_out, sys.stdout) + self.assertIs(real_err, sys.stderr) + + result.startTest(self) + + self.assertIs(real_out, sys.stdout) + self.assertIs(real_err, sys.stderr) + + def testBufferOutputStartTestAddSuccess(self): + real_out = self._real_out + real_err = self._real_err + + result = unittest.TestResult() + self.assertFalse(result.buffer) + + result.buffer = True + + self.assertIs(real_out, sys.stdout) + self.assertIs(real_err, sys.stderr) + + result.startTest(self) + + self.assertIsNot(real_out, sys.stdout) + self.assertIsNot(real_err, sys.stderr) + self.assertIsInstance(sys.stdout, OutputType) + self.assertIsInstance(sys.stderr, OutputType) + self.assertIsNot(sys.stdout, sys.stderr) + + out_stream = sys.stdout + err_stream = sys.stderr + + sys.__stdout__ = StringIO() + sys.__stderr__ = StringIO() + + print 'foo' + print >> sys.stderr, 'bar' + + self.assertEqual(out_stream.getvalue(), 'foo\n') + self.assertEqual(err_stream.getvalue(), 'bar\n') + + self.assertEqual(sys.__stdout__.getvalue(), '') + self.assertEqual(sys.__stderr__.getvalue(), '') + + result.addSuccess(self) + result.stopTest(self) + + self.assertIs(real_out, sys.stdout) + self.assertIs(real_err, sys.stderr) + + self.assertEqual(sys.__stdout__.getvalue(), '') + self.assertEqual(sys.__stderr__.getvalue(), '') + + self.assertEqual(out_stream.getvalue(), '') + self.assertEqual(err_stream.getvalue(), '') + + + def getStartedResult(self): + result = unittest.TestResult() + result.buffer = True + result.startTest(self) + return result + + def testBufferOutputAddErrorOrFailure(self): + def clear(): + sys.__stdout__ = StringIO() + sys.__stderr__ = StringIO() + + for message_attr, add_attr, include_error in [ + ('errors', 'addError', True), + ('failures', 'addFailure', False), + ('errors', 'addError', True), + ('failures', 'addFailure', False) + ]: + clear() + result = self.getStartedResult() + buffered_out = sys.stdout + buffered_err = sys.stderr + + print >> sys.stdout, 'foo' + if include_error: + print >> sys.stderr, 'bar' + + + addFunction = getattr(result, add_attr) + addFunction(self, (None, None, None)) + result.stopTest(self) + + result_list = getattr(result, message_attr) + self.assertEqual(len(result_list), 1) + + test, message = result_list[0] + expectedOutMessage = textwrap.dedent(""" + Stdout: + foo + """) + expectedErrMessage = '' + if include_error: + expectedErrMessage = textwrap.dedent(""" + Stderr: + bar + """) + expectedFullMessage = 'None\n%s%s' % (expectedOutMessage, expectedErrMessage) + + self.assertIs(test, self) + self.assertEqual(sys.__stdout__.getvalue(), expectedOutMessage) + self.assertEqual(sys.__stderr__.getvalue(), expectedErrMessage) + self.assertMultiLineEqual(message, expectedFullMessage) + if __name__ == '__main__': unittest.main()