From 5f48e2644dcfb47f0bbc0fcdc2b103a19bdec288 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 5 Jun 2018 12:08:36 +0300 Subject: [PATCH] bpo-33753: Refactor creating temporary files in test_fileinput. (GH-7377) --- Lib/test/test_fileinput.py | 249 +++++++++++++++---------------------- 1 file changed, 101 insertions(+), 148 deletions(-) diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py index d7efc685d8..c97bb4cb3b 100644 --- a/Lib/test/test_fileinput.py +++ b/Lib/test/test_fileinput.py @@ -8,6 +8,7 @@ import re import fileinput import collections import builtins +import tempfile import unittest try: @@ -33,20 +34,15 @@ from unittest import mock # all the work, and a few functions (input, etc.) that use a global _state # variable. -# Write lines (a list of lines) to temp file number i, and return the -# temp file's name. -def writeTmp(i, lines, mode='w'): # opening in text mode is the default - name = TESTFN + str(i) - f = open(name, mode) - for line in lines: - f.write(line) - f.close() - return name - -def remove_tempfiles(*names): - for name in names: - if name: - safe_unlink(name) +class BaseTests: + # Write a content (str or bytes) to temp file, and return the + # temp file's name. + def writeTmp(self, content, *, mode='w'): # opening in text mode is the default + fd, name = tempfile.mkstemp() + self.addCleanup(support.unlink, name) + with open(fd, mode) as f: + f.write(content) + return name class LineReader: @@ -84,23 +80,19 @@ class LineReader: def close(self): pass -class BufferSizesTests(unittest.TestCase): +class BufferSizesTests(BaseTests, unittest.TestCase): def test_buffer_sizes(self): # First, run the tests with default and teeny buffer size. for round, bs in (0, 0), (1, 30): - t1 = t2 = t3 = t4 = None - try: - t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)]) - t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)]) - t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)]) - t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)]) - if bs: - with self.assertWarns(DeprecationWarning): - self.buffer_size_test(t1, t2, t3, t4, bs, round) - else: + t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15))) + t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10))) + t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5))) + t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1))) + if bs: + with self.assertWarns(DeprecationWarning): self.buffer_size_test(t1, t2, t3, t4, bs, round) - finally: - remove_tempfiles(t1, t2, t3, t4) + else: + self.buffer_size_test(t1, t2, t3, t4, bs, round) def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0): pat = re.compile(r'LINE (\d+) OF FILE (\d+)') @@ -187,74 +179,59 @@ class UnconditionallyRaise: self.invoked = True raise self.exception_type() -class FileInputTests(unittest.TestCase): +class FileInputTests(BaseTests, unittest.TestCase): def test_zero_byte_files(self): - t1 = t2 = t3 = t4 = None - try: - t1 = writeTmp(1, [""]) - t2 = writeTmp(2, [""]) - t3 = writeTmp(3, ["The only line there is.\n"]) - t4 = writeTmp(4, [""]) - fi = FileInput(files=(t1, t2, t3, t4)) - - line = fi.readline() - self.assertEqual(line, 'The only line there is.\n') - self.assertEqual(fi.lineno(), 1) - self.assertEqual(fi.filelineno(), 1) - self.assertEqual(fi.filename(), t3) - - line = fi.readline() - self.assertFalse(line) - self.assertEqual(fi.lineno(), 1) - self.assertEqual(fi.filelineno(), 0) - self.assertEqual(fi.filename(), t4) - fi.close() - finally: - remove_tempfiles(t1, t2, t3, t4) + t1 = self.writeTmp("") + t2 = self.writeTmp("") + t3 = self.writeTmp("The only line there is.\n") + t4 = self.writeTmp("") + fi = FileInput(files=(t1, t2, t3, t4)) + + line = fi.readline() + self.assertEqual(line, 'The only line there is.\n') + self.assertEqual(fi.lineno(), 1) + self.assertEqual(fi.filelineno(), 1) + self.assertEqual(fi.filename(), t3) + + line = fi.readline() + self.assertFalse(line) + self.assertEqual(fi.lineno(), 1) + self.assertEqual(fi.filelineno(), 0) + self.assertEqual(fi.filename(), t4) + fi.close() def test_files_that_dont_end_with_newline(self): - t1 = t2 = None - try: - t1 = writeTmp(1, ["A\nB\nC"]) - t2 = writeTmp(2, ["D\nE\nF"]) - fi = FileInput(files=(t1, t2)) - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) - self.assertEqual(fi.filelineno(), 3) - self.assertEqual(fi.lineno(), 6) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB\nC") + t2 = self.writeTmp("D\nE\nF") + fi = FileInput(files=(t1, t2)) + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) + self.assertEqual(fi.filelineno(), 3) + self.assertEqual(fi.lineno(), 6) ## def test_unicode_filenames(self): ## # XXX A unicode string is always returned by writeTmp. ## # So is this needed? -## try: -## t1 = writeTmp(1, ["A\nB"]) -## encoding = sys.getfilesystemencoding() -## if encoding is None: -## encoding = 'ascii' -## fi = FileInput(files=str(t1, encoding)) -## lines = list(fi) -## self.assertEqual(lines, ["A\n", "B"]) -## finally: -## remove_tempfiles(t1) +## t1 = self.writeTmp("A\nB") +## encoding = sys.getfilesystemencoding() +## if encoding is None: +## encoding = 'ascii' +## fi = FileInput(files=str(t1, encoding)) +## lines = list(fi) +## self.assertEqual(lines, ["A\n", "B"]) def test_fileno(self): - t1 = t2 = None - try: - t1 = writeTmp(1, ["A\nB"]) - t2 = writeTmp(2, ["C\nD"]) - fi = FileInput(files=(t1, t2)) - self.assertEqual(fi.fileno(), -1) - line =next( fi) - self.assertNotEqual(fi.fileno(), -1) - fi.nextfile() - self.assertEqual(fi.fileno(), -1) - line = list(fi) - self.assertEqual(fi.fileno(), -1) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB") + t2 = self.writeTmp("C\nD") + fi = FileInput(files=(t1, t2)) + self.assertEqual(fi.fileno(), -1) + line = next(fi) + self.assertNotEqual(fi.fileno(), -1) + fi.nextfile() + self.assertEqual(fi.fileno(), -1) + line = list(fi) + self.assertEqual(fi.fileno(), -1) def test_opening_mode(self): try: @@ -263,17 +240,13 @@ class FileInputTests(unittest.TestCase): self.fail("FileInput should reject invalid mode argument") except ValueError: pass - t1 = None - try: - # try opening in universal newline mode - t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb") - with check_warnings(('', DeprecationWarning)): - fi = FileInput(files=t1, mode="U") - with check_warnings(('', DeprecationWarning)): - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"]) - finally: - remove_tempfiles(t1) + # try opening in universal newline mode + t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb") + with check_warnings(('', DeprecationWarning)): + fi = FileInput(files=t1, mode="U") + with check_warnings(('', DeprecationWarning)): + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"]) def test_stdin_binary_mode(self): with mock.patch('sys.stdin') as m_stdin: @@ -314,8 +287,7 @@ class FileInputTests(unittest.TestCase): self.invoked = True return open(*args) - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") custom_open_hook = CustomOpenHook() with FileInput([t], openhook=custom_open_hook) as fi: fi.readline() @@ -358,27 +330,22 @@ class FileInputTests(unittest.TestCase): self.assertEqual(fi.readline(), b'') def test_context_manager(self): - try: - t1 = writeTmp(1, ["A\nB\nC"]) - t2 = writeTmp(2, ["D\nE\nF"]) - with FileInput(files=(t1, t2)) as fi: - lines = list(fi) - self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) - self.assertEqual(fi.filelineno(), 3) - self.assertEqual(fi.lineno(), 6) - self.assertEqual(fi._files, ()) - finally: - remove_tempfiles(t1, t2) + t1 = self.writeTmp("A\nB\nC") + t2 = self.writeTmp("D\nE\nF") + with FileInput(files=(t1, t2)) as fi: + lines = list(fi) + self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) + self.assertEqual(fi.filelineno(), 3) + self.assertEqual(fi.lineno(), 6) + self.assertEqual(fi._files, ()) def test_close_on_exception(self): + t1 = self.writeTmp("") try: - t1 = writeTmp(1, [""]) with FileInput(files=t1) as fi: raise OSError except OSError: self.assertEqual(fi._files, ()) - finally: - remove_tempfiles(t1) def test_empty_files_list_specified_to_constructor(self): with FileInput(files=[]) as fi: @@ -387,8 +354,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__(self): """Tests invoking FileInput.__getitem__() with the current line number""" - t = writeTmp(1, ["line1\n", "line2\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("line1\nline2\n") with FileInput(files=[t]) as fi: retval1 = fi[0] self.assertEqual(retval1, "line1\n") @@ -398,8 +364,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__invalid_key(self): """Tests invoking FileInput.__getitem__() with an index unequal to the line number""" - t = writeTmp(1, ["line1\n", "line2\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("line1\nline2\n") with FileInput(files=[t]) as fi: with self.assertRaises(RuntimeError) as cm: fi[1] @@ -408,8 +373,7 @@ class FileInputTests(unittest.TestCase): def test__getitem__eof(self): """Tests invoking FileInput.__getitem__() with the line number but at end-of-input""" - t = writeTmp(1, []) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp('') with FileInput(files=[t]) as fi: with self.assertRaises(IndexError) as cm: fi[0] @@ -423,8 +387,8 @@ class FileInputTests(unittest.TestCase): os_unlink_orig = os.unlink os_unlink_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") + self.addCleanup(support.unlink, t + '.bak') with FileInput(files=[t], inplace=True) as fi: next(fi) # make sure the file is opened os.unlink = os_unlink_replacement @@ -443,8 +407,7 @@ class FileInputTests(unittest.TestCase): os_fstat_orig = os.fstat os_fstat_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t], inplace=True) as fi: os.fstat = os_fstat_replacement fi.readline() @@ -463,8 +426,7 @@ class FileInputTests(unittest.TestCase): os_chmod_orig = os.chmod os_chmod_replacement = UnconditionallyRaise(OSError) try: - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t], inplace=True) as fi: os.chmod = os_chmod_replacement fi.readline() @@ -483,8 +445,7 @@ class FileInputTests(unittest.TestCase): self.__call__() unconditionally_raise_ValueError = FilenoRaisesValueError() - t = writeTmp(1, ["\n"]) - self.addCleanup(remove_tempfiles, t) + t = self.writeTmp("\n") with FileInput(files=[t]) as fi: file_backup = fi._file try: @@ -532,30 +493,22 @@ class FileInputTests(unittest.TestCase): self.assertEqual(src.linesread, []) def test_pathlib_file(self): - t1 = None - try: - t1 = Path(writeTmp(1, ["Pathlib file."])) - with FileInput(t1) as fi: - line = fi.readline() - self.assertEqual(line, 'Pathlib file.') - self.assertEqual(fi.lineno(), 1) - self.assertEqual(fi.filelineno(), 1) - self.assertEqual(fi.filename(), os.fspath(t1)) - finally: - remove_tempfiles(t1) + t1 = Path(self.writeTmp("Pathlib file.")) + with FileInput(t1) as fi: + line = fi.readline() + self.assertEqual(line, 'Pathlib file.') + self.assertEqual(fi.lineno(), 1) + self.assertEqual(fi.filelineno(), 1) + self.assertEqual(fi.filename(), os.fspath(t1)) def test_pathlib_file_inplace(self): - t1 = None - try: - t1 = Path(writeTmp(1, ['Pathlib file.'])) - with FileInput(t1, inplace=True) as fi: - line = fi.readline() - self.assertEqual(line, 'Pathlib file.') - print('Modified %s' % line) - with open(t1) as f: - self.assertEqual(f.read(), 'Modified Pathlib file.\n') - finally: - remove_tempfiles(t1) + t1 = Path(self.writeTmp('Pathlib file.')) + with FileInput(t1, inplace=True) as fi: + line = fi.readline() + self.assertEqual(line, 'Pathlib file.') + print('Modified %s' % line) + with open(t1) as f: + self.assertEqual(f.read(), 'Modified Pathlib file.\n') class MockFileInput: -- 2.40.0