Patch by Milan Oberkirch, developed as part of his 2014 GSOC project.
Note that this also fixes a bug in mock_socket ('getpeername' was returning a
simple string instead of the tuple required for IPvX protocols), a bug in
DebugServer with respect to handling binary data (should have been fixed when
decode_data was introduced, but wasn't found until this patch was written),
and a long-standing bug in DebugServer (it was printing an extra blank line at
the end of the displayed message text).
Additionally the SMTPChannel may be extended to implement very specific
interaction behaviour with SMTP clients.
-The code supports :RFC:`5321`, plus the :rfc:`1870` SIZE extension.
+The code supports :RFC:`5321`, plus the :rfc:`1870` SIZE and :rfc:`6531`
+SMTPUTF8 extensions.
SMTPServer Objects
.. class:: SMTPServer(localaddr, remoteaddr, data_size_limit=33554432,\
- map=None, decode_data=True)
+ map=None, enable_SMTPUTF8=False, decode_data=True)
Create a new :class:`SMTPServer` object, which binds to local address
*localaddr*. It will treat *remoteaddr* as an upstream SMTP relayer. It
accepted in a ``DATA`` command. A value of ``None`` or ``0`` means no
limit.
+ *enable_SMTPUTF8* determins whether the ``SMTPUTF8`` extension (as defined
+ in :RFC:`6531`) should be enabled. The default is ``False``. If
+ *enable_SMTPUTF* is set to ``True``, the :meth:`process_smtputf8_message`
+ method must be defined. A :exc:`ValueError` is raised if both
+ *enable_SMTPUTF8* and *decode_data* are set to ``True`` at the same time.
+
A dictionary can be specified in *map* to avoid using a global socket map.
*decode_data* specifies whether the data portion of the SMTP transaction
.. method:: process_message(peer, mailfrom, rcpttos, data)
- Raise :exc:`NotImplementedError` exception. Override this in subclasses to
+ Raise a :exc:`NotImplementedError` exception. Override this in subclasses to
do something useful with this message. Whatever was passed in the
constructor as *remoteaddr* will be available as the :attr:`_remoteaddr`
attribute. *peer* is the remote host's address, *mailfrom* is the envelope
originator, *rcpttos* are the envelope recipients and *data* is a string
- containing the contents of the e-mail (which should be in :rfc:`2822`
+ containing the contents of the e-mail (which should be in :rfc:`5321`
format).
If the *decode_data* constructor keyword is set to ``True``, the *data*
argument will be a unicode string. If it is set to ``False``, it
will be a bytes object.
+ Return ``None`` to request a normal ``250 Ok`` response; otherwise
+ return the desired response string in :RFC:`5321` format.
+
+ .. method:: process_smtputf8_message(peer, mailfrom, rcpttos, data)
+
+ Raise a :exc:`NotImplementedError` exception. Override this in
+ subclasses to do something useful with messages when *enable_SMTPUTF8*
+ has been set to ``True`` and the SMTP client requested ``SMTPUTF8``,
+ since this method is called rather than :meth:`process_message` when the
+ client actively requests ``SMTPUTF8``. The *data* argument will always
+ be a bytes object, and any non-``None`` return value should conform to
+ :rfc:`6531`; otherwise, the API is the same as for
+ :meth:`process_message`.
+
.. attribute:: channel_class
Override this in subclasses to use a custom :class:`SMTPChannel` for
.. versionchanged:: 3.4
The *map* argument was added.
- .. versionchanged:: 3.5 the *decode_data* argument was added, and *localaddr*
- and *remoteaddr* may now contain IPv6 addresses.
+ .. versionchanged:: 3.5
+ *localaddr* and *remoteaddr* may now contain IPv6 addresses.
+
+ .. versionadded:: 3.5
+ the *decode_data* and *enable_SMTPUTF8* constructor arguments, and the
+ :meth:`process_smtputf8_message` method.
DebuggingServer Objects
-------------------
.. class:: SMTPChannel(server, conn, addr, data_size_limit=33554432,\
- map=None, decode_data=True)
+ map=None, enable_SMTPUTF8=False, decode_data=True)
Create a new :class:`SMTPChannel` object which manages the communication
between the server and a single SMTP client.
accepted in a ``DATA`` command. A value of ``None`` or ``0`` means no
limit.
+ *enable_SMTPUTF8* determins whether the ``SMTPUTF8`` extension (as defined
+ in :RFC:`6531`) should be enabled. The default is ``False``. A
+ :exc:`ValueError` is raised if both *enable_SMTPUTF8* and *decode_data* are
+ set to ``True`` at the same time.
+
A dictionary can be specified in *map* to avoid using a global socket map.
*decode_data* specifies whether the data portion of the SMTP transaction
:attr:`SMTPServer.channel_class` of your :class:`SMTPServer`.
.. versionchanged:: 3.5
- the *decode_data* argument was added.
+ the *decode_data* and *enable_SMTPUTF8* arguments were added.
The :class:`SMTPChannel` has the following instance variables:
addresses in the :class:`~smtpd.SMTPServer` constructor, and have it
successfully connect. (Contributed by Milan Oberkirch in :issue:`14758`.)
+* :mod:`~smtpd.SMTPServer` now supports :rfc:`6531` via the *enable_SMTPUTF8*
+ constructor argument and a user-provided
+ :meth:`~smtpd.SMTPServer.process_smtputf8_message` method.
+
smtplib
-------
#! /usr/bin/env python3
-"""An RFC 5321 smtp proxy.
+"""An RFC 5321 smtp proxy with optional RFC 1870 and RFC 6531 extensions.
Usage: %(program)s [options] [localhost:localport [remotehost:remoteport]]
Restrict the total size of the incoming message to "limit" number of
bytes via the RFC 1870 SIZE extension. Defaults to 33554432 bytes.
+ --smtputf8
+ -u
+ Enable the SMTPUTF8 extension and behave as an RFC 6531 smtp proxy.
+
--debug
-d
Turn on debugging prints.
command_size_limit = 512
command_size_limits = collections.defaultdict(lambda x=command_size_limit: x)
- command_size_limits.update({
- 'MAIL': command_size_limit + 26,
- })
- max_command_size_limit = max(command_size_limits.values())
+
+ @property
+ def max_command_size_limit(self):
+ try:
+ return max(self.command_size_limits.values())
+ except ValueError:
+ return self.command_size_limit
def __init__(self, server, conn, addr, data_size_limit=DATA_SIZE_DEFAULT,
- map=None, decode_data=None):
+ map=None, enable_SMTPUTF8=False, decode_data=None):
asynchat.async_chat.__init__(self, conn, map=map)
self.smtp_server = server
self.conn = conn
self.addr = addr
self.data_size_limit = data_size_limit
+ self.enable_SMTPUTF8 = enable_SMTPUTF8
+ if enable_SMTPUTF8:
+ if decode_data:
+ ValueError("decode_data and enable_SMTPUTF8 cannot be set to"
+ " True at the same time")
+ decode_data = False
if decode_data is None:
warn("The decode_data default of True will change to False in 3.6;"
" specify an explicit value for this keyword",
self._linesep = b'\r\n'
self._dotsep = b'.'
self._newline = b'\n'
- self.received_lines = []
- self.smtp_state = self.COMMAND
+ self._set_rset_state()
self.seen_greeting = ''
- self.mailfrom = None
- self.rcpttos = []
- self.received_data = ''
+ self.extended_smtp = False
+ self.command_size_limits.clear()
self.fqdn = socket.getfqdn()
- self.num_bytes = 0
try:
self.peer = conn.getpeername()
except OSError as err:
return
print('Peer:', repr(self.peer), file=DEBUGSTREAM)
self.push('220 %s %s' % (self.fqdn, __version__))
+
+ def _set_post_data_state(self):
+ """Reset state variables to their post-DATA state."""
+ self.smtp_state = self.COMMAND
+ self.mailfrom = None
+ self.rcpttos = []
+ self.require_SMTPUTF8 = False
+ self.num_bytes = 0
self.set_terminator(b'\r\n')
- self.extended_smtp = False
+
+ def _set_rset_state(self):
+ """Reset all state variables except the greeting."""
+ self._set_post_data_state()
+ self.received_data = ''
+ self.received_lines = []
+
# properties for backwards-compatibility
@property
"set 'addr' instead", DeprecationWarning, 2)
self.addr = value
- # Overrides base class for convenience
+ # Overrides base class for convenience.
def push(self, msg):
- asynchat.async_chat.push(self, bytes(msg + '\r\n', 'ascii'))
+ asynchat.async_chat.push(self, bytes(
+ msg + '\r\n', 'utf-8' if self.require_SMTPUTF8 else 'ascii'))
# Implementation of base class abstract method
def collect_incoming_data(self, data):
if not line:
self.push('500 Error: bad syntax')
return
- method = None
if not self._decode_data:
line = str(line, 'utf-8')
i = line.find(' ')
else:
data.append(text)
self.received_data = self._newline.join(data)
- status = self.smtp_server.process_message(self.peer,
- self.mailfrom,
- self.rcpttos,
- self.received_data)
- self.rcpttos = []
- self.mailfrom = None
- self.smtp_state = self.COMMAND
- self.num_bytes = 0
- self.set_terminator(b'\r\n')
+ args = (self.peer, self.mailfrom, self.rcpttos, self.received_data)
+ if self.require_SMTPUTF8:
+ status = self.smtp_server.process_smtputf8_message(*args)
+ else:
+ status = self.smtp_server.process_message(*args)
+ self._set_post_data_state()
if not status:
self.push('250 OK')
else:
if not arg:
self.push('501 Syntax: HELO hostname')
return
+ # See issue #21783 for a discussion of this behavior.
if self.seen_greeting:
self.push('503 Duplicate HELO/EHLO')
- else:
- self.seen_greeting = arg
- self.extended_smtp = False
- self.push('250 %s' % self.fqdn)
+ return
+ self._set_rset_state()
+ self.seen_greeting = arg
+ self.push('250 %s' % self.fqdn)
def smtp_EHLO(self, arg):
if not arg:
self.push('501 Syntax: EHLO hostname')
return
+ # See issue #21783 for a discussion of this behavior.
if self.seen_greeting:
self.push('503 Duplicate HELO/EHLO')
- else:
- self.seen_greeting = arg
- self.extended_smtp = True
- self.push('250-%s' % self.fqdn)
- if self.data_size_limit:
- self.push('250-SIZE %s' % self.data_size_limit)
- self.push('250 HELP')
+ return
+ self._set_rset_state()
+ self.seen_greeting = arg
+ self.extended_smtp = True
+ self.push('250-%s' % self.fqdn)
+ if self.data_size_limit:
+ self.push('250-SIZE %s' % self.data_size_limit)
+ self.command_size_limits['MAIL'] += 26
+ if self.enable_SMTPUTF8:
+ self.push('250-8BITMIME')
+ self.push('250-SMTPUTF8')
+ self.command_size_limits['MAIL'] += 10
+ self.push('250 HELP')
def smtp_NOOP(self, arg):
if arg:
def _getparams(self, params):
# Return any parameters that appear to be syntactically valid according
# to RFC 1869, ignore all others. (Postel rule: accept what we can.)
- params = [param.split('=', 1) for param in params.split()
- if '=' in param]
+ params = [param.split('=', 1) if '=' in param else (param, True)
+ for param in params.split()]
return {k: v for k, v in params if k.isalnum()}
def smtp_HELP(self, arg):
if params is None:
self.push(syntaxerr)
return
+ body = params.pop('BODY', '7BIT')
+ if self.enable_SMTPUTF8 and params.pop('SMTPUTF8', False):
+ if body != '8BITMIME':
+ self.push('501 Syntax: MAIL FROM: <address>'
+ ' [BODY=8BITMIME SMTPUTF8]')
+ return
+ else:
+ self.require_SMTPUTF8 = True
size = params.pop('SIZE', None)
if size:
if not size.isdigit():
if arg:
self.push('501 Syntax: RSET')
return
- # Resets the sender, recipients, and data, but not the greeting
- self.mailfrom = None
- self.rcpttos = []
- self.received_data = ''
- self.smtp_state = self.COMMAND
+ self._set_rset_state()
self.push('250 OK')
def smtp_DATA(self, arg):
def __init__(self, localaddr, remoteaddr,
data_size_limit=DATA_SIZE_DEFAULT, map=None,
- decode_data=None):
+ enable_SMTPUTF8=False, decode_data=None):
self._localaddr = localaddr
self._remoteaddr = remoteaddr
self.data_size_limit = data_size_limit
+ self.enable_SMTPUTF8 = enable_SMTPUTF8
+ if enable_SMTPUTF8:
+ if decode_data:
+ raise ValueError("The decode_data and enable_SMTPUTF8"
+ " parameters cannot be set to True at the"
+ " same time.")
+ decode_data = False
if decode_data is None:
warn("The decode_data default of True will change to False in 3.6;"
" specify an explicit value for this keyword",
def handle_accepted(self, conn, addr):
print('Incoming connection from %s' % repr(addr), file=DEBUGSTREAM)
- channel = self.channel_class(self, conn, addr, self.data_size_limit,
- self._map, self._decode_data)
+ channel = self.channel_class(self,
+ conn,
+ addr,
+ self.data_size_limit,
+ self._map,
+ self.enable_SMTPUTF8,
+ self._decode_data)
# API for "doing something useful with the message"
def process_message(self, peer, mailfrom, rcpttos, data):
containing a `.' followed by other text has had the leading dot
removed.
- This function should return None, for a normal `250 Ok' response;
- otherwise it returns the desired response string in RFC 821 format.
+ This function should return None for a normal `250 Ok' response;
+ otherwise, it should return the desired response string in RFC 821
+ format.
+
+ """
+ raise NotImplementedError
+
+ # API for processing messeges needing Unicode support (RFC 6531, RFC 6532).
+ def process_smtputf8_message(self, peer, mailfrom, rcpttos, data):
+ """Same as ``process_message`` but for messages for which the client
+ has sent the SMTPUTF8 parameter with the MAIL command (see the
+ enable_SMTPUTF8 parameter of the constructor).
+
+ This function should return None for a normal `250 Ok' response;
+ otherwise, it should return the desired response string in RFC 6531
+ format.
"""
raise NotImplementedError
class DebuggingServer(SMTPServer):
- # Do something with the gathered message
- def process_message(self, peer, mailfrom, rcpttos, data):
+
+ def _print_message_content(self, peer, data):
inheaders = 1
- lines = data.split('\n')
- print('---------- MESSAGE FOLLOWS ----------')
+ lines = data.splitlines()
for line in lines:
# headers first
if inheaders and not line:
- print('X-Peer:', peer[0])
+ peerheader = 'X-Peer: ' + peer[0]
+ if not isinstance(data, str):
+ # decoded_data=false; make header match other binary output
+ peerheader = repr(peerheader.encode('utf-8'))
+ print(peerheader)
inheaders = 0
+ if not isinstance(data, str):
+ # Avoid spurious 'str on bytes instance' warning.
+ line = repr(line)
print(line)
+
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ print('---------- MESSAGE FOLLOWS ----------')
+ self._print_message_content(peer, data)
+ print('------------ END MESSAGE ------------')
+
+ def process_smtputf8_message(self, peer, mailfrom, rcpttos, data):
+ print('----- SMTPUTF8 MESSAGE FOLLOWS ------')
+ self._print_message_content(peer, data)
print('------------ END MESSAGE ------------')
class PureProxy(SMTPServer):
+ def __init__(self, *args, **kwargs):
+ if 'enable_SMTPUTF8' in kwargs and kwargs['enable_SMTPUTF8']:
+ raise ValueError("PureProxy does not support SMTPUTF8.")
+ super(PureProxy, self).__init__(*args, **kwargs)
+
def process_message(self, peer, mailfrom, rcpttos, data):
lines = data.split('\n')
# Look for the last header
class MailmanProxy(PureProxy):
+ def __init__(self, *args, **kwargs):
+ if 'enable_SMTPUTF8' in kwargs and kwargs['enable_SMTPUTF8']:
+ raise ValueError("MailmanProxy does not support SMTPUTF8.")
+ super(PureProxy, self).__init__(*args, **kwargs)
+
def process_message(self, peer, mailfrom, rcpttos, data):
from io import StringIO
from Mailman import Utils
class Options:
- setuid = 1
+ setuid = True
classname = 'PureProxy'
size_limit = None
+ enable_SMTPUTF8 = False
def parseargs():
global DEBUGSTREAM
try:
opts, args = getopt.getopt(
- sys.argv[1:], 'nVhc:s:d',
- ['class=', 'nosetuid', 'version', 'help', 'size=', 'debug'])
+ sys.argv[1:], 'nVhc:s:du',
+ ['class=', 'nosetuid', 'version', 'help', 'size=', 'debug',
+ 'smtputf8'])
except getopt.error as e:
usage(1, e)
print(__version__)
sys.exit(0)
elif opt in ('-n', '--nosetuid'):
- options.setuid = 0
+ options.setuid = False
elif opt in ('-c', '--class'):
options.classname = arg
elif opt in ('-d', '--debug'):
DEBUGSTREAM = sys.stderr
+ elif opt in ('-u', '--smtputf8'):
+ options.enable_SMTPUTF8 = True
elif opt in ('-s', '--size'):
try:
int_size = int(arg)
class_ = getattr(mod, classname)
proxy = class_((options.localhost, options.localport),
(options.remotehost, options.remoteport),
- options.size_limit)
+ options.size_limit, enable_SMTPUTF8=options.enable_SMTPUTF8)
if options.setuid:
try:
import pwd
return len(data)
def getpeername(self):
- return 'peer'
+ return ('peer-address', 'peer-port')
def close(self):
pass
import unittest
+import textwrap
from test import support, mock_socket
import socket
import io
class DummyServer(smtpd.SMTPServer):
- def __init__(self, localaddr, remoteaddr, decode_data=True):
- smtpd.SMTPServer.__init__(self, localaddr, remoteaddr,
- decode_data=decode_data)
+ def __init__(self, *args, **kwargs):
+ smtpd.SMTPServer.__init__(self, *args, **kwargs)
self.messages = []
- if decode_data:
+ if self._decode_data:
self.return_status = 'return status'
else:
self.return_status = b'return status'
if data == self.return_status:
return '250 Okish'
+ def process_smtputf8_message(self, *args, **kwargs):
+ return '250 SMTPUTF8 message okish'
+
class DummyDispatcherBroken(Exception):
pass
write_line(b'DATA')
self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
+ def test_process_smtputf8_message_unimplemented(self):
+ server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ write_line(b'EHLO example')
+ write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8')
+ write_line(b'RCPT To: <spam@example>')
+ write_line(b'DATA')
+ self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
+
def test_decode_data_default_warns(self):
with self.assertWarns(DeprecationWarning):
smtpd.SMTPServer((support.HOST, 0), ('b', 0))
+ def test_decode_data_and_enable_SMTPUTF8_raises(self):
+ self.assertRaises(
+ ValueError,
+ smtpd.SMTPServer,
+ (support.HOST, 0),
+ ('b', 0),
+ enable_SMTPUTF8=True,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+
+
+class DebuggingServerTest(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+
+ def send_data(self, channel, data, enable_SMTPUTF8=False):
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+ write_line(b'EHLO example')
+ if enable_SMTPUTF8:
+ write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
+ else:
+ write_line(b'MAIL From:eggs@example')
+ write_line(b'RCPT To:spam@example')
+ write_line(b'DATA')
+ write_line(data)
+ write_line(b'.')
+
+ def test_process_message_with_decode_data_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nhello\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ From: test
+ X-Peer: peer-address
+
+ hello
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_decode_data_false(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ decode_data=False)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
+ enable_SMTPUTF8=True)
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ----- SMTPUTF8 MESSAGE FOLLOWS ------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
def tearDown(self):
asyncore.close_all()
asyncore.socket = smtpd.socket = socket
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOST, 0), ('b', 0))
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
decode_data=True)
def test_broken_connect(self):
self.assertRaises(
DummyDispatcherBroken, BrokenDummyServer,
- (support.HOST, 0), ('b', 0))
+ (support.HOST, 0), ('b', 0), decode_data=True)
def test_server_accept(self):
self.server.handle_accept()
self.assertEqual(self.channel.socket.last,
b'500 Error: line too long\r\n')
+ def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
+ self.assertEqual(self.channel.socket.last[0:1], b'5')
+
def test_data_longer_than_default_data_size_limit(self):
# Hack the default so we don't have to generate so much data.
self.channel.data_size_limit = 1048
self.write_line(b'data\r\nmore\r\n.')
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example'], 'data\nmore')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
def test_DATA_syntax(self):
self.write_line(b'HELO example')
self.write_line(b'DATA')
self.write_line(b'data\r\n.')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example','ham@example'], 'data')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example','ham@example'],
+ 'data')])
def test_manual_status(self):
# checks that the Channel is able to return a custom status message
self.write_line(b'DATA')
self.write_line(b'data\r\n.')
self.assertEqual(self.server.messages,
- [('peer', 'foo@example', ['eggs@example'], 'data')])
+ [(('peer-address', 'peer-port'),
+ 'foo@example',
+ ['eggs@example'],
+ 'data')])
def test_HELO_RSET(self):
self.write_line(b'HELO example')
self.channel._SMTPChannel__addr = 'spam'
def test_decode_data_default_warning(self):
- server = DummyServer((support.HOST, 0), ('b', 0))
+ with self.assertWarns(DeprecationWarning):
+ server = DummyServer((support.HOST, 0), ('b', 0))
conn, addr = self.server.accept()
with self.assertWarns(DeprecationWarning):
smtpd.SMTPChannel(server, conn, addr)
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOSTv6, 0), ('b', 0))
+ self.server = DummyServer((support.HOSTv6, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
decode_data=True)
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOST, 0), ('b', 0))
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
# Set DATA size limit to 32 bytes for easy testing
self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
self.write_line(b'data\r\nmore\r\n.')
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example'], 'data\nmore')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
def test_data_limit_dialog_too_much_data(self):
self.write_line(b'HELO example')
'utf8 enriched text: żźć\nand some plain ascii')
+class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ enable_SMTPUTF8=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
+ 'utf-8')
+ )
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_process_smtputf8_message(self):
+ self.write_line(b'EHLO example')
+ for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
+ self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<b@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'c\r\n.')
+ if mail_parameters == b'':
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ else:
+ self.assertEqual(self.channel.socket.last,
+ b'250 SMTPUTF8 message okish\r\n')
+
+ def test_utf8_data(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+ self.write_line(b'.')
+ self.assertEqual(
+ self.channel.received_data,
+ b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+
+ def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ self.write_line(b'MAIL from:<' +
+ b'a' * (fill_len + 1) +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: line too long\r\n')
+ self.write_line(b'MAIL from:<' +
+ b'a' * fill_len +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_multiple_emails_with_extended_command_length(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ for char in [b'a', b'b', b'c']:
+ self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'500')
+ self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<hans@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'test\r\n.')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+
if __name__ == "__main__":
unittest.main()
Library
-------
+- Issue #21725: Added support for RFC 6531 (SMTPUTF8) in smtpd.
+
- Issue #22176: Update the ctypes module's libffi to v3.1. This release
adds support for the Linux AArch64 and POWERPC ELF ABIv2 little endian
architectures.