if self.current_frame:
f = self.current_frame
if f.tell() >= self._FRAME_SIZE_TARGET or force:
- with f.getbuffer() as data:
- n = len(data)
- write = self.file_write
- write(FRAME)
- write(pack("<Q", n))
- write(data)
- f.seek(0)
- f.truncate()
+ data = f.getbuffer()
+ write = self.file_write
+ # Issue a single call to the write method of the underlying
+ # file object for the frame opcode with the size of the
+ # frame. The concatenation is expected to be less expensive
+ # than issuing an additional call to write.
+ write(FRAME + pack("<Q", len(data)))
+
+ # Issue a separate call to write to append the frame
+ # contents without concatenation to the above to avoid a
+ # memory copy.
+ write(data)
+
+ # Start the new frame with a new io.BytesIO instance so that
+ # the file object can have delayed access to the previous frame
+ # contents via an unreleased memoryview of the previous
+ # io.BytesIO instance.
+ self.current_frame = io.BytesIO()
def write(self, data):
if self.current_frame:
else:
return self.file_write(data)
+ def write_large_bytes(self, header, payload):
+ write = self.file_write
+ if self.current_frame:
+ # Terminate the current frame and flush it to the file.
+ self.commit_frame(force=True)
+
+ # Perform direct write of the header and payload of the large binary
+ # object. Be careful not to concatenate the header and the payload
+ # prior to calling 'write' as we do not want to allocate a large
+ # temporary bytes object.
+ # We intentionally do not insert a protocol 4 frame opcode to make
+ # it possible to optimize file.read calls in the loader.
+ write(header)
+ write(payload)
+
class _Unframer:
raise TypeError("file must have a 'write' attribute")
self.framer = _Framer(self._file_write)
self.write = self.framer.write
+ self._write_large_bytes = self.framer.write_large_bytes
self.memo = {}
self.proto = int(protocol)
self.bin = protocol >= 1
if n <= 0xff:
self.write(SHORT_BINBYTES + pack("<B", n) + obj)
elif n > 0xffffffff and self.proto >= 4:
- self.write(BINBYTES8 + pack("<Q", n) + obj)
+ self._write_large_bytes(BINBYTES8 + pack("<Q", n), obj)
+ elif n >= self.framer._FRAME_SIZE_TARGET:
+ self._write_large_bytes(BINBYTES + pack("<I", n), obj)
else:
self.write(BINBYTES + pack("<I", n) + obj)
self.memoize(obj)
if n <= 0xff and self.proto >= 4:
self.write(SHORT_BINUNICODE + pack("<B", n) + encoded)
elif n > 0xffffffff and self.proto >= 4:
- self.write(BINUNICODE8 + pack("<Q", n) + encoded)
+ self._write_large_bytes(BINUNICODE8 + pack("<Q", n), encoded)
+ elif n >= self.framer._FRAME_SIZE_TARGET:
+ self._write_large_bytes(BINUNICODE + pack("<I", n), encoded)
else:
self.write(BINUNICODE + pack("<I", n) + encoded)
else:
if arg > proto:
proto = arg
if pos == 0:
- protoheader = p[pos: end_pos]
+ protoheader = p[pos:end_pos]
else:
opcodes.append((pos, end_pos))
else:
pickler.framer.start_framing()
idx = 0
for op, arg in opcodes:
+ frameless = False
if op is put:
if arg not in newids:
continue
data = pickler.get(newids[arg])
else:
data = p[op:arg]
- pickler.framer.commit_frame()
- pickler.write(data)
+ frameless = len(data) > pickler.framer._FRAME_SIZE_TARGET
+ pickler.framer.commit_frame(force=frameless)
+ if frameless:
+ pickler.framer.file_write(data)
+ else:
+ pickler.write(data)
pickler.framer.end_framing()
return out.getvalue()
def check_frame_opcodes(self, pickled):
"""
Check the arguments of FRAME opcodes in a protocol 4+ pickle.
+
+ Note that binary objects that are larger than FRAME_SIZE_TARGET are not
+ framed by default and are therefore considered frames by themselves in
+ the following consistency check.
"""
- frame_opcode_size = 9
- last_arg = last_pos = None
+ last_arg = last_pos = last_frame_opcode_size = None
+ frameless_opcode_sizes = {
+ 'BINBYTES': 5,
+ 'BINUNICODE': 5,
+ 'BINBYTES8': 9,
+ 'BINUNICODE8': 9,
+ }
for op, arg, pos in pickletools.genops(pickled):
- if op.name != 'FRAME':
+ if op.name in frameless_opcode_sizes:
+ if len(arg) > self.FRAME_SIZE_TARGET:
+ frame_opcode_size = frameless_opcode_sizes[op.name]
+ arg = len(arg)
+ else:
+ continue
+ elif op.name == 'FRAME':
+ frame_opcode_size = 9
+ else:
continue
+
if last_pos is not None:
# The previous frame's size should be equal to the number
# of bytes up to the current frame.
- frame_size = pos - last_pos - frame_opcode_size
+ frame_size = pos - last_pos - last_frame_opcode_size
self.assertEqual(frame_size, last_arg)
last_arg, last_pos = arg, pos
+ last_frame_opcode_size = frame_opcode_size
# The last frame's size should be equal to the number of bytes up
# to the pickle's end.
- frame_size = len(pickled) - last_pos - frame_opcode_size
+ frame_size = len(pickled) - last_pos - last_frame_opcode_size
self.assertEqual(frame_size, last_arg)
def test_framing_many_objects(self):
def test_framing_large_objects(self):
N = 1024 * 1024
- obj = [b'x' * N, b'y' * N, b'z' * N]
+ obj = [b'x' * N, b'y' * N, 'z' * N]
for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(proto=proto):
- pickled = self.dumps(obj, proto)
- unpickled = self.loads(pickled)
- self.assertEqual(obj, unpickled)
- n_frames = count_opcode(pickle.FRAME, pickled)
- self.assertGreaterEqual(n_frames, len(obj))
- self.check_frame_opcodes(pickled)
+ for fast in [True, False]:
+ with self.subTest(proto=proto, fast=fast):
+ if hasattr(self, 'pickler'):
+ buf = io.BytesIO()
+ pickler = self.pickler(buf, protocol=proto)
+ pickler.fast = fast
+ pickler.dump(obj)
+ pickled = buf.getvalue()
+ elif fast:
+ continue
+ else:
+ # Fallback to self.dumps when fast=False and
+ # self.pickler is not available.
+ pickled = self.dumps(obj, proto)
+ unpickled = self.loads(pickled)
+ # More informative error message in case of failure.
+ self.assertEqual([len(x) for x in obj],
+ [len(x) for x in unpickled])
+ # Perform full equality check if the lengths match.
+ self.assertEqual(obj, unpickled)
+ n_frames = count_opcode(pickle.FRAME, pickled)
+ if not fast:
+ # One frame per memoize for each large object.
+ self.assertGreaterEqual(n_frames, len(obj))
+ else:
+ # One frame at the beginning and one at the end.
+ self.assertGreaterEqual(n_frames, 2)
+ self.check_frame_opcodes(pickled)
def test_optional_frames(self):
if pickle.HIGHEST_PROTOCOL < 4:
count_opcode(pickle.FRAME, pickled))
self.assertEqual(obj, self.loads(some_frames_pickle))
+ def test_framed_write_sizes_with_delayed_writer(self):
+ class ChunkAccumulator:
+ """Accumulate pickler output in a list of raw chunks."""
+
+ def __init__(self):
+ self.chunks = []
+
+ def write(self, chunk):
+ self.chunks.append(chunk)
+
+ def concatenate_chunks(self):
+ # Some chunks can be memoryview instances; we need to convert
+ # them to bytes to be able to call join
+ return b"".join([c.tobytes() if hasattr(c, 'tobytes') else c
+ for c in self.chunks])
+
+ small_objects = [(str(i).encode('ascii'), i % 42, {'i': str(i)})
+ for i in range(int(1e4))]
+
+ for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
+ # Protocol 4 packs groups of small objects into frames and issues
+ # calls to write only once or twice per frame:
+ # The C pickler issues one call to write per-frame (header and
+ # contents) while Python pickler issues two calls to write: one for
+ # the frame header and one for the frame binary contents.
+ writer = ChunkAccumulator()
+ self.pickler(writer, proto).dump(small_objects)
+
+ # Actually read the binary content of the chunks after the end
+ # of the call to dump: any memoryview passed to write should not
+ # be released otherwise this delayed access would not be possible.
+ pickled = writer.concatenate_chunks()
+ reconstructed = self.loads(pickled)
+ self.assertEqual(reconstructed, small_objects)
+ self.assertGreater(len(writer.chunks), 1)
+
+ n_frames, remainder = divmod(len(pickled), self.FRAME_SIZE_TARGET)
+ if remainder > 0:
+ n_frames += 1
+
+ # There should be at least one call to write per frame
+ self.assertGreaterEqual(len(writer.chunks), n_frames)
+
+ # but not too many either: there can be one for the proto,
+ # one per-frame header and one per frame for the actual contents.
+ self.assertGreaterEqual(2 * n_frames + 1, len(writer.chunks))
+
+ chunk_sizes = [len(c) for c in writer.chunks[:-1]]
+ large_sizes = [s for s in chunk_sizes
+ if s >= self.FRAME_SIZE_TARGET]
+ small_sizes = [s for s in chunk_sizes
+ if s < self.FRAME_SIZE_TARGET]
+
+ # Large chunks should not be too large:
+ for chunk_size in large_sizes:
+ self.assertGreater(2 * self.FRAME_SIZE_TARGET, chunk_size)
+
+ last_chunk_size = len(writer.chunks[-1])
+ self.assertGreater(2 * self.FRAME_SIZE_TARGET, last_chunk_size)
+
+ # Small chunks (if any) should be very small
+ # (only proto and frame headers)
+ for chunk_size in small_sizes:
+ self.assertGreaterEqual(9, chunk_size)
+
def test_nested_names(self):
global Nested
class Nested:
# Test relies on precise output of dumps()
test_pickle_to_2x = None
+ # Test relies on writing by chunks into a file object.
+ test_framed_write_sizes_with_delayed_writer = None
+
def test_optimize_long_binget(self):
data = [str(i) for i in range(257)]
data.append(data[-1])
--- /dev/null
+The picklers no longer allocate temporary memory when dumping large
+``bytes`` and ``str`` objects into a file object. Instead the data is
+directly streamed into the underlying file object.
+
+Previously the C implementation would buffer all content and issue a
+single call to ``file.write`` at the end of the dump. With protocol 4
+this behavior has changed to issue one call to ``file.write`` per frame.
+
+The Python pickler with protocol 4 now dumps each frame content as a
+memoryview of an io.BytesIO instance that is never reused and the
+memoryview is no longer released after the call to write. This makes it
+possible for the file object to delay access to the memoryview of
+previous frames without forcing any additional memory copy as was
+already possible with the C pickler.
return 0;
}
-static int
-_Pickler_OpcodeBoundary(PicklerObject *self)
-{
- Py_ssize_t frame_len;
-
- if (!self->framing || self->frame_start == -1)
- return 0;
- frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE;
- if (frame_len >= FRAME_SIZE_TARGET)
- return _Pickler_CommitFrame(self);
- else
- return 0;
-}
-
static PyObject *
_Pickler_GetString(PicklerObject *self)
{
return (result == NULL) ? -1 : 0;
}
+static int
+_Pickler_OpcodeBoundary(PicklerObject *self)
+{
+ Py_ssize_t frame_len;
+
+ if (!self->framing || self->frame_start == -1) {
+ return 0;
+ }
+ frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE;
+ if (frame_len >= FRAME_SIZE_TARGET) {
+ if(_Pickler_CommitFrame(self)) {
+ return -1;
+ }
+ /* Flush the content of the committed frame to the underlying
+ * file and reuse the pickler buffer for the next frame so as
+ * to limit memory usage when dumping large complex objects to
+ * a file.
+ *
+ * self->write is NULL when called via dumps.
+ */
+ if (self->write != NULL) {
+ if (_Pickler_FlushToFile(self) < 0) {
+ return -1;
+ }
+ if (_Pickler_ClearBuffer(self) < 0) {
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
static Py_ssize_t
_Pickler_Write(PicklerObject *self, const char *s, Py_ssize_t data_len)
{
return 0;
}
+/* No-copy code-path to write large contiguous data directly into the
+ underlying file object, bypassing the output_buffer of the Pickler. */
+static int
+_Pickler_write_large_bytes(
+ PicklerObject *self, const char *header, Py_ssize_t header_size,
+ PyObject *payload)
+{
+ assert(self->output_buffer != NULL);
+ assert(self->write != NULL);
+ PyObject *result;
+
+ /* Commit the previous frame. */
+ if (_Pickler_CommitFrame(self)) {
+ return -1;
+ }
+ /* Disable framing temporarily */
+ self->framing = 0;
+
+ if (_Pickler_Write(self, header, header_size) < 0) {
+ return -1;
+ }
+ /* Dump the output buffer to the file. */
+ if (_Pickler_FlushToFile(self) < 0) {
+ return -1;
+ }
+
+ /* Stream write the payload into the file without going through the
+ output buffer. */
+ result = PyObject_CallFunctionObjArgs(self->write, payload, NULL);
+ if (result == NULL) {
+ return -1;
+ }
+ Py_DECREF(result);
+
+ /* Reinitialize the buffer for subsequent calls to _Pickler_Write. */
+ if (_Pickler_ClearBuffer(self) < 0) {
+ return -1;
+ }
+
+ /* Re-enable framing for subsequent calls to _Pickler_Write. */
+ self->framing = 1;
+
+ return 0;
+}
+
static int
save_bytes(PicklerObject *self, PyObject *obj)
{
return -1; /* string too large */
}
- if (_Pickler_Write(self, header, len) < 0)
- return -1;
-
- if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0)
- return -1;
+ if (size < FRAME_SIZE_TARGET || self->write == NULL) {
+ if (_Pickler_Write(self, header, len) < 0) {
+ return -1;
+ }
+ if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0) {
+ return -1;
+ }
+ }
+ else {
+ /* Bypass the in-memory buffer to directly stream large data
+ into the underlying file object. */
+ if (_Pickler_write_large_bytes(self, header, len, obj) < 0) {
+ return -1;
+ }
+ }
if (memo_put(self, obj) < 0)
return -1;
{
char header[9];
Py_ssize_t len;
+ PyObject *mem;
assert(size >= 0);
if (size <= 0xff && self->proto >= 4) {
return -1;
}
- if (_Pickler_Write(self, header, len) < 0)
- return -1;
- if (_Pickler_Write(self, data, size) < 0)
- return -1;
-
+ if (size < FRAME_SIZE_TARGET || self->write == NULL) {
+ if (_Pickler_Write(self, header, len) < 0) {
+ return -1;
+ }
+ if (_Pickler_Write(self, data, size) < 0) {
+ return -1;
+ }
+ }
+ else {
+ /* Bypass the in-memory buffer to directly stream large data
+ into the underlying file object. */
+ mem = PyMemoryView_FromMemory((char *) data, size, PyBUF_READ);
+ if (mem == NULL) {
+ return -1;
+ }
+ if (_Pickler_write_large_bytes(self, header, len, mem) < 0) {
+ Py_DECREF(mem);
+ return -1;
+ }
+ Py_DECREF(mem);
+ }
return 0;
}