import socket
import errno
import copy
+import warnings
import email
import email.message
import email.generator
'Message', 'MaildirMessage', 'mboxMessage', 'MHMessage',
'BabylMessage', 'MMDFMessage']
+linesep = os.linesep.encode('ascii')
+
class Mailbox:
"""A group of messages in a particular place."""
raise NotImplementedError('Method must be implemented by subclass')
def get_string(self, key):
- """Return a string representation or raise a KeyError."""
+ """Return a string representation or raise a KeyError.
+
+ Uses email.message.Message to create a 7bit clean string
+ representation of the message."""
+ return email.message_from_bytes(self.get_bytes(key)).as_string()
+
+ def get_bytes(self, key):
+ """Return a byte string representation or raise a KeyError."""
raise NotImplementedError('Method must be implemented by subclass')
def get_file(self, key):
"""Flush and close the mailbox."""
raise NotImplementedError('Method must be implemented by subclass')
+ def _string_to_bytes(self, message):
+ # If a message is not 7bit clean, we refuse to handle it since it
+ # likely came from reading invalid messages in text mode, and that way
+ # lies mojibake.
+ try:
+ return message.encode('ascii')
+ except UnicodeError:
+ raise ValueError("String input must be ASCII-only; "
+ "use bytes or a Message instead")
+
def _dump_message(self, message, target, mangle_from_=False):
- # This assumes the target file is open in *text* mode with the
- # desired encoding and newline setting.
+ # This assumes the target file is open in binary mode.
"""Dump message contents to target file."""
if isinstance(message, email.message.Message):
- buffer = io.StringIO()
- gen = email.generator.Generator(buffer, mangle_from_, 0)
+ buffer = io.BytesIO()
+ gen = email.generator.BytesGenerator(buffer, mangle_from_, 0)
gen.flatten(message)
buffer.seek(0)
data = buffer.read()
- ##data = data.replace('\n', os.linesep)
+ data = data.replace(b'\n', linesep)
target.write(data)
- elif isinstance(message, str):
+ elif isinstance(message, (str, bytes, io.StringIO)):
+ if isinstance(message, io.StringIO):
+ warnings.warn("Use of StringIO input is deprecated, "
+ "use BytesIO instead", DeprecationWarning, 3)
+ message = message.getvalue()
+ if isinstance(message, str):
+ message = self._string_to_bytes(message)
if mangle_from_:
- message = message.replace('\nFrom ', '\n>From ')
- ##message = message.replace('\n', os.linesep)
+ message = message.replace(b'\nFrom ', b'\n>From ')
+ message = message.replace(b'\n', linesep)
target.write(message)
elif hasattr(message, 'read'):
+ if hasattr(message, 'buffer'):
+ warnings.warn("Use of text mode files is deprecated, "
+ "use a binary mode file instead", DeprecationWarning, 3)
+ message = message.buffer
while True:
line = message.readline()
+ # Universal newline support.
+ if line.endswith(b'\r\n'):
+ line = line[:-2] + b'\n'
+ elif line.endswith(b'\r'):
+ line = line[:-1] + b'\n'
if not line:
break
- if mangle_from_ and line.startswith('From '):
- line = '>From ' + line[5:]
- ##line = line.replace('\n', os.linesep)
+ if mangle_from_ and line.startswith(b'From '):
+ line = b'>From ' + line[5:]
+ line = line.replace(b'\n', linesep)
target.write(line)
else:
raise TypeError('Invalid message type: %s' % type(message))
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
subpath = self._lookup(key)
- f = open(os.path.join(self._path, subpath), 'r', newline='')
+ f = open(os.path.join(self._path, subpath), 'rb')
try:
if self._factory:
msg = self._factory(f)
msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
return msg
- def get_string(self, key):
- """Return a string representation or raise a KeyError."""
- f = open(os.path.join(self._path, self._lookup(key)), 'r', newline='')
+ def get_bytes(self, key):
+ """Return a bytes representation or raise a KeyError."""
+ f = open(os.path.join(self._path, self._lookup(key)), 'rb')
try:
- return f.read()
+ return f.read().replace(linesep, b'\n')
finally:
f.close()
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
- f = open(os.path.join(self._path, self._lookup(key)), 'r', newline='')
+ f = open(os.path.join(self._path, self._lookup(key)), 'rb')
return _ProxyFile(f)
def iterkeys(self):
"""Initialize a single-file mailbox."""
Mailbox.__init__(self, path, factory, create)
try:
- f = open(self._path, 'r+', newline='')
+ f = open(self._path, 'rb+')
except IOError as e:
if e.errno == errno.ENOENT:
if create:
- f = open(self._path, 'w+', newline='')
+ f = open(self._path, 'wb+')
else:
raise NoSuchMailboxError(self._path)
elif e.errno == errno.EACCES:
- f = open(self._path, 'r', newline='')
+ f = open(self._path, 'rb')
else:
raise
self._file = f
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
- from_line = self._file.readline().replace(os.linesep, '')
+ from_line = self._file.readline().replace(linesep, b'')
string = self._file.read(stop - self._file.tell())
- msg = self._message_factory(string.replace(os.linesep, '\n'))
- msg.set_from(from_line[5:])
+ msg = self._message_factory(string.replace(linesep, b'\n'))
+ msg.set_from(from_line[5:].decode('ascii'))
return msg
def get_string(self, key, from_=False):
+ """Return a string representation or raise a KeyError."""
+ return email.message_from_bytes(
+ self.get_bytes(key)).as_string(unixfrom=from_)
+
+ def get_bytes(self, key, from_=False):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
if not from_:
self._file.readline()
string = self._file.read(stop - self._file.tell())
- return string.replace(os.linesep, '\n')
+ return string.replace(linesep, b'\n')
def get_file(self, key, from_=False):
"""Return a file-like representation or raise a KeyError."""
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
- if isinstance(message, str) and message.startswith('From '):
- newline = message.find('\n')
+ if isinstance(message, str):
+ message = self._string_to_bytes(message)
+ if isinstance(message, bytes) and message.startswith(b'From '):
+ newline = message.find(b'\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
- message = ''
+ message = b''
elif isinstance(message, _mboxMMDFMessage):
- from_line = 'From ' + message.get_from()
+ author = message.get_from().encode('ascii')
+ from_line = b'From ' + author
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
+ if from_line is not None:
+ from_line = from_line.encode('ascii')
if from_line is None:
- from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
+ from_line = b'From MAILER-DAEMON ' + time.asctime(time.gmtime()).encode()
start = self._file.tell()
- self._file.write(from_line + os.linesep)
+ self._file.write(from_line + linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
if f.tell() != 0:
- f.write(os.linesep)
+ f.write(linesep)
def _generate_toc(self):
"""Generate key-to-(start, stop) table of contents."""
while True:
line_pos = self._file.tell()
line = self._file.readline()
- if line.startswith('From '):
+ if line.startswith(b'From '):
if len(stops) < len(starts):
- stops.append(line_pos - len(os.linesep))
+ stops.append(line_pos - len(linesep))
starts.append(line_pos)
elif not line:
stops.append(line_pos)
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
- f.write('\001\001\001\001' + os.linesep)
+ f.write(b'\001\001\001\001' + linesep)
def _post_message_hook(self, f):
"""Called after writing each message to file f."""
- f.write(os.linesep + '\001\001\001\001' + os.linesep)
+ f.write(linesep + b'\001\001\001\001' + linesep)
def _generate_toc(self):
"""Generate key-to-(start, stop) table of contents."""
line_pos = next_pos
line = self._file.readline()
next_pos = self._file.tell()
- if line.startswith('\001\001\001\001' + os.linesep):
+ if line.startswith(b'\001\001\001\001' + linesep):
starts.append(next_pos)
while True:
line_pos = next_pos
line = self._file.readline()
next_pos = self._file.tell()
- if line == '\001\001\001\001' + os.linesep:
- stops.append(line_pos - len(os.linesep))
+ if line == b'\001\001\001\001' + linesep:
+ stops.append(line_pos - len(linesep))
break
elif not line:
stops.append(line_pos)
"""Replace the keyed message; raise KeyError if it doesn't exist."""
path = os.path.join(self._path, str(key))
try:
- f = open(path, 'r+', newline='')
+ f = open(path, 'rb+')
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
- f = open(os.path.join(self._path, str(key)), 'r+', newline='')
+ f = open(os.path.join(self._path, str(key)), 'rb+')
else:
- f = open(os.path.join(self._path, str(key)), 'r', newline='')
+ f = open(os.path.join(self._path, str(key)), 'rb')
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
msg.add_sequence(name)
return msg
- def get_string(self, key):
- """Return a string representation or raise a KeyError."""
+ def get_bytes(self, key):
+ """Return a bytes representation or raise a KeyError."""
try:
if self._locked:
- f = open(os.path.join(self._path, str(key)), 'r+', newline='')
+ f = open(os.path.join(self._path, str(key)), 'rb+')
else:
- f = open(os.path.join(self._path, str(key)), 'r', newline='')
+ f = open(os.path.join(self._path, str(key)), 'rb')
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
if self._locked:
_lock_file(f)
try:
- return f.read()
+ return f.read().replace(linesep, b'\n')
finally:
if self._locked:
_unlock_file(f)
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
try:
- f = open(os.path.join(self._path, str(key)), 'r', newline='')
+ f = open(os.path.join(self._path, str(key)), 'rb')
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
- f = open(os.path.join(self._path, '.mh_sequences'), 'r', newline='')
+ f = open(os.path.join(self._path, '.mh_sequences'), 'r')
try:
all_keys = set(self.keys())
for line in f:
def set_sequences(self, sequences):
"""Set sequences using the given name-to-key-list dictionary."""
- f = open(os.path.join(self._path, '.mh_sequences'), 'r+', newline='')
+ f = open(os.path.join(self._path, '.mh_sequences'), 'r+')
try:
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
for name, keys in sequences.items():
if len(keys) == 0:
continue
- f.write('%s:' % name)
+ f.write(name + ':')
prev = None
completing = False
for key in sorted(set(keys)):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
- self._file.readline() # Skip '1,' line specifying labels.
- original_headers = io.StringIO()
+ self._file.readline() # Skip b'1,' line specifying labels.
+ original_headers = io.BytesIO()
while True:
line = self._file.readline()
- if line == '*** EOOH ***' + os.linesep or not line:
+ if line == b'*** EOOH ***' + linesep or not line:
break
- original_headers.write(line.replace(os.linesep, '\n'))
- visible_headers = io.StringIO()
+ original_headers.write(line.replace(linesep, b'\n'))
+ visible_headers = io.BytesIO()
while True:
line = self._file.readline()
- if line == os.linesep or not line:
+ if line == linesep or not line:
break
- visible_headers.write(line.replace(os.linesep, '\n'))
- body = self._file.read(stop - self._file.tell()).replace(os.linesep,
- '\n')
+ visible_headers.write(line.replace(linesep, b'\n'))
+ # Read up to the stop, or to the end
+ n = stop - self._file.tell()
+ assert n >= 0
+ body = self._file.read(n)
+ body = body.replace(linesep, b'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
- def get_string(self, key):
+ def get_bytes(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
- self._file.readline() # Skip '1,' line specifying labels.
- original_headers = io.StringIO()
+ self._file.readline() # Skip b'1,' line specifying labels.
+ original_headers = io.BytesIO()
while True:
line = self._file.readline()
- if line == '*** EOOH ***' + os.linesep or not line:
+ if line == b'*** EOOH ***' + linesep or not line:
break
- original_headers.write(line.replace(os.linesep, '\n'))
+ original_headers.write(line.replace(linesep, b'\n'))
while True:
line = self._file.readline()
- if line == os.linesep or not line:
+ if line == linesep or not line:
break
- return original_headers.getvalue() + \
- self._file.read(stop - self._file.tell()).replace(os.linesep,
- '\n')
+ headers = original_headers.getvalue()
+ n = stop - self._file.tell()
+ assert n >= 0
+ data = self._file.read(n)
+ data = data.replace(linesep, b'\n')
+ return headers + data
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
- return io.StringIO(self.get_string(key).replace('\n',
- os.linesep))
+ return io.BytesIO(self.get_bytes(key).replace(b'\n', linesep))
def get_labels(self):
"""Return a list of user-defined labels in the mailbox."""
line_pos = next_pos
line = self._file.readline()
next_pos = self._file.tell()
- if line == '\037\014' + os.linesep:
+ if line == b'\037\014' + linesep:
if len(stops) < len(starts):
- stops.append(line_pos - len(os.linesep))
+ stops.append(line_pos - len(linesep))
starts.append(next_pos)
labels = [label.strip() for label
- in self._file.readline()[1:].split(',')
+ in self._file.readline()[1:].split(b',')
if label.strip()]
label_lists.append(labels)
- elif line == '\037' or line == '\037' + os.linesep:
+ elif line == b'\037' or line == b'\037' + linesep:
if len(stops) < len(starts):
- stops.append(line_pos - len(os.linesep))
+ stops.append(line_pos - len(linesep))
elif not line:
- stops.append(line_pos - len(os.linesep))
+ stops.append(line_pos - len(linesep))
break
self._toc = dict(enumerate(zip(starts, stops)))
self._labels = dict(enumerate(label_lists))
def _pre_mailbox_hook(self, f):
"""Called before writing the mailbox to file f."""
- f.write('BABYL OPTIONS:%sVersion: 5%sLabels:%s%s\037' %
- (os.linesep, os.linesep, ','.join(self.get_labels()),
- os.linesep))
+ babyl = b'BABYL OPTIONS:' + linesep
+ babyl += b'Version: 5' + linesep
+ labels = self.get_labels()
+ labels = (label.encode() for label in labels)
+ babyl += b'Labels:' + b','.join(labels) + linesep
+ babyl += b'\037'
+ f.write(babyl)
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
- f.write('\014' + os.linesep)
+ f.write(b'\014' + linesep)
def _post_message_hook(self, f):
"""Called after writing each message to file f."""
- f.write(os.linesep + '\037')
+ f.write(linesep + b'\037')
def _install_message(self, message):
"""Write message contents and return (start, stop)."""
special_labels.append(label)
else:
labels.append(label)
- self._file.write('1')
+ self._file.write(b'1')
for label in special_labels:
- self._file.write(', ' + label)
- self._file.write(',,')
+ self._file.write(b', ' + label.encode())
+ self._file.write(b',,')
for label in labels:
- self._file.write(' ' + label + ',')
- self._file.write(os.linesep)
+ self._file.write(b' ' + label.encode() + b',')
+ self._file.write(linesep)
else:
- self._file.write('1,,' + os.linesep)
+ self._file.write(b'1,,' + linesep)
if isinstance(message, email.message.Message):
- orig_buffer = io.StringIO()
- orig_generator = email.generator.Generator(orig_buffer, False, 0)
+ orig_buffer = io.BytesIO()
+ orig_generator = email.generator.BytesGenerator(orig_buffer, False, 0)
orig_generator.flatten(message)
orig_buffer.seek(0)
while True:
line = orig_buffer.readline()
- self._file.write(line.replace('\n', os.linesep))
- if line == '\n' or not line:
+ self._file.write(line.replace(b'\n', linesep))
+ if line == b'\n' or not line:
break
- self._file.write('*** EOOH ***' + os.linesep)
+ self._file.write(b'*** EOOH ***' + linesep)
if isinstance(message, BabylMessage):
- vis_buffer = io.StringIO()
- vis_generator = email.generator.Generator(vis_buffer, False, 0)
+ vis_buffer = io.BytesIO()
+ vis_generator = email.generator.BytesGenerator(vis_buffer, False, 0)
vis_generator.flatten(message.get_visible())
while True:
line = vis_buffer.readline()
- self._file.write(line.replace('\n', os.linesep))
- if line == '\n' or not line:
+ self._file.write(line.replace(b'\n', linesep))
+ if line == b'\n' or not line:
break
else:
orig_buffer.seek(0)
while True:
line = orig_buffer.readline()
- self._file.write(line.replace('\n', os.linesep))
- if line == '\n' or not line:
+ self._file.write(line.replace(b'\n', linesep))
+ if line == b'\n' or not line:
break
while True:
buffer = orig_buffer.read(4096) # Buffer size is arbitrary.
if not buffer:
break
- self._file.write(buffer.replace('\n', os.linesep))
- elif isinstance(message, str):
- body_start = message.find('\n\n') + 2
+ self._file.write(buffer.replace(b'\n', linesep))
+ elif isinstance(message, (bytes, str, io.StringIO)):
+ if isinstance(message, io.StringIO):
+ warnings.warn("Use of StringIO input is deprecated, "
+ "use BytesIO instead", DeprecationWarning, 3)
+ message = message.getvalue()
+ if isinstance(message, str):
+ message = self._string_to_bytes(message)
+ body_start = message.find(b'\n\n') + 2
if body_start - 2 != -1:
- self._file.write(message[:body_start].replace('\n',
- os.linesep))
- self._file.write('*** EOOH ***' + os.linesep)
- self._file.write(message[:body_start].replace('\n',
- os.linesep))
- self._file.write(message[body_start:].replace('\n',
- os.linesep))
+ self._file.write(message[:body_start].replace(b'\n', linesep))
+ self._file.write(b'*** EOOH ***' + linesep)
+ self._file.write(message[:body_start].replace(b'\n', linesep))
+ self._file.write(message[body_start:].replace(b'\n', linesep))
else:
- self._file.write('*** EOOH ***' + os.linesep + os.linesep)
- self._file.write(message.replace('\n', os.linesep))
+ self._file.write(b'*** EOOH ***' + linesep + linesep)
+ self._file.write(message.replace(b'\n', linesep))
elif hasattr(message, 'readline'):
+ if hasattr(message, 'buffer'):
+ warnings.warn("Use of text mode files is deprecated, "
+ "use a binary mode file instead", DeprecationWarning, 3)
+ message = message.buffer
original_pos = message.tell()
first_pass = True
while True:
line = message.readline()
- self._file.write(line.replace('\n', os.linesep))
- if line == '\n' or not line:
- self._file.write('*** EOOH ***' + os.linesep)
+ # Universal newline support.
+ if line.endswith(b'\r\n'):
+ line = line[:-2] + b'\n'
+ elif line.endswith(b'\r'):
+ line = line[:-1] + b'\n'
+ self._file.write(line.replace(b'\n', linesep))
+ if line == b'\n' or not line:
+ self._file.write(b'*** EOOH ***' + linesep)
if first_pass:
first_pass = False
message.seek(original_pos)
buffer = message.read(4096) # Buffer size is arbitrary.
if not buffer:
break
- self._file.write(buffer.replace('\n', os.linesep))
+ self._file.write(buffer.replace(b'\n', linesep))
else:
raise TypeError('Invalid message type: %s' % type(message))
stop = self._file.tell()
self._become_message(copy.deepcopy(message))
if isinstance(message, Message):
message._explain_to(self)
+ elif isinstance(message, bytes):
+ self._become_message(email.message_from_bytes(message))
elif isinstance(message, str):
self._become_message(email.message_from_string(message))
- elif hasattr(message, "read"):
+ elif isinstance(message, io.TextIOWrapper):
self._become_message(email.message_from_file(message))
+ elif hasattr(message, "read"):
+ self._become_message(email.message_from_binary_file(message))
elif message is None:
email.message.Message.__init__(self)
else:
if not sequence in self._sequences:
self._sequences.append(sequence)
else:
- raise TypeError('sequence must be a string: %s' % type(sequence))
+ raise TypeError('sequence type must be str: %s' % type(sequence))
def remove_sequence(self, sequence):
"""Remove sequence from the list of sequences including the message."""
"""Read bytes."""
return self._read(size, self._file.read)
+ def read1(self, size=None):
+ """Read bytes."""
+ return self._read(size, self._file.read1)
+
def readline(self, size=None):
"""Read a line."""
return self._read(size, self._file.readline)
def __exit__(self, *exc):
self.close()
+ def readable(self):
+ return self._file.readable()
+
+ def writable(self):
+ return self._file.writable()
+
+ def seekable(self):
+ return self._file.seekable()
+
+ def flush(self):
+ return self._file.flush()
+
+ @property
+ def closed(self):
+ return self._file.closed
+
class _PartialFile(_ProxyFile):
"""A read-only wrapper of part of a file."""
"""Read size bytes using read_method, honoring start and stop."""
remaining = self._stop - self._pos
if remaining <= 0:
- return ''
+ return b''
if size is None or size < 0 or size > remaining:
size = remaining
return _ProxyFile._read(self, size, read_method)
"""Create a file if it doesn't exist and open for reading and writing."""
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o666)
try:
- return open(path, 'r+', newline='')
+ return open(path, 'rb+')
finally:
os.close(fd)
import email.message
import re
import io
+import tempfile
from test import support
import unittest
+import textwrap
import mailbox
import glob
try:
class TestMailbox(TestBase):
+ maxDiff = None
+
_factory = None # Overridden by subclasses to reuse tests
_template = 'From: foo\n\n%s'
self.assertEqual(len(self._box), 2)
keys.append(self._box.add(email.message_from_string(_sample_message)))
self.assertEqual(len(self._box), 3)
- keys.append(self._box.add(io.StringIO(_sample_message)))
+ keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
self.assertEqual(len(self._box), 4)
keys.append(self._box.add(_sample_message))
self.assertEqual(len(self._box), 5)
+ keys.append(self._box.add(_bytes_sample_message))
+ self.assertEqual(len(self._box), 6)
+ with self.assertWarns(DeprecationWarning):
+ keys.append(self._box.add(
+ io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
+ self.assertEqual(len(self._box), 7)
self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
- for i in (1, 2, 3, 4):
+ for i in (1, 2, 3, 4, 5, 6):
self._check_sample(self._box[keys[i]])
+ _nonascii_msg = textwrap.dedent("""\
+ From: foo
+ Subject: Falinaptár házhozszállítással. Már rendeltél?
+
+ 0
+ """)
+
+ def test_add_invalid_8bit_bytes_header(self):
+ key = self._box.add(self._nonascii_msg.encode('latin1'))
+ self.assertEqual(len(self._box), 1)
+ self.assertEqual(self._box.get_bytes(key),
+ self._nonascii_msg.encode('latin1'))
+
+ def test_invalid_nonascii_header_as_string(self):
+ subj = self._nonascii_msg.splitlines()[1]
+ key = self._box.add(subj.encode('latin1'))
+ self.assertEqual(self._box.get_string(key),
+ 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
+ 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
+
+ def test_add_nonascii_header_raises(self):
+ with self.assertRaisesRegex(ValueError, "ASCII-only"):
+ self._box.add(self._nonascii_msg)
+
+ _non_latin_bin_msg = textwrap.dedent("""\
+ From: foo@bar.com
+ To: báz
+ Subject: Maintenant je vous présente mon collègue, le pouf célèbre
+ \tJean de Baddie
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+
+ Да, они летят.
+ """).encode('utf-8')
+
+ def test_add_8bit_body(self):
+ key = self._box.add(self._non_latin_bin_msg)
+ self.assertEqual(self._box.get_bytes(key),
+ self._non_latin_bin_msg)
+ with self._box.get_file(key) as f:
+ self.assertEqual(f.read(),
+ self._non_latin_bin_msg.replace(b'\n',
+ os.linesep.encode()))
+ self.assertEqual(self._box[key].get_payload(),
+ "Да, они летят.\n")
+
+ def test_add_binary_file(self):
+ with tempfile.TemporaryFile('wb+') as f:
+ f.write(_bytes_sample_message)
+ f.seek(0)
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ _bytes_sample_message.split(b'\n'))
+
+ def test_add_binary_nonascii_file(self):
+ with tempfile.TemporaryFile('wb+') as f:
+ f.write(self._non_latin_bin_msg)
+ f.seek(0)
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ self._non_latin_bin_msg.split(b'\n'))
+
+ def test_add_text_file_warns(self):
+ with tempfile.TemporaryFile('w+') as f:
+ f.write(_sample_message)
+ f.seek(0)
+ with self.assertWarns(DeprecationWarning):
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ _bytes_sample_message.split(b'\n'))
+
+ def test_add_StringIO_warns(self):
+ with self.assertWarns(DeprecationWarning):
+ key = self._box.add(io.StringIO(self._template % "0"))
+ self.assertEqual(self._box.get_string(key), self._template % "0")
+
+ def test_add_nonascii_StringIO_raises(self):
+ with self.assertWarns(DeprecationWarning):
+ with self.assertRaisesRegex(ValueError, "ASCII-only"):
+ self._box.add(io.StringIO(self._nonascii_msg))
+
def test_remove(self):
# Remove messages using remove()
self._test_remove_or_delitem(self._box.remove)
self.assertEqual(msg0.get_payload(), '0')
self._check_sample(self._box.get_message(key1))
+ def test_get_bytes(self):
+ # Get bytes representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assertEqual(self._box.get_bytes(key0),
+ (self._template % 0).encode('ascii'))
+ self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
+
def test_get_string(self):
# Get string representations of messages
key0 = self._box.add(self._template % 0)
key1 = self._box.add(_sample_message)
self.assertEqual(self._box.get_string(key0), self._template % 0)
- self.assertEqual(self._box.get_string(key1), _sample_message)
+ self.assertEqual(self._box.get_string(key1).split('\n'),
+ _sample_message.split('\n'))
def test_get_file(self):
# Get file representations of messages
data0 = file.read()
with self._box.get_file(key1) as file:
data1 = file.read()
- self.assertEqual(data0.replace(os.linesep, '\n'),
+ self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
self._template % 0)
- self.assertEqual(data1.replace(os.linesep, '\n'),
+ self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
_sample_message)
def test_iterkeys(self):
def test_dump_message(self):
# Write message representations to disk
for input in (email.message_from_string(_sample_message),
- _sample_message, io.StringIO(_sample_message)):
- output = io.StringIO()
+ _sample_message, io.BytesIO(_bytes_sample_message)):
+ output = io.BytesIO()
self._box._dump_message(input, output)
- self.assertEqual(output.getvalue(), _sample_message)
- output = io.StringIO()
+ self.assertEqual(output.getvalue(),
+ _bytes_sample_message.replace(b'\n', os.linesep.encode()))
+ output = io.BytesIO()
self.assertRaises(TypeError,
lambda: self._box._dump_message(None, output))
self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
self.assertRaises(NotImplementedError, lambda: box.get_message(''))
self.assertRaises(NotImplementedError, lambda: box.get_string(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
self.assertRaises(NotImplementedError, lambda: box.get_file(''))
self.assertRaises(NotImplementedError, lambda: '' in box)
self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
"Host name mismatch: '%s' should be '%s'" %
(groups[4], hostname))
previous_groups = groups
- tmp_file.write(_sample_message)
+ tmp_file.write(_bytes_sample_message)
tmp_file.seek(0)
- self.assertEqual(tmp_file.read(), _sample_message)
+ self.assertEqual(tmp_file.read(), _bytes_sample_message)
tmp_file.close()
file_count = len(os.listdir(os.path.join(self._path, "tmp")))
self.assertEqual(file_count, repetitions,
self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
self.assertEqual(self._box[key].get_payload(), '0')
+ def test_add_from_bytes(self):
+ # Add a byte string starting with 'From ' to the mailbox
+ key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0')
+ self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
+ self.assertEqual(self._box[key].get_payload(), '0')
+
def test_add_mbox_or_mmdf_message(self):
# Add an mboxMessage or MMDFMessage
for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
self._box._file.seek(0)
contents = self._box._file.read()
self._box.close()
- with open(self._path, 'r', newline='') as f:
+ with open(self._path, 'rb') as f:
self.assertEqual(contents, f.read())
self._box = self._factory(self._path)
self._post_initialize_hook(msg)
self._check_sample(msg)
+ def test_initialize_with_binary_file(self):
+ # Initialize based on contents of binary file
+ with open(self._path, 'wb+') as f:
+ f.write(_bytes_sample_message)
+ f.seek(0)
+ msg = self._factory(f)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
def test_initialize_with_nothing(self):
# Initialize without arguments
msg = self._factory()
msg_plain = mailbox.Message(msg)
self._check_sample(msg_plain)
+ def test_x_from_bytes(self):
+ # Convert all formats to Message
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg = class_(_bytes_sample_message)
+ self._check_sample(msg)
+
def test_x_to_invalid(self):
# Convert all formats to an invalid format
for class_ in (mailbox.Message, mailbox.MaildirMessage,
--NMuMz9nt05w80d4+--
"""
+_bytes_sample_message = _sample_message.encode('ascii')
+
_sample_headers = {
"Return-Path":"<gkj@gregorykjohnson.com>",
"X-Original-To":"gkj+person@localhost",