y += dco.flush()
self.assertEqual(y, b'foo')
+ def test_decompress_eof(self):
+ x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
+ dco = zlib.decompressobj()
+ self.assertFalse(dco.eof)
+ dco.decompress(x[:-5])
+ self.assertFalse(dco.eof)
+ dco.decompress(x[-5:])
+ self.assertTrue(dco.eof)
+ dco.flush()
+ self.assertTrue(dco.eof)
+
+ def test_decompress_eof_incomplete_stream(self):
+ x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo'
+ dco = zlib.decompressobj()
+ self.assertFalse(dco.eof)
+ dco.decompress(x[:-5])
+ self.assertFalse(dco.eof)
+ dco.flush()
+ self.assertFalse(dco.eof)
+
+ def test_decompress_unused_data(self):
+ # Repeated calls to decompress() after EOF should accumulate data in
+ # dco.unused_data, instead of just storing the arg to the last call.
+ x = zlib.compress(HAMLET_SCENE) + HAMLET_SCENE
+ for step in 1, 2, 100:
+ dco = zlib.decompressobj()
+ data = b''.join(dco.decompress(x[i : i + step])
+ for i in range(0, len(x), step))
+ data += dco.flush()
+
++ self.assertTrue(dco.eof)
+ self.assertEqual(data, HAMLET_SCENE)
+ self.assertEqual(dco.unused_data, HAMLET_SCENE)
+
if hasattr(zlib.compressobj(), "copy"):
def test_compresscopy(self):
# Test copying a compression object
preserved.
*/
if (err == Z_STREAM_END) {
- Py_XDECREF(self->unused_data); /* Free original empty string */
- self->unused_data = PyBytes_FromStringAndSize(
- (char *)self->zst.next_in, self->zst.avail_in);
- if (self->unused_data == NULL) {
- Py_DECREF(RetVal);
- goto error;
+ if (self->zst.avail_in > 0) {
+ /* Append the leftover data to the existing value of unused_data. */
+ Py_ssize_t old_size = PyBytes_GET_SIZE(self->unused_data);
+ Py_ssize_t new_size = old_size + self->zst.avail_in;
+ PyObject *new_data;
+ if (new_size <= old_size) { /* Check for overflow. */
+ PyErr_NoMemory();
+ Py_DECREF(RetVal);
+ RetVal = NULL;
+ goto error;
+ }
+ new_data = PyBytes_FromStringAndSize(NULL, new_size);
+ if (new_data == NULL) {
+ Py_DECREF(RetVal);
+ RetVal = NULL;
+ goto error;
+ }
+ Py_MEMCPY(PyBytes_AS_STRING(new_data),
+ PyBytes_AS_STRING(self->unused_data), old_size);
+ Py_MEMCPY(PyBytes_AS_STRING(new_data) + old_size,
+ self->zst.next_in, self->zst.avail_in);
+ Py_DECREF(self->unused_data);
+ self->unused_data = new_data;
}
+ self->eof = 1;
/* We will only get Z_BUF_ERROR if the output buffer was full
but there wasn't more output when we tried again, so it is
not an error condition.