FRAME_SIZE_TARGET = 64 * 1024
+ def check_frame_opcodes(self, pickled):
+ """
+ Check the arguments of FRAME opcodes in a protocol 4+ pickle.
+ """
+ frame_opcode_size = 9
+ last_arg = last_pos = None
+ for op, arg, pos in pickletools.genops(pickled):
+ if op.name != 'FRAME':
+ continue
+ if last_pos is not None:
+ # The previous frame's size should be equal to the number
+ # of bytes up to the current frame.
+ frame_size = pos - last_pos - frame_opcode_size
+ self.assertEqual(frame_size, last_arg)
+ last_arg, last_pos = arg, pos
+ # The last frame's size should be equal to the number of bytes up
+ # to the pickle's end.
+ frame_size = len(pickled) - last_pos - frame_opcode_size
+ self.assertEqual(frame_size, last_arg)
+
def test_framing_many_objects(self):
obj = list(range(10**5))
for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
self.FRAME_SIZE_TARGET / 2)
self.assertLessEqual(bytes_per_frame,
self.FRAME_SIZE_TARGET * 1)
+ self.check_frame_opcodes(pickled)
def test_framing_large_objects(self):
N = 1024 * 1024
self.assertEqual(obj, unpickled)
n_frames = count_opcode(pickle.FRAME, pickled)
self.assertGreaterEqual(n_frames, len(obj))
+ self.check_frame_opcodes(pickled)
def test_optional_frames(self):
if pickle.HIGHEST_PROTOCOL < 4: