# cpython/Lib/test/test_pickletools.py

import io
import pickle
import pickletools
from test import support
from test.pickletester import AbstractPickleTests
import doctest
import unittest

class OptimizedPickleTests(AbstractPickleTests, unittest.TestCase):
    # Runs the tests provided by AbstractPickleTests with every pickle
    # passed through pickletools.optimize() before it is loaded again.

    def dumps(self, arg, proto=None, **kwargs):
        # Pickle as usual, then hand back the optimized byte string.
        raw = pickle.dumps(arg, proto, **kwargs)
        return pickletools.optimize(raw)

    def loads(self, buf, **kwds):
        # Optimized pickles are read back by the regular unpickler.
        return pickle.loads(buf, **kwds)

    # Overridden with None (presumably an inherited test): it relies on
    # the precise output of dumps().
    test_pickle_to_2x = None

    # Overridden with None (presumably an inherited test): it relies on
    # writing by chunks into a file object.
    test_framed_write_sizes_with_delayed_writer = None

    def test_optimize_long_binget(self):
        # 257 distinct strings followed by a second reference to the
        # last one.  Optimizing must keep that shared reference and
        # leave neither LONG_BINGET nor LONG_BINPUT in the pickle.
        items = list(map(str, range(257)))
        items.append(items[-1])
        forbidden = (pickle.LONG_BINGET, pickle.LONG_BINPUT)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            original = pickle.dumps(items, proto)
            restored = pickle.loads(original)
            self.assertEqual(restored, items)
            self.assertIs(restored[-1], restored[-2])

            optimized = pickletools.optimize(original)
            restored_opt = pickle.loads(optimized)
            self.assertEqual(restored_opt, items)
            self.assertIs(restored_opt[-1], restored_opt[-2])
            for opcode in forbidden:
                self.assertNotIn(opcode, optimized)

    def test_optimize_binput_and_memoize(self):
        # Hand-written pickle mixing BINPUT and MEMOIZE; its
        # disassembly is:
        #    0: \x80 PROTO      4
        #    2: \x95 FRAME      21
        #   11: ]    EMPTY_LIST
        #   12: \x94 MEMOIZE
        #   13: (    MARK
        #   14: \x8c     SHORT_BINUNICODE 'spam'
        #   20: q        BINPUT     1
        #   22: \x8c     SHORT_BINUNICODE 'ham'
        #   27: \x94     MEMOIZE
        #   28: h        BINGET     2
        #   30: e        APPENDS    (MARK at 13)
        #   31: .    STOP
        original = (b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00'
                    b']\x94(\x8c\x04spamq\x01\x8c\x03ham\x94h\x02e.')
        expected = ['spam', 'ham', 'ham']

        # Sanity-check the input before optimizing it.
        self.assertIn(pickle.BINPUT, original)
        restored = pickle.loads(original)
        self.assertEqual(restored, expected)
        self.assertIs(restored[1], restored[2])

        # The optimized pickle loads to the same structure, still
        # shares 'ham', and no longer contains BINPUT.
        optimized = pickletools.optimize(original)
        restored_opt = pickle.loads(optimized)
        self.assertEqual(restored_opt, expected)
        self.assertIs(restored_opt[1], restored_opt[2])
        self.assertNotIn(pickle.BINPUT, optimized)


class SimpleReader:
    """Minimal reader over an in-memory bytes object.

    Only read() and readline() are defined -- there is no tell() -- and
    the current offset is kept in the ``pos`` attribute.  The
    ``*_without_pos`` tests in this file feed it to pickletools.
    """

    def __init__(self, data):
        self.data = data
        self.pos = 0

    def read(self, n):
        # The offset always advances by n, even when fewer than n bytes
        # are left to return.
        start = self.pos
        self.pos = start + n
        return self.data[start:self.pos]

    def readline(self):
        # Return everything up to and including the next newline, or
        # the rest of the data when no newline is left.
        start = self.pos
        newline_at = self.data.find(b'\n', start)
        end = len(self.data) if newline_at == -1 else newline_at + 1
        self.pos = end
        return self.data[start:end]


class GenopsTests(unittest.TestCase):
    # pickletools.genops() is exercised on a bytes object, on an
    # io.BytesIO positioned mid-stream, and on a SimpleReader (which
    # has no tell()).

    @staticmethod
    def _named_ops(ops):
        # Replace each yielded opcode object with its name so the
        # results can be compared with plain tuples.
        return [(opcode.name, *rest) for opcode, *rest in ops]

    def test_genops(self):
        ops = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.')
        expected = [
            ('MARK', None, 0),
            ('INT', 123, 1),
            ('BININT1', 0x12, 6),
            ('BININT', 0x78563412, 8),
            ('TUPLE', None, 13),
            ('STOP', None, 14),
        ]
        self.assertEqual(self._named_ops(ops), expected)

    def test_from_file(self):
        # The pickle sits between a prefix and a suffix; the expected
        # positions are offsets into the whole stream, and the suffix
        # must remain unread once STOP has been seen.
        stream = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix')
        self.assertEqual(stream.read(6), b'prefix')
        ops = pickletools.genops(stream)
        expected = [
            ('MARK', None, 6),
            ('INT', 123, 7),
            ('BININT1', 0x12, 12),
            ('BININT', 0x78563412, 14),
            ('TUPLE', None, 19),
            ('STOP', None, 20),
        ]
        self.assertEqual(self._named_ops(ops), expected)
        self.assertEqual(stream.read(), b'suffix')

    def test_without_pos(self):
        # With a SimpleReader every position is expected to be None.
        reader = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.')
        ops = pickletools.genops(reader)
        expected = [
            ('MARK', None, None),
            ('INT', 123, None),
            ('BININT1', 0x12, None),
            ('BININT', 0x78563412, None),
            ('TUPLE', None, None),
            ('STOP', None, None),
        ]
        self.assertEqual(self._named_ops(ops), expected)

    def test_no_stop(self):
        # The first opcode is produced; the error is raised only when
        # the next one is requested.
        ops = pickletools.genops(b'N')
        first = next(ops)
        self.assertEqual(first[0].name, 'NONE')
        self.assertRaisesRegex(
            ValueError, 'pickle exhausted before seeing STOP',
            next, ops)

    def test_truncated_data(self):
        # INT argument with no terminating newline.
        ops = pickletools.genops(b'I123')
        self.assertRaisesRegex(
            ValueError, 'no newline found when trying to read stringnl',
            next, ops)
        # BININT with only two of its four argument bytes.
        ops = pickletools.genops(b'J\x12\x34')
        self.assertRaisesRegex(
            ValueError, 'not enough data in stream to read int4',
            next, ops)

    def test_unknown_opcode(self):
        ops = pickletools.genops(b'N\xff')
        first = next(ops)
        self.assertEqual(first[0].name, 'NONE')
        self.assertRaisesRegex(
            ValueError, r"at position 1, opcode b'\\xff' unknown",
            next, ops)

    def test_unknown_opcode_without_pos(self):
        # Same as above, but read through a SimpleReader: the message
        # is expected to say "<unknown>" in place of an offset.
        reader = SimpleReader(b'N\xff')
        ops = pickletools.genops(reader)
        first = next(ops)
        self.assertEqual(first[0].name, 'NONE')
        self.assertRaisesRegex(
            ValueError, r"at position <unknown>, opcode b'\\xff' unknown",
            next, ops)


class DisTests(unittest.TestCase):
    # Tests for pickletools.dis().  Each test disassembles a small
    # hand-written pickle and compares the text written to ``out`` with
    # an exact expected listing, so the whitespace inside the expected
    # strings below is significant.

    # No limit on the length of diffs shown when a comparison fails.
    maxDiff = None

    # Disassemble *data* with pickletools.dis(), forwarding **kwargs,
    # and require the produced text to equal *expected* exactly.
    def check_dis(self, data, expected, **kwargs):
        out = io.StringIO()
        pickletools.dis(data, out=out, **kwargs)
        self.assertEqual(out.getvalue(), expected)

    # Like check_dis(), but dis() must raise a ValueError whose message
    # matches the regex *expected_error*.  *expected* is the text that
    # must have been written to ``out`` by the time the error is raised.
    def check_dis_error(self, data, expected, expected_error, **kwargs):
        out = io.StringIO()
        with self.assertRaisesRegex(ValueError, expected_error):
            pickletools.dis(data, out=out, **kwargs)
        self.assertEqual(out.getvalue(), expected)

    # Nested MARKs: opcodes after a MARK are indented further, and
    # TUPLE/LIST report the offset of the MARK they consume.
    def test_mark(self):
        self.check_dis(b'(N(tl.', '''\
    0: (    MARK
    1: N        NONE
    2: (        MARK
    3: t            TUPLE      (MARK at 2)
    4: l        LIST       (MARK at 0)
    5: .    STOP
highest protocol among opcodes = 0
''')

    # Same pickle as test_mark, with indentlevel=2: each nesting level
    # adds 2 columns in the expected listing instead of 4.
    def test_indentlevel(self):
        self.check_dis(b'(N(tl.', '''\
    0: (    MARK
    1: N      NONE
    2: (      MARK
    3: t        TUPLE      (MARK at 2)
    4: l      LIST       (MARK at 0)
    5: .    STOP
highest protocol among opcodes = 0
''', indentlevel=2)

    # Same pickle read through SimpleReader (no tell()): the expected
    # listing has no offset column and the MARK locations are unknown.
    def test_mark_without_pos(self):
        self.check_dis(SimpleReader(b'(N(tl.'), '''\
(    MARK
N        NONE
(        MARK
t            TUPLE      (MARK at unknown opcode offset)
l        LIST       (MARK at unknown opcode offset)
.    STOP
highest protocol among opcodes = 0
''')

    # TUPLE without a preceding MARK: the TUPLE line is still written
    # before the error is raised.
    def test_no_mark(self):
        self.check_dis_error(b'Nt.', '''\
    0: N    NONE
    1: t    TUPLE
''', 'no MARK exists on stack')

    # PUT, BINPUT and LONG_BINPUT show their memo key; MEMOIZE shows
    # the key it is assigned.
    def test_put(self):
        self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
    0: N    NONE
    1: p    PUT        0
    4: q    BINPUT     1
    6: r    LONG_BINPUT 2
   11: \\x94 MEMOIZE    (as 3)
   12: .    STOP
highest protocol among opcodes = 4
''')

    # Storing into the same memo key several times is expected to
    # disassemble without error.
    def test_put_redefined(self):
        self.check_dis(b'Np1\np1\nq\x01r\x01\x00\x00\x00\x94.', '''\
    0: N    NONE
    1: p    PUT        1
    4: p    PUT        1
    7: q    BINPUT     1
    9: r    LONG_BINPUT 1
   14: \\x94 MEMOIZE    (as 1)
   15: .    STOP
highest protocol among opcodes = 4
''')

    # PUT with nothing on the stack.
    def test_put_empty_stack(self):
        self.check_dis_error(b'p0\n', '''\
    0: p    PUT        0
''', "stack is empty -- can't store into memo")

    # PUT when the top of the stack is the markobject.
    def test_put_markobject(self):
        self.check_dis_error(b'(p0\n', '''\
    0: (    MARK
    1: p        PUT        0
''', "can't store markobject in the memo")

    # GET, BINGET and LONG_BINGET of a key that was stored by PUT.
    def test_get(self):
        self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
    0: (    MARK
    1: N        NONE
    2: p        PUT        1
    5: g        GET        1
    8: h        BINGET     1
   10: j        LONG_BINGET 1
   15: t        TUPLE      (MARK at 0)
   16: .    STOP
highest protocol among opcodes = 1
''')

    # Each of the three GET variants on a key that was never stored.
    def test_get_without_put(self):
        self.check_dis_error(b'g1\n.', '''\
    0: g    GET        1
''', 'memo key 1 has never been stored into')
        self.check_dis_error(b'h\x01.', '''\
    0: h    BINGET     1
''', 'memo key 1 has never been stored into')
        self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
    0: j    LONG_BINGET 1
''', 'memo key 1 has never been stored into')

    # The same memo dict is passed to two dis() calls: the second
    # pickle GETs the key that the first pickle PUT.
    def test_memo(self):
        memo = {}
        self.check_dis(b'Np1\n.', '''\
    0: N    NONE
    1: p    PUT        1
    4: .    STOP
highest protocol among opcodes = 0
''', memo=memo)
        self.check_dis(b'g1\n.', '''\
    0: g    GET        1
    3: .    STOP
highest protocol among opcodes = 0
''', memo=memo)

    # The second POP removes the markobject: it is annotated with the
    # MARK offset and the following opcode is no longer indented.
    def test_mark_pop(self):
        self.check_dis(b'(N00N.', '''\
    0: (    MARK
    1: N        NONE
    2: 0        POP
    3: 0        POP        (MARK at 0)
    4: N    NONE
    5: .    STOP
highest protocol among opcodes = 0
''')

    # APPEND needs two stack items; try it with zero and with one.
    def test_too_small_stack(self):
        self.check_dis_error(b'a', '''\
    0: a    APPEND
''', 'tries to pop 2 items from stack with only 0 items')
        self.check_dis_error(b']a', '''\
    0: ]    EMPTY_LIST
    1: a    APPEND
''', 'tries to pop 2 items from stack with only 1 items')

    # Pickle that ends without a STOP opcode.
    def test_no_stop(self):
        self.check_dis_error(b'N', '''\
    0: N    NONE
''', 'pickle exhausted before seeing STOP')

    # Opcode arguments cut short: nothing is written for the
    # truncated opcode itself.
    def test_truncated_data(self):
        self.check_dis_error(b'NI123', '''\
    0: N    NONE
''', 'no newline found when trying to read stringnl')
        self.check_dis_error(b'NJ\x12\x34', '''\
    0: N    NONE
''', 'not enough data in stream to read int4')

    # Byte 0xff is not a known opcode.
    def test_unknown_opcode(self):
        self.check_dis_error(b'N\xff', '''\
    0: N    NONE
''', r"at position 1, opcode b'\\xff' unknown")

    # Two objects are left on the stack at STOP: the complete listing,
    # including the summary line, is written before the error.
    def test_stop_not_empty_stack(self):
        self.check_dis_error(b']N.', '''\
    0: ]    EMPTY_LIST
    1: N    NONE
    2: .    STOP
highest protocol among opcodes = 1
''', r'stack not empty after STOP: \[list\]')

    # annotate=1 and annotate=20: every line gets a short opcode
    # description appended.  In the expected listings the description
    # column moves right when a line is too long for it and later lines
    # then line up with it; see the pickletools.dis() documentation for
    # the exact rule.
    def test_annotate(self):
        self.check_dis(b'(Nt.', '''\
    0: (    MARK Push markobject onto the stack.
    1: N        NONE Push None on the stack.
    2: t        TUPLE      (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
    3: .    STOP                       Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=1)
        self.check_dis(b'(Nt.', '''\
    0: (    MARK            Push markobject onto the stack.
    1: N        NONE        Push None on the stack.
    2: t        TUPLE      (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
    3: .    STOP                       Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)
        self.check_dis(b'(((((((ttttttt.', '''\
    0: (    MARK            Push markobject onto the stack.
    1: (        MARK        Push markobject onto the stack.
    2: (            MARK    Push markobject onto the stack.
    3: (                MARK Push markobject onto the stack.
    4: (                    MARK Push markobject onto the stack.
    5: (                        MARK Push markobject onto the stack.
    6: (                            MARK Push markobject onto the stack.
    7: t                                TUPLE      (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
    8: t                            TUPLE      (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
    9: t                        TUPLE      (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
   10: t                    TUPLE      (MARK at 3)     Build a tuple out of the topmost stack slice, after markobject.
   11: t                TUPLE      (MARK at 2)         Build a tuple out of the topmost stack slice, after markobject.
   12: t            TUPLE      (MARK at 1)             Build a tuple out of the topmost stack slice, after markobject.
   13: t        TUPLE      (MARK at 0)                 Build a tuple out of the topmost stack slice, after markobject.
   14: .    STOP                                       Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)

    # STRING arguments in single quotes, in double quotes, and with
    # non-ASCII bytes (shown as \x escapes in the listing).
    def test_string(self):
        self.check_dis(b"S'abc'\n.", '''\
    0: S    STRING     'abc'
    7: .    STOP
highest protocol among opcodes = 0
''')
        self.check_dis(b'S"abc"\n.', '''\
    0: S    STRING     'abc'
    7: .    STOP
highest protocol among opcodes = 0
''')
        self.check_dis(b"S'\xc3\xb5'\n.", '''\
    0: S    STRING     '\\xc3\\xb5'
    6: .    STOP
highest protocol among opcodes = 0
''')

    # STRING arguments with a missing or mismatched quote; nothing is
    # written before the error.
    # NOTE(review): "strinq" (sic) is the spelling used in the expected
    # regexes below -- presumably it mirrors the message raised by
    # pickletools, so do not "correct" it here without checking.
    def test_string_without_quotes(self):
        self.check_dis_error(b"Sabc'\n.", '',
                             'no string quotes around b"abc\'"')
        self.check_dis_error(b'Sabc"\n.', '',
                             "no string quotes around b'abc\"'")
        self.check_dis_error(b"S'abc\n.", '',
                             '''strinq quote b"'" not found at both ends of b"'abc"''')
        self.check_dis_error(b'S"abc\n.', '',
                             r"""strinq quote b'"' not found at both ends of b'"abc'""")
        self.check_dis_error(b"S'abc\"\n.", '',
                             r"""strinq quote b"'" not found at both ends of b'\\'abc"'""")
        self.check_dis_error(b"S\"abc'\n.", '',
                             r"""strinq quote b'"' not found at both ends of b'"abc\\''""")

    # BINSTRING (4-byte length prefix) with ASCII and non-ASCII bytes.
    def test_binstring(self):
        self.check_dis(b"T\x03\x00\x00\x00abc.", '''\
    0: T    BINSTRING  'abc'
    8: .    STOP
highest protocol among opcodes = 1
''')
        self.check_dis(b"T\x02\x00\x00\x00\xc3\xb5.", '''\
    0: T    BINSTRING  '\\xc3\\xb5'
    7: .    STOP
highest protocol among opcodes = 1
''')

    # SHORT_BINSTRING (1-byte length prefix) with ASCII and non-ASCII
    # bytes.
    def test_short_binstring(self):
        self.check_dis(b"U\x03abc.", '''\
    0: U    SHORT_BINSTRING 'abc'
    5: .    STOP
highest protocol among opcodes = 1
''')
        self.check_dis(b"U\x02\xc3\xb5.", '''\
    0: U    SHORT_BINSTRING '\\xc3\\xb5'
    4: .    STOP
highest protocol among opcodes = 1
''')

    # GLOBAL with ASCII names and with UTF-8 encoded non-ASCII names;
    # the second expected listing contains the decoded characters.
    def test_global(self):
        self.check_dis(b"cmodule\nname\n.", '''\
    0: c    GLOBAL     'module name'
   13: .    STOP
highest protocol among opcodes = 0
''')
        self.check_dis(b"cm\xc3\xb6dule\nn\xc3\xa4me\n.", '''\
    0: c    GLOBAL     'm\xf6dule n\xe4me'
   15: .    STOP
highest protocol among opcodes = 0
''')

    # INST shows its module/name pair and the MARK it consumes.
    def test_inst(self):
        self.check_dis(b"(imodule\nname\n.", '''\
    0: (    MARK
    1: i        INST       'module name' (MARK at 0)
   14: .    STOP
highest protocol among opcodes = 0
''')

    # PERSID shows its newline-terminated argument.
    def test_persid(self):
        self.check_dis(b"Pabc\n.", '''\
    0: P    PERSID     'abc'
    5: .    STOP
highest protocol among opcodes = 0
''')


class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # Names passed to support.check__all__() as not_exported,
        # grouped by kind.

        # Each of these names is listed twice: once as written and
        # once with a "read_" prefix.
        arg_kinds = (
            'uint1', 'uint2', 'int4', 'uint4', 'uint8',
            'stringnl', 'stringnl_noescape', 'stringnl_noescape_pair',
            'string1', 'string4',
            'bytes1', 'bytes4', 'bytes8', 'bytearray8',
            'unicodestringnl', 'unicodestring1', 'unicodestring4',
            'unicodestring8',
            'decimalnl_short', 'decimalnl_long',
            'floatnl', 'float8',
            'long1', 'long4',
        )
        stack_names = (
            'StackObject',
            'pyint', 'pylong', 'pyinteger_or_bool', 'pybool', 'pyfloat',
            'pybytes_or_str', 'pystring', 'pybytes', 'pybytearray',
            'pyunicode', 'pynone', 'pytuple', 'pylist', 'pydict',
            'pyset', 'pyfrozenset', 'pybuffer', 'anyobject',
            'markobject', 'stackslice',
        )
        not_exported = {
            'bytes_types',
            'UP_TO_NEWLINE',
            'TAKEN_FROM_ARGUMENT1',
            'TAKEN_FROM_ARGUMENT4',
            'TAKEN_FROM_ARGUMENT4U',
            'TAKEN_FROM_ARGUMENT8U',
            'ArgumentDescriptor',
            'OpcodeInfo',
            'opcodes',
            'code2op',
        }
        not_exported.update(arg_kinds)
        not_exported.update('read_' + kind for kind in arg_kinds)
        not_exported.update(stack_names)
        support.check__all__(self, pickletools, not_exported=not_exported)


def load_tests(loader, tests, pattern):
    """Add the doctests found in pickletools to *tests* and return it.

    This is unittest's ``load_tests`` hook; *loader* and *pattern* are
    accepted but not used.
    """
    doctest_suite = doctest.DocTestSuite(pickletools)
    tests.addTest(doctest_suite)
    return tests


# Run this module's tests when it is executed as a script.
if __name__ == "__main__":
    unittest.main()