return bz2_decode(input, errors)
class IncrementalEncoder(codecs.IncrementalEncoder):
+ def __init__(self, errors='strict'):
+ assert errors == 'strict'
+ self.errors = errors
+ self.compressobj = bz2.BZ2Compressor()
+
def encode(self, input, final=False):
- assert self.errors == 'strict'
- return bz2.compress(input)
+ if final:
+ c = self.compressobj.compress(input)
+ return c + self.compressobj.flush()
+ else:
+ return self.compressobj.compress(input)
+
+ def reset(self):
+ self.compressobj = bz2.BZ2Compressor()
class IncrementalDecoder(codecs.IncrementalDecoder):
+ def __init__(self, errors='strict'):
+ assert errors == 'strict'
+ self.errors = errors
+ self.decompressobj = bz2.BZ2Decompressor()
+
def decode(self, input, final=False):
- assert self.errors == 'strict'
- return bz2.decompress(input)
+ try:
+ return self.decompressobj.decompress(input)
+ except EOFError:
+ return ''
+
+ def reset(self):
+ self.decompressobj = bz2.BZ2Decompressor()
class StreamWriter(Codec,codecs.StreamWriter):
pass
return zlib_decode(input, errors)
class IncrementalEncoder(codecs.IncrementalEncoder):
+ def __init__(self, errors='strict'):
+ assert errors == 'strict'
+ self.errors = errors
+ self.compressobj = zlib.compressobj()
+
def encode(self, input, final=False):
- assert self.errors == 'strict'
- return zlib.compress(input)
+ if final:
+ c = self.compressobj.compress(input)
+ return c + self.compressobj.flush()
+ else:
+ return self.compressobj.compress(input)
+
+ def reset(self):
+ self.compressobj = zlib.compressobj()
class IncrementalDecoder(codecs.IncrementalDecoder):
+ def __init__(self, errors='strict'):
+ assert errors == 'strict'
+ self.errors = errors
+ self.decompressobj = zlib.decompressobj()
+
def decode(self, input, final=False):
- assert self.errors == 'strict'
- return zlib.decompress(input)
+ if final:
+ c = self.decompressobj.decompress(input)
+ return c + self.decompressobj.flush()
+ else:
+ return self.decompressobj.decompress(input)
+
+ def reset(self):
+ self.decompressobj = zlib.decompressobj()
class StreamWriter(Codec,codecs.StreamWriter):
pass
"punycode",
"unicode_internal"
]
+broken_incremental_coders = broken_unicode_with_streams[:]
try:
import bz2
decodedresult += reader.read()
self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding))
+ if encoding not in broken_incremental_coders:
# check incremental decoder/encoder (fetched via the Python
# and C API) and iterencode()/iterdecode()
try:
Library
-------
+- Bug #1586613: fix zlib and bz2 codecs' incremental en/decoders.
+
- Patch #1583880: fix tarfile's problems with long names and posix/
GNU modes.