USAGE = USAGE_FROM_MODULE
# defaults for testing
- failfast = catchbreak = None
+ failfast = catchbreak = buffer = None
def __init__(self, module='__main__', defaultTest=None,
argv=None, testRunner=None,
testLoader=loader.defaultTestLoader, exit=True,
- verbosity=1, failfast=None, catchbreak=None):
+ verbosity=1, failfast=None, catchbreak=None, buffer=None):
if isinstance(module, basestring):
self.module = __import__(module)
for part in module.split('.')[1:]:
self.failfast = failfast
self.catchbreak = catchbreak
self.verbosity = verbosity
+ self.buffer = buffer
self.defaultTest = defaultTest
self.testRunner = testRunner
self.testLoader = testLoader
return
import getopt
- long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch']
+ long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch', 'buffer']
try:
- options, args = getopt.getopt(argv[1:], 'hHvqfc', long_opts)
+ options, args = getopt.getopt(argv[1:], 'hHvqfcb', long_opts)
for opt, value in options:
if opt in ('-h','-H','--help'):
self.usageExit()
if self.catchbreak is None:
self.catchbreak = True
# Should this raise an exception if -c is not valid?
+ if opt in ('-b','--buffer'):
+ if self.buffer is None:
+ self.buffer = True
+ # Should this raise an exception if -b is not valid?
if len(args) == 0 and self.defaultTest is None:
# createTests will load tests from self.module
self.testNames = None
parser.add_option('-c', '--catch', dest='catchbreak', default=False,
help='Catch ctrl-C and display results so far',
action='store_true')
+ if self.buffer != False:
+ parser.add_option('-b', '--buffer', dest='buffer', default=False,
+ help='Buffer stdout and stderr during tests',
+ action='store_true')
parser.add_option('-s', '--start-directory', dest='start', default='.',
help="Directory to start discovery ('.' default)")
parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
self.failfast = options.failfast
if self.catchbreak is None:
self.catchbreak = options.catchbreak
+ if self.buffer is None:
+ self.buffer = options.buffer
if options.verbose:
self.verbosity = 2
if isinstance(self.testRunner, (type, types.ClassType)):
try:
testRunner = self.testRunner(verbosity=self.verbosity,
- failfast=self.failfast)
+ failfast=self.failfast,
+ buffer=self.buffer)
except TypeError:
- # didn't accept the verbosity or failfast arguments
+ # didn't accept the verbosity, buffer or failfast arguments
testRunner = self.testRunner()
else:
# it is assumed to be a TestRunner instance
"""Test result object"""
+import os
+import sys
import traceback
+from cStringIO import StringIO
+
from . import util
from functools import wraps
return method(self, *args, **kw)
return inner
+
+_std_out = sys.stdout
+_std_err = sys.stderr
+
+NEWLINE = os.linesep
+STDOUT_LINE = '%sStdout:%s%%s' % (NEWLINE, NEWLINE)
+STDERR_LINE = '%sStderr:%s%%s' % (NEWLINE, NEWLINE)
+
+
class TestResult(object):
"""Holder for test result information.
self.expectedFailures = []
self.unexpectedSuccesses = []
self.shouldStop = False
+ self.buffer = False
+ self._stdout_buffer = StringIO()
+ self._stderr_buffer = StringIO()
+ self._mirrorOutput = False
def printErrors(self):
"Called by TestRunner after test run"
def startTest(self, test):
"Called when the given test is about to be run"
self.testsRun += 1
+ self._mirrorOutput = False
+ if self.buffer:
+ sys.stdout = self._stdout_buffer
+ sys.stderr = self._stderr_buffer
def startTestRun(self):
"""Called once before any tests are executed.
def stopTest(self, test):
"""Called when the given test has been run"""
+ if self.buffer:
+ if self._mirrorOutput:
+ output = sys.stdout.getvalue()
+ error = sys.stderr.getvalue()
+ if output:
+ if not output.endswith(NEWLINE):
+ output += NEWLINE
+ sys.__stdout__.write(STDOUT_LINE % output)
+ if error:
+ if not error.endswith(NEWLINE):
+ error += NEWLINE
+ sys.__stderr__.write(STDERR_LINE % error)
+
+ sys.stdout = _std_out
+ sys.stderr = _std_err
+ self._stdout_buffer.seek(0)
+ self._stdout_buffer.truncate()
+ self._stderr_buffer.seek(0)
+ self._stderr_buffer.truncate()
+ self._mirrorOutput = False
def stopTestRun(self):
"""Called once after all tests are executed.
returned by sys.exc_info().
"""
self.errors.append((test, self._exc_info_to_string(err, test)))
+ self._mirrorOutput = True
@failfast
def addFailure(self, test, err):
"""Called when an error has occurred. 'err' is a tuple of values as
returned by sys.exc_info()."""
self.failures.append((test, self._exc_info_to_string(err, test)))
+ self._mirrorOutput = True
def addSuccess(self, test):
"Called when a test has completed successfully"
# Skip test runner traceback levels
while tb and self._is_relevant_tb_level(tb):
tb = tb.tb_next
+
if exctype is test.failureException:
# Skip assert*() traceback levels
length = self._count_relevant_tb_levels(tb)
- return ''.join(traceback.format_exception(exctype, value, tb, length))
- return ''.join(traceback.format_exception(exctype, value, tb))
+ msgLines = traceback.format_exception(exctype, value, tb, length)
+ else:
+ msgLines = traceback.format_exception(exctype, value, tb)
+
+ if self.buffer:
+ output = sys.stdout.getvalue()
+ error = sys.stderr.getvalue()
+ if output:
+ if not output.endswith(NEWLINE):
+ output += NEWLINE
+ msgLines.append(STDOUT_LINE % output)
+ if error:
+ if not error.endswith(NEWLINE):
+ error += NEWLINE
+ msgLines.append(STDERR_LINE % error)
+ return ''.join(msgLines)
+
def _is_relevant_tb_level(self, tb):
return '__unittest' in tb.tb_frame.f_globals
resultclass = TextTestResult
def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
- failfast=False, resultclass=None):
+ failfast=False, buffer=False, resultclass=None):
self.stream = _WritelnDecorator(stream)
self.descriptions = descriptions
self.verbosity = verbosity
self.failfast = failfast
+ self.buffer = buffer
if resultclass is not None:
self.resultclass = resultclass
result = self._makeResult()
registerResult(result)
result.failfast = self.failfast
+ result.buffer = self.buffer
startTime = time.time()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
p = Program(False)
p.runTests()
- self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
- 'failfast': failfast})])
+ self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
+ 'verbosity': verbosity,
+ 'failfast': failfast})])
self.assertEqual(FakeRunner.runArgs, [test])
self.assertEqual(p.result, result)
p = Program(True)
p.runTests()
- self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity,
- 'failfast': failfast})])
+ self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
+ 'verbosity': verbosity,
+ 'failfast': failfast})])
self.assertEqual(FakeRunner.runArgs, [test])
self.assertEqual(p.result, result)
import sys
-from cStringIO import StringIO
+import textwrap
+from cStringIO import StringIO, OutputType
from test import test_support
import unittest
self.errors = []
self.testsRun = 0
self.shouldStop = False
+ self.buffer = False
+
classDict['__init__'] = __init__
OldResult = type('OldResult', (object,), classDict)
runner.run(Test('testFoo'))
+class TestOutputBuffering(unittest.TestCase):
+
+ def setUp(self):
+ self._real_out = sys.__stdout__
+ self._real_err = sys.__stderr__
+
+ def tearDown(self):
+ sys.stdout = sys.__stdout__ = self._real_out
+ sys.stderr = sys.__stderr__ = self._real_err
+
+ def testBufferOutputOff(self):
+ real_out = self._real_out
+ real_err = self._real_err
+
+ result = unittest.TestResult()
+ self.assertFalse(result.buffer)
+
+ self.assertIs(real_out, sys.stdout)
+ self.assertIs(real_err, sys.stderr)
+
+ result.startTest(self)
+
+ self.assertIs(real_out, sys.stdout)
+ self.assertIs(real_err, sys.stderr)
+
+ def testBufferOutputStartTestAddSuccess(self):
+ real_out = self._real_out
+ real_err = self._real_err
+
+ result = unittest.TestResult()
+ self.assertFalse(result.buffer)
+
+ result.buffer = True
+
+ self.assertIs(real_out, sys.stdout)
+ self.assertIs(real_err, sys.stderr)
+
+ result.startTest(self)
+
+ self.assertIsNot(real_out, sys.stdout)
+ self.assertIsNot(real_err, sys.stderr)
+ self.assertIsInstance(sys.stdout, OutputType)
+ self.assertIsInstance(sys.stderr, OutputType)
+ self.assertIsNot(sys.stdout, sys.stderr)
+
+ out_stream = sys.stdout
+ err_stream = sys.stderr
+
+ sys.__stdout__ = StringIO()
+ sys.__stderr__ = StringIO()
+
+ print 'foo'
+ print >> sys.stderr, 'bar'
+
+ self.assertEqual(out_stream.getvalue(), 'foo\n')
+ self.assertEqual(err_stream.getvalue(), 'bar\n')
+
+ self.assertEqual(sys.__stdout__.getvalue(), '')
+ self.assertEqual(sys.__stderr__.getvalue(), '')
+
+ result.addSuccess(self)
+ result.stopTest(self)
+
+ self.assertIs(real_out, sys.stdout)
+ self.assertIs(real_err, sys.stderr)
+
+ self.assertEqual(sys.__stdout__.getvalue(), '')
+ self.assertEqual(sys.__stderr__.getvalue(), '')
+
+ self.assertEqual(out_stream.getvalue(), '')
+ self.assertEqual(err_stream.getvalue(), '')
+
+
+ def getStartedResult(self):
+ result = unittest.TestResult()
+ result.buffer = True
+ result.startTest(self)
+ return result
+
+ def testBufferOutputAddErrorOrFailure(self):
+ def clear():
+ sys.__stdout__ = StringIO()
+ sys.__stderr__ = StringIO()
+
+ for message_attr, add_attr, include_error in [
+ ('errors', 'addError', True),
+ ('failures', 'addFailure', False),
+ ('errors', 'addError', True),
+ ('failures', 'addFailure', False)
+ ]:
+ clear()
+ result = self.getStartedResult()
+ buffered_out = sys.stdout
+ buffered_err = sys.stderr
+
+ print >> sys.stdout, 'foo'
+ if include_error:
+ print >> sys.stderr, 'bar'
+
+
+ addFunction = getattr(result, add_attr)
+ addFunction(self, (None, None, None))
+ result.stopTest(self)
+
+ result_list = getattr(result, message_attr)
+ self.assertEqual(len(result_list), 1)
+
+ test, message = result_list[0]
+ expectedOutMessage = textwrap.dedent("""
+ Stdout:
+ foo
+ """)
+ expectedErrMessage = ''
+ if include_error:
+ expectedErrMessage = textwrap.dedent("""
+ Stderr:
+ bar
+ """)
+ expectedFullMessage = 'None\n%s%s' % (expectedOutMessage, expectedErrMessage)
+
+ self.assertIs(test, self)
+ self.assertEqual(sys.__stdout__.getvalue(), expectedOutMessage)
+ self.assertEqual(sys.__stderr__.getvalue(), expectedErrMessage)
+ self.assertMultiLineEqual(message, expectedFullMessage)
+
if __name__ == '__main__':
unittest.main()