From: Serhiy Storchaka Date: Tue, 5 Jun 2018 16:53:03 +0000 (+0300) Subject: [3.6] bpo-33753: Refactor creating temporary files in test_fileinput. (GH-7377).... X-Git-Tag: v3.6.6rc1~39 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=b8baa33d931de557d2915c604df85ab168da0904;p=python [3.6] bpo-33753: Refactor creating temporary files in test_fileinput. (GH-7377). (GH-7431) (cherry picked from commit 5f48e2644dcfb47f0bbc0fcdc2b103a19bdec288) --- diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py index 565633fccc..bb74982e5c 100644 --- a/Lib/test/test_fileinput.py +++ b/Lib/test/test_fileinput.py @@ -8,6 +8,7 @@ import re import fileinput import collections import builtins +import tempfile import unittest try: @@ -32,20 +33,15 @@ from unittest import mock # all the work, and a few functions (input, etc.) that use a global _state # variable. -# Write lines (a list of lines) to temp file number i, and return the -# temp file's name. -def writeTmp(i, lines, mode='w'): # opening in text mode is the default - name = TESTFN + str(i) - f = open(name, mode) - for line in lines: - f.write(line) - f.close() - return name - -def remove_tempfiles(*names): - for name in names: - if name: - safe_unlink(name) +class BaseTests: + # Write a content (str or bytes) to temp file, and return the + # temp file's name. + def writeTmp(self, content, *, mode='w'): # opening in text mode is the default + fd, name = tempfile.mkstemp() + self.addCleanup(support.unlink, name) + with open(fd, mode) as f: + f.write(content) + return name class LineReader: @@ -83,23 +79,19 @@ class LineReader: def close(self): pass -class BufferSizesTests(unittest.TestCase): +class BufferSizesTests(BaseTests, unittest.TestCase): def test_buffer_sizes(self): # First, run the tests with default and teeny buffer size. for round, bs in (0, 0), (1, 30): - t1 = t2 = t3 = t4 = None - try: - t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)]) - t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)]) - t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)]) - t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)]) - if bs: - with self.assertWarns(DeprecationWarning): - self.buffer_size_test(t1, t2, t3, t4, bs, round) - else: + t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15))) + t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10))) + t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5))) + t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1))) + if bs: + with self.assertWarns(DeprecationWarning): self.buffer_size_test(t1, t2, t3, t4, bs, round) - finally: - remove_tempfiles(t1, t2, t3, t4) + else: + self.buffer_size_test(t1, t2, t3, t4, bs, round) def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0): pat = re.compile(r'LINE (\d+) OF FILE (\d+)') @@ -186,74 +178,59 @@ class UnconditionallyRaise: self.invoked = True raise self.exception_type() -class FileInputTests(unittest.TestCase): +class FileInputTests(BaseTests, unittest.TestCase): def test_zero_byte_files(self): - t1 = t2 = t3 = t4 = None - try: - t1 = writeTmp(1, [""]) - t2 = writeTmp(2, [""]) - t3 = writeTmp(3, ["The only line there is.\n"]) - t4 = writeTmp(4, [""]) - fi = FileInput(files=(t1, t2, t3, t4)) - - line = fi.readline() - self.assertEqual(line, 'The only line there is.\n') - self.assertEqual(fi.lineno(), 1) - self.assertEqual(fi.filelineno(), 1) - self.assertEqual(fi.filename(), t3) - - line = fi.readline() - self.assertFalse(line) - self.assertEqual(fi.lineno(), 1) - self.assertEqual(fi.filelineno(), 0) - self.assertEqual(fi.filename(), t4) - fi.close() - finally: - remove_tempfiles(t1, t2, t3, t4) + t1 = self.writeTmp("") + t2 = self.writeTmp("") + t3 = self.writeTmp("The only line there is.\n") + t4 = self.writeTmp("") + fi = FileInput(files=(t1, t2, t3, t4)) + + line = fi.readline() + self.assertEqual(line, 'The only line there is.\n') + self.assertEqual(fi.lineno(), 1) + self.assertEqual(fi.filelineno(), 1) + self.assertEqual(fi.filename(), t3) + + line = fi.readline() + self.assertFalse(line) + self.assertEqual(fi.lineno(), 1) + self.assertEqual(fi.filelineno(), 0) + self.assertEqual(fi.filename(), t4) + fi.close() def test_files_that_dont_end_with_newline(self): - t1 = t2 = None - try: - t1 = writeTmp(1, ["A\nB\nC"]) - t2 = writeTmp(2, ["D\nE\nF"]) - fi = FileInput(files=(t1, t2)) - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) - self.assertEqual(fi.filelineno(), 3) - self.assertEqual(fi.lineno(), 6) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB\nC") + t2 = self.writeTmp("D\nE\nF") + fi = FileInput(files=(t1, t2)) + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) + self.assertEqual(fi.filelineno(), 3) + self.assertEqual(fi.lineno(), 6) ## def test_unicode_filenames(self): ## # XXX A unicode string is always returned by writeTmp. ## # So is this needed? -## try: -## t1 = writeTmp(1, ["A\nB"]) -## encoding = sys.getfilesystemencoding() -## if encoding is None: -## encoding = 'ascii' -## fi = FileInput(files=str(t1, encoding)) -## lines = list(fi) -## self.assertEqual(lines, ["A\n", "B"]) -## finally: -## remove_tempfiles(t1) +## t1 = self.writeTmp("A\nB") +## encoding = sys.getfilesystemencoding() +## if encoding is None: +## encoding = 'ascii' +## fi = FileInput(files=str(t1, encoding)) +## lines = list(fi) +## self.assertEqual(lines, ["A\n", "B"]) def test_fileno(self): - t1 = t2 = None - try: - t1 = writeTmp(1, ["A\nB"]) - t2 = writeTmp(2, ["C\nD"]) - fi = FileInput(files=(t1, t2)) - self.assertEqual(fi.fileno(), -1) - line =next( fi) - self.assertNotEqual(fi.fileno(), -1) - fi.nextfile() - self.assertEqual(fi.fileno(), -1) - line = list(fi) - self.assertEqual(fi.fileno(), -1) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB") + t2 = self.writeTmp("C\nD") + fi = FileInput(files=(t1, t2)) + self.assertEqual(fi.fileno(), -1) + line = next(fi) + self.assertNotEqual(fi.fileno(), -1) + fi.nextfile() + self.assertEqual(fi.fileno(), -1) + line = list(fi) + self.assertEqual(fi.fileno(), -1) def test_opening_mode(self): try: @@ -262,17 +239,13 @@ class FileInputTests(unittest.TestCase): self.fail("FileInput should reject invalid mode argument") except ValueError: pass - t1 = None - try: - # try opening in universal newline mode - t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb") - with check_warnings(('', DeprecationWarning)): - fi = FileInput(files=t1, mode="U") - with check_warnings(('', DeprecationWarning)): - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"]) - finally: - remove_tempfiles(t1) + # try opening in universal newline mode + t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb") + with check_warnings(('', DeprecationWarning)): + fi = FileInput(files=t1, mode="U") + with check_warnings(('', DeprecationWarning)): + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"]) def test_stdin_binary_mode(self): with mock.patch('sys.stdin') as m_stdin: @@ -313,8 +286,7 @@ class FileInputTests(unittest.TestCase): self.invoked = True return open(*args) - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") custom_open_hook = CustomOpenHook() with FileInput([t], openhook=custom_open_hook) as fi: fi.readline() @@ -357,27 +329,22 @@ class FileInputTests(unittest.TestCase): self.assertEqual(fi.readline(), b'') def test_context_manager(self): - try: - t1 = writeTmp(1, ["A\nB\nC"]) - t2 = writeTmp(2, ["D\nE\nF"]) - with FileInput(files=(t1, t2)) as fi: - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) - self.assertEqual(fi.filelineno(), 3) - self.assertEqual(fi.lineno(), 6) - self.assertEqual(fi._files, ()) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB\nC") + t2 = self.writeTmp("D\nE\nF") + with FileInput(files=(t1, t2)) as fi: + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) + self.assertEqual(fi.filelineno(), 3) + self.assertEqual(fi.lineno(), 6) + self.assertEqual(fi._files, ()) def test_close_on_exception(self): + t1 = self.writeTmp("") try: - t1 = writeTmp(1, [""]) with FileInput(files=t1) as fi: raise OSError except OSError: self.assertEqual(fi._files, ()) - finally: - remove_tempfiles(t1) def test_empty_files_list_specified_to_constructor(self): with FileInput(files=[]) as fi: @@ -386,8 +353,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__(self): """Tests invoking FileInput.__getitem__() with the current line number""" - t = writeTmp(1, ["line1\n", "line2\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("line1\nline2\n") with FileInput(files=[t]) as fi: retval1 = fi[0] self.assertEqual(retval1, "line1\n") @@ -397,8 +363,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__invalid_key(self): """Tests invoking FileInput.__getitem__() with an index unequal to the line number""" - t = writeTmp(1, ["line1\n", "line2\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("line1\nline2\n") with FileInput(files=[t]) as fi: with self.assertRaises(RuntimeError) as cm: fi[1] @@ -407,8 +372,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__eof(self): """Tests invoking FileInput.__getitem__() with the line number but at end-of-input""" - t = writeTmp(1, []) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp('') with FileInput(files=[t]) as fi: with self.assertRaises(IndexError) as cm: fi[0] @@ -422,8 +386,8 @@ class FileInputTests(unittest.TestCase): os_unlink_orig = os.unlink os_unlink_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") + self.addCleanup(support.unlink, t + '.bak') with FileInput(files=[t], inplace=True) as fi: next(fi) # make sure the file is opened os.unlink = os_unlink_replacement @@ -442,8 +406,7 @@ class FileInputTests(unittest.TestCase): os_fstat_orig = os.fstat os_fstat_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t], inplace=True) as fi: os.fstat = os_fstat_replacement fi.readline() @@ -462,8 +425,7 @@ class FileInputTests(unittest.TestCase): os_chmod_orig = os.chmod os_chmod_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t], inplace=True) as fi: os.chmod = os_chmod_replacement fi.readline() @@ -482,8 +444,7 @@ class FileInputTests(unittest.TestCase): self.__call__() unconditionally_raise_ValueError = FilenoRaisesValueError() - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t]) as fi: file_backup = fi._file try: @@ -530,6 +491,7 @@ class FileInputTests(unittest.TestCase): self.assertRaises(StopIteration, next, fi) self.assertEqual(src.linesread, []) + class MockFileInput: """A class that mocks out fileinput.FileInput for use during unit tests"""