for f in get_files(self):
self.zip_random_open_test(f, self.compression)
+ def zip_read1_test(self, f, compression):
+ self.make_test_archive(f, compression)
+
+ # Read the ZIP archive
+ with zipfile.ZipFile(f, "r") as zipfp, \
+ zipfp.open(TESTFN) as zipopen:
+ zipdata = []
+ while True:
+ read_data = zipopen.read1(-1)
+ if not read_data:
+ break
+ zipdata.append(read_data)
+
+ self.assertEqual(b''.join(zipdata), self.data)
+
+ def test_read1(self):
+ for f in get_files(self):
+ self.zip_read1_test(f, self.compression)
+
+ def zip_read1_10_test(self, f, compression):
+ self.make_test_archive(f, compression)
+
+ # Read the ZIP archive
+ with zipfile.ZipFile(f, "r") as zipfp, \
+ zipfp.open(TESTFN) as zipopen:
+ zipdata = []
+ while True:
+ read_data = zipopen.read1(10)
+ self.assertLessEqual(len(read_data), 10)
+ if not read_data:
+ break
+ zipdata.append(read_data)
+
+ self.assertEqual(b''.join(zipdata), self.data)
+
+ def test_read1_10(self):
+ for f in get_files(self):
+ self.zip_read1_10_test(f, self.compression)
+
def zip_readline_read_test(self, f, compression):
self.make_test_archive(f, compression)
buf = self._readbuffer[self._offset:]
self._readbuffer = b''
self._offset = 0
- data = self._read1(self.MAX_N)
- buf += data
+ while not self._eof:
+ data = self._read1(self.MAX_N)
+ if data:
+ buf += data
+ break
return buf
end = n + self._offset
self._readbuffer = b''
self._offset = 0
if n > 0:
- data = self._read1(n)
- if n < len(data):
- self._readbuffer = data
- self._offset = n
- data = data[:n]
- buf += data
+ while not self._eof:
+ data = self._read1(n)
+ if n < len(data):
+ self._readbuffer = data
+ self._offset = n
+ buf += data[:n]
+ break
+ if data:
+ buf += data
+ break
return buf
def _read1(self, n):