* bpo-35805: Add parser for Message-ID header.
This parser is based on the definition of Identification Fields from RFC 5322
Sec 3.6.4.
This should also prevent folding of Message-ID header using RFC 2047 encoded
words and hence fix bpo-35805.
* Prevent folding of non-ascii message-id headers.
* Add fold method to MsgID token to prevent folding.
The default mappings are:
- :subject: UniqueUnstructuredHeader
- :date: UniqueDateHeader
- :resent-date: DateHeader
- :orig-date: UniqueDateHeader
- :sender: UniqueSingleAddressHeader
- :resent-sender: SingleAddressHeader
- :to: UniqueAddressHeader
- :resent-to: AddressHeader
- :cc: UniqueAddressHeader
- :resent-cc: AddressHeader
- :from: UniqueAddressHeader
- :resent-from: AddressHeader
- :reply-to: UniqueAddressHeader
+ :subject: UniqueUnstructuredHeader
+ :date: UniqueDateHeader
+ :resent-date: DateHeader
+ :orig-date: UniqueDateHeader
+ :sender: UniqueSingleAddressHeader
+ :resent-sender: SingleAddressHeader
+ :to: UniqueAddressHeader
+ :resent-to: AddressHeader
+ :cc: UniqueAddressHeader
+ :resent-cc: AddressHeader
+ :bcc: UniqueAddressHeader
+ :resent-bcc: AddressHeader
+ :from: UniqueAddressHeader
+ :resent-from: AddressHeader
+ :reply-to: UniqueAddressHeader
+ :mime-version: MIMEVersionHeader
+ :content-type: ContentTypeHeader
+ :content-disposition: ContentDispositionHeader
+ :content-transfer-encoding: ContentTransferEncodingHeader
+ :message-id: MessageIDHeader
``HeaderRegistry`` has the following methods:
class UnstructuredTokenList(TokenList):
-
token_type = 'unstructured'
class Phrase(TokenList):
-
token_type = 'phrase'
class Word(TokenList):
-
token_type = 'word'
class CFWSList(WhiteSpaceTokenList):
-
token_type = 'cfws'
class Atom(TokenList):
-
token_type = 'atom'
class Token(TokenList):
-
token_type = 'token'
encode_as_ew = False
class EncodedWord(TokenList):
-
token_type = 'encoded-word'
cte = None
charset = None
class DotAtom(TokenList):
-
token_type = 'dot-atom'
class DotAtomText(TokenList):
-
token_type = 'dot-atom-text'
as_ew_allowed = True
+class NoFoldLiteral(TokenList):
+ # Token list built by get_no_fold_literal for: no-fold-literal = "[" *dtext "]"
+ token_type = 'no-fold-literal'
+ # NOTE(review): presumably "encoded-word allowed"; False is in line with the
+ # stated goal of keeping RFC 2047 encoded words out of Message-ID -- confirm
+ # against the folding code.
+ as_ew_allowed = False
+
+
class AddrSpec(TokenList):
token_type = 'addr-spec'
class ContentType(ParameterizedHeaderValue):
-
token_type = 'content-type'
as_ew_allowed = False
maintype = 'text'
class ContentDisposition(ParameterizedHeaderValue):
-
token_type = 'content-disposition'
as_ew_allowed = False
content_disposition = None
class ContentTransferEncoding(TokenList):
-
token_type = 'content-transfer-encoding'
as_ew_allowed = False
cte = '7bit'
class HeaderLabel(TokenList):
-
token_type = 'header-label'
as_ew_allowed = False
-class Header(TokenList):
+class MsgID(TokenList):
+ # Token list for a single msg-id, as built by get_msg_id.
+ token_type = 'msg-id'
+ as_ew_allowed = False
+
+ def fold(self, policy):
+ # message-id tokens may not be folded.
+ # The token text is returned unchanged followed by the policy's line
+ # separator; no other policy setting is consulted here.
+ return str(self) + policy.linesep
+
+class MessageID(MsgID):
+ # Top-level token list returned by parse_message_id; inherits the
+ # non-folding fold() from MsgID.
+ token_type = 'message-id'
+
+class Header(TokenList):
token_type = 'header'
addr_spec.append(token)
if not value or value[0] != '@':
addr_spec.defects.append(errors.InvalidHeaderDefect(
- "add-spec local part with no domain"))
+ "addr-spec local part with no domain"))
return addr_spec, value
addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
token, value = get_domain(value[1:])
value = value[1:]
return address_list, value
+
+def get_no_fold_literal(value):
+    """Parse a no-fold-literal from the front of *value*.
+
+        no-fold-literal = "[" *dtext "]"
+
+    Returns a ``(NoFoldLiteral, remainder)`` tuple, where *remainder* is
+    the text following the closing ``]``.
+
+    Raises errors.HeaderParseError if *value* is empty, does not start
+    with ``[``, or has no ``]`` right after the dtext.
+    """
+    if not value:
+        raise errors.HeaderParseError(
+            "expected no-fold-literal but found '{}'".format(value))
+    if not value.startswith('['):
+        raise errors.HeaderParseError(
+            "expected '[' at the start of no-fold-literal "
+            "but found '{}'".format(value))
+    literal = NoFoldLiteral()
+    literal.append(ValueTerminal('[', 'no-fold-literal-start'))
+    # Everything between the brackets is delegated to get_dtext.
+    dtext, rest = get_dtext(value[1:])
+    literal.append(dtext)
+    if not rest.startswith(']'):
+        raise errors.HeaderParseError(
+            "expected ']' at the end of no-fold-literal "
+            "but found '{}'".format(rest))
+    literal.append(ValueTerminal(']', 'no-fold-literal-end'))
+    return literal, rest[1:]
+
+def get_msg_id(value):
+    """Parse a msg-id from the front of *value*.
+
+    Grammar (RFC 5322 section 3.6.4):
+
+        msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS]
+        id-left = dot-atom-text / obs-id-left
+        id-right = dot-atom-text / no-fold-literal / obs-id-right
+        no-fold-literal = "[" *dtext "]"
+
+    Returns a ``(MsgID, remainder)`` tuple.  Recoverable problems are
+    recorded in ``msg_id.defects`` instead of being raised: an obsolete
+    id-left or id-right (ObsoleteHeaderDefect), a missing ``@ id-right``
+    and a missing closing ``>`` (both InvalidHeaderDefect).
+
+    Raises errors.HeaderParseError if *value* is empty, does not start
+    with ``<`` (after optional CFWS), or if id-left / id-right cannot be
+    parsed in any of the accepted forms.
+    """
+    msg_id = MsgID()
+    # Guard against empty input: indexing value[0] unconditionally would
+    # raise IndexError instead of the HeaderParseError callers handle.
+    if value and value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    if not value or value[0] != '<':
+        raise errors.HeaderParseError(
+            "expected msg-id but found '{}'".format(value))
+    msg_id.append(ValueTerminal('<', 'msg-id-start'))
+    value = value[1:]
+    # Parse id-left.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            # obs-id-left is same as local-part of addr-spec.
+            token, value = get_obs_local_part(value)
+            msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                "obsolete id-left in msg-id"))
+        except errors.HeaderParseError:
+            raise errors.HeaderParseError(
+                "expected dot-atom-text or obs-id-left"
+                " but found '{}'".format(value))
+    msg_id.append(token)
+    if not value or value[0] != '@':
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "msg-id with no id-right"))
+        # Even though there is no id-right, if the local part
+        # ends with `>` let's just parse it too and return
+        # along with the defect.
+        if value and value[0] == '>':
+            msg_id.append(ValueTerminal('>', 'msg-id-end'))
+            value = value[1:]
+        return msg_id, value
+    msg_id.append(ValueTerminal('@', 'address-at-symbol'))
+    value = value[1:]
+    # Parse id-right, trying the accepted forms in order:
+    # dot-atom-text, then no-fold-literal, then the obsolete domain form.
+    try:
+        token, value = get_dot_atom_text(value)
+    except errors.HeaderParseError:
+        try:
+            token, value = get_no_fold_literal(value)
+        except errors.HeaderParseError:
+            try:
+                token, value = get_domain(value)
+                msg_id.defects.append(errors.ObsoleteHeaderDefect(
+                    "obsolete id-right in msg-id"))
+            except errors.HeaderParseError:
+                raise errors.HeaderParseError(
+                    "expected dot-atom-text, no-fold-literal or obs-id-right"
+                    " but found '{}'".format(value))
+    msg_id.append(token)
+    if value and value[0] == '>':
+        value = value[1:]
+    else:
+        msg_id.defects.append(errors.InvalidHeaderDefect(
+            "missing trailing '>' on msg-id"))
+    # The closing terminal is appended even when it was missing from the
+    # input, so the string form of the token always ends with '>'.
+    msg_id.append(ValueTerminal('>', 'msg-id-end'))
+    if value and value[0] in CFWS_LEADER:
+        token, value = get_cfws(value)
+        msg_id.append(token)
+    return msg_id, value
+
+
+def parse_message_id(value):
+    """Parse the value of a Message-ID header.
+
+        message-id = "Message-ID:" msg-id CRLF
+
+    Returns a MessageID token list.  A HeaderParseError raised by
+    get_msg_id is not propagated: it is recorded as an
+    InvalidHeaderDefect and the unparsed text is kept in the tree as a
+    single terminal, so the string form of the result still carries the
+    original value.  Text left over after a valid msg-id is reported as
+    an InvalidHeaderDefect as well.
+    """
+    message_id = MessageID()
+    try:
+        token, rest = get_msg_id(value)
+    except errors.HeaderParseError:
+        message_id.defects.append(errors.InvalidHeaderDefect(
+            "Expected msg-id but found {!r}".format(value)))
+        # No token was produced; appending the unbound name here used to
+        # raise UnboundLocalError.  Keep the raw text instead of dropping it.
+        message_id.append(ValueTerminal(value, 'invalid-msg-id'))
+        return message_id
+    message_id.append(token)
+    if rest:
+        # Only one msg-id is allowed; flag trailing text rather than
+        # discarding it silently.
+        message_id.defects.append(errors.InvalidHeaderDefect(
+            "Unexpected {!r}".format(rest)))
+    return message_id
+
#
# XXX: As I begin to add additional header parsers, I'm realizing we probably
# have two level of parser routines: the get_XXX methods that get a token in
return self._cte
+class MessageIDHeader:
+
+ # NOTE(review): presumably the maximum number of Message-ID headers
+ # allowed in one message -- confirm against the code that reads max_count.
+ max_count = 1
+ # Wrapped in staticmethod so cls.value_parser(value) passes only the value.
+ value_parser = staticmethod(parser.parse_message_id)
+
+ @classmethod
+ def parse(cls, value, kwds):
+ # Parse the raw value and publish the results through kwds: the parse
+ # tree, its string form as the decoded value, and the defects reported
+ # by parse_tree.all_defects (added to the existing kwds['defects'] list).
+ kwds['parse_tree'] = parse_tree = cls.value_parser(value)
+ kwds['decoded'] = str(parse_tree)
+ kwds['defects'].extend(parse_tree.all_defects)
+
+
# The header factory #
_default_header_map = {
'content-type': ContentTypeHeader,
'content-disposition': ContentDispositionHeader,
'content-transfer-encoding': ContentTransferEncodingHeader,
+ 'message-id': MessageIDHeader,
}
class HeaderRegistry:
";foo", ";foo", ";foo", [errors.InvalidHeaderDefect]*3
)
+ # get_msg_id
+
+ def test_get_msg_id_valid(self):
+ # Well-formed msg-id: expects an empty defect list and token type 'msg-id'.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ "<simeple.local@example.something.com>",
+ "<simeple.local@example.something.com>",
+ "<simeple.local@example.something.com>",
+ [],
+ '',
+ )
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
+ def test_get_msg_id_obsolete_local(self):
+ # A quoted-string id-left is accepted, but must be reported with an
+ # ObsoleteHeaderDefect.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ '<"simeple.local"@example.com>',
+ '<"simeple.local"@example.com>',
+ '<simeple.local@example.com>',
+ [errors.ObsoleteHeaderDefect],
+ '',
+ )
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
+ def test_get_msg_id_non_folding_literal_domain(self):
+ # A bracketed no-fold-literal id-right is valid: no defects expected.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ "<simple.local@[someexamplecom.domain]>",
+ "<simple.local@[someexamplecom.domain]>",
+ "<simple.local@[someexamplecom.domain]>",
+ [],
+ "",
+ )
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
+
+ def test_get_msg_id_obsolete_domain_part(self):
+ # An id-right containing a comment is only accepted through the obsolete
+ # domain form, so an ObsoleteHeaderDefect is expected.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ "<simplelocal@(old)example.com>",
+ "<simplelocal@(old)example.com>",
+ "<simplelocal@ example.com>",
+ [errors.ObsoleteHeaderDefect],
+ ""
+ )
+ # Check the token type as the sibling get_msg_id tests do, so the
+ # returned token is actually used.
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
+ def test_get_msg_id_no_id_right_part(self):
+ # No '@' and no id-right: the msg-id is still returned (including the
+ # closing '>'), with an InvalidHeaderDefect.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ "<simplelocal>",
+ "<simplelocal>",
+ "<simplelocal>",
+ [errors.InvalidHeaderDefect],
+ ""
+ )
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
+ def test_get_msg_id_no_angle_start(self):
+ # Input without a leading '<' is not a msg-id at all: the parser raises.
+ with self.assertRaises(errors.HeaderParseError):
+ parser.get_msg_id("msgwithnoankle")
+
+ def test_get_msg_id_no_angle_end(self):
+ # A missing closing '>' is reported as an InvalidHeaderDefect; the
+ # expected strings include the '>' that the parser supplies.
+ msg_id = self._test_get_x(
+ parser.get_msg_id,
+ "<simplelocal@domain",
+ "<simplelocal@domain>",
+ "<simplelocal@domain>",
+ [errors.InvalidHeaderDefect],
+ ""
+ )
+ self.assertEqual(msg_id.token_type, 'msg-id')
+
@parameterize
class Test_parse_mime_parameters(TestParserMixin, TestEmailBase):
'xxxxxxxxxxxxxxxxxxxx=3D=3D-xxx-xx-xx?=\n'
' =?utf-8?q?=3E?=\n')
+ def test_message_id_header_is_not_folded(self):
+ # Each case folds with a max_line_length shorter than the msg-id and
+ # checks that the msg-id itself comes out in one piece.
+ h = self.make_header(
+ 'Message-ID',
+ '<somemessageidlongerthan@maxlinelength.com>')
+ self.assertEqual(
+ h.fold(policy=policy.default.clone(max_line_length=20)),
+ 'Message-ID: <somemessageidlongerthan@maxlinelength.com>\n')
+
+ # Test message-id isn't folded when id-right is no-fold-literal.
+ h = self.make_header(
+ 'Message-ID',
+ '<somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>')
+ self.assertEqual(
+ h.fold(policy=policy.default.clone(max_line_length=20)),
+ 'Message-ID: <somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>\n')
+
+ # Test message-id isn't folded when id-left and id-right contain
+ # non-ascii characters.
+ h = self.make_header('Message-ID', '<ईमेल@wők.com>')
+ self.assertEqual(
+ h.fold(policy=policy.default.clone(max_line_length=30)),
+ 'Message-ID: <ईमेल@wők.com>\n')
+
+ # Test message-id is folded without breaking the msg-id token into
+ # encoded words, *even* if they don't fit into max_line_length.
+ h = self.make_header('Message-ID', '<ईमेलfromMessage@wők.com>')
+ self.assertEqual(
+ h.fold(policy=policy.default.clone(max_line_length=20)),
+ 'Message-ID:\n <ईमेलfromMessage@wők.com>\n')
if __name__ == '__main__':
unittest.main()
--- /dev/null
+Add parser for Message-ID header and add it to default HeaderRegistry. This
+should prevent folding of Message-ID using RFC 2047 encoded words.