cpython/Lib/test/test_email/test_defect_handling.py

import textwrap
import unittest
import contextlib
from email import policy
from email import errors
from test.test_email import TestEmailBase


class TestDefectsBase:

    policy = policy.default
    raise_expected = False

    @contextlib.contextmanager
    def _raise_point(self, defect):
        yield

    def test_same_boundary_inner_outer(self):
        source = textwrap.dedent("""\
            Subject: XX
            From: [email protected]
            To: XX
            Mime-version: 1.0
            Content-type: multipart/mixed;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: multipart/alternative;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/plain; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            text

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/html; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable

            <HTML></HTML>

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: image/gif; name="xx.gif";
            Content-disposition: attachment
            Content-transfer-encoding: base64

            Some removed base64 encoded chars.

            --MS_Mac_OE_3071477847_720252_MIME_Part--

            """)
        # XXX better would be to actually detect the duplicate.
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
        inner = msg.get_payload(0)
        self.assertTrue(hasattr(inner, 'defects'))
        self.assertEqual(len(self.get_defects(inner)), 1)
        self.assertIsInstance(self.get_defects(inner)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_multipart_no_boundary(self):
        source = textwrap.dedent("""\
            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
            From: foobar
            Subject: broken mail
            MIME-Version: 1.0
            Content-Type: multipart/report; report-type=delivery-status;

            --JAB03225.986577786/zinfandel.lacita.com

            One part

            --JAB03225.986577786/zinfandel.lacita.com
            Content-Type: message/delivery-status

            Header: Another part

            --JAB03225.986577786/zinfandel.lacita.com--
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
        self.assertIsInstance(msg.get_payload(), str)
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    multipart_msg = textwrap.dedent("""\
        Date: Wed, 14 Nov 2007 12:56:23 GMT
        From: [email protected]
        To: [email protected]
        Subject: Content-Transfer-Encoding: base64 and multipart
        MIME-Version: 1.0
        Content-Type: multipart/mixed;
            boundary="===============3344438784458119861=="{}

        --===============3344438784458119861==
        Content-Type: text/plain

        Test message

        --===============3344438784458119861==
        Content-Type: application/octet-stream
        Content-Transfer-Encoding: base64

        YWJj

        --===============3344438784458119861==--
        """)

    def test_multipart_invalid_cte(self):
        with self._raise_point(
                errors.InvalidMultipartContentTransferEncodingDefect):
            msg = self._str_msg(
                    self.multipart_msg.format(
                        "\nContent-Transfer-Encoding: base64"))
        if self.raise_expected: return
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertIsInstance(self.get_defects(msg)[0],
            errors.InvalidMultipartContentTransferEncodingDefect)

    def test_multipart_no_cte_no_defect(self):
        if self.raise_expected: return
        msg = self._str_msg(self.multipart_msg.format(''))
        self.assertEqual(len(self.get_defects(msg)), 0)

    def test_multipart_valid_cte_no_defect(self):
        if self.raise_expected: return
        for cte in ('7bit', '8bit', 'BINary'):
            msg = self._str_msg(
                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)

    def test_lying_multipart(self):
        source = textwrap.dedent("""\
            From: "Allison Dunlap" <[email protected]>
            To: [email protected]
            Subject: 64423
            Date: Sun, 11 Jul 2004 16:09:27 -0300
            MIME-Version: 1.0
            Content-Type: multipart/alternative;

            Blah blah blah
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
        self.assertTrue(hasattr(msg, 'defects'))
        self.assertEqual(len(self.get_defects(msg)), 2)
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)

    def test_missing_start_boundary(self):
        source = textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="AAA"
            From: Mail Delivery Subsystem <[email protected]>
            To: [email protected]

            --AAA

            Stuff

            --AAA
            Content-Type: message/rfc822

            From: [email protected]
            To: [email protected]
            Content-Type: multipart/mixed; boundary="BBB"

            --BBB--

            --AAA--

            """)
        # The message structure is:
        #
        # multipart/mixed
        #    text/plain
        #    message/rfc822
        #        multipart/mixed [*]
        #
        # [*] This message is missing its start boundary
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            outer = self._str_msg(source)
        if self.raise_expected: return
        bad = outer.get_payload(1).get_payload(0)
        self.assertEqual(len(self.get_defects(bad)), 1)
        self.assertIsInstance(self.get_defects(bad)[0],
                              errors.StartBoundaryNotFoundDefect)

    def test_first_line_is_continuation_header(self):
        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
        if self.raise_expected: return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'body')
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertDefectsEqual(self.get_defects(msg),
                                 [errors.FirstHeaderLineIsContinuationDefect])
        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')

    def test_missing_header_body_separator(self):
        # Our heuristic if we see a line that doesn't look like a header (no
        # leading whitespace but no ':') is to assume that the blank line that
        # separates the header from the body is missing, and to stop parsing
        # headers and start parsing the body.
        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
        if self.raise_expected: return
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.MissingHeaderBodySeparatorDefect])

    def test_bad_padding_in_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dmk
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64PaddingDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected: return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64PaddingDefect])

    def test_invalid_chars_in_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dm\x01k===
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64CharactersDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected: return
        self.assertEqual(payload, b'vi')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64CharactersDefect])

    def test_invalid_length_of_base64_payload(self):
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            abcde
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64LengthDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected: return
        self.assertEqual(payload, b'abcde')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64LengthDefect])

    def test_missing_ending_boundary(self):
        source = textwrap.dedent("""\
            To: [email protected]
            Subject: Fwd: 1
            MIME-Version: 1.0
            Content-Type: multipart/alternative;
             boundary="------------000101020201080900040301"

            --------------000101020201080900040301
            Content-Type: text/plain; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 1

            --------------000101020201080900040301
            Content-Type: text/html; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit

            Alternative 2

            """)
        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
        self.assertEqual(len(msg.get_payload()), 2)
        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.CloseBoundaryNotFoundDefect])


class TestDefectDetection(TestDefectsBase, TestEmailBase):

    def get_defects(self, obj):
        return obj.defects


class TestDefectCapture(TestDefectsBase, TestEmailBase):

    class CapturePolicy(policy.EmailPolicy):
        captured = None
        def register_defect(self, obj, defect):
            self.captured.append(defect)

    def setUp(self):
        self.policy = self.CapturePolicy(captured=list())

    def get_defects(self, obj):
        return self.policy.captured


class TestDefectRaising(TestDefectsBase, TestEmailBase):

    policy = TestDefectsBase.policy
    policy = policy.clone(raise_on_defect=True)
    raise_expected = True

    @contextlib.contextmanager
    def _raise_point(self, defect):
        with self.assertRaises(defect):
            yield


if __name__ == '__main__':
    unittest.main()