folly/folly/python/test/iobuf.py

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pyre-unsafe

import array
import struct
import sys
import unittest

from folly.iobuf import IOBuf, WritableIOBuf

from .iobuf_helper import (
    get_empty_chain,
    get_empty_writable_chain,
    make_chain,
    to_uppercase_string,
    to_uppercase_string_heap,
)


class IOBufTests(unittest.TestCase):
    def test_empty_chain(self) -> None:
        ebuf = get_empty_chain()
        self.assertFalse(ebuf)
        self.assertTrue(ebuf.is_chained)
        self.assertEqual(len(ebuf), 0)
        self.assertEqual(ebuf.chain_size(), 0)
        self.assertEqual(ebuf.chain_count(), 8)
        self.assertEqual(b"".join(ebuf), b"")
        self.assertEqual(b"", bytes(ebuf))

    def test_chain(self) -> None:
        control = [b"facebook", b"thrift", b"python3", b"cython"]
        chain = make_chain([IOBuf(x) for x in control])
        self.assertTrue(chain.is_chained)
        self.assertTrue(chain)
        self.assertEqual(bytes(chain), control[0])
        self.assertEqual(len(chain), len(control[0]))
        self.assertEqual(chain.chain_size(), sum(len(x) for x in control))
        self.assertEqual(chain.chain_count(), len(control))
        # pyre-fixme[6]: For 1st argument expected `Buffer` but got `Optional[IOBuf]`.
        self.assertEqual(memoryview(chain.next), control[1])
        self.assertEqual(b"".join(chain), b"".join(control))

    def test_cyclic_chain(self) -> None:
        control = [b"aaa", b"aaaa"]
        chain = make_chain([IOBuf(x) for x in control])
        self.assertTrue(chain.is_chained)
        self.assertTrue(chain)
        self.assertEqual(bytes(chain), control[0])
        self.assertEqual(len(chain), len(control[0]))
        self.assertEqual(chain.chain_size(), sum(len(x) for x in control))
        self.assertEqual(chain.chain_count(), len(control))
        # pyre-fixme[6]: For 1st argument expected `Buffer` but got `Optional[IOBuf]`.
        self.assertEqual(memoryview(chain.next), control[1])
        self.assertEqual(b"".join(chain), b"".join(control))

    def test_hash(self) -> None:
        x = b"omg"
        y = b"wtf"
        xb = IOBuf(x)
        yb = IOBuf(y)
        hash(xb)
        self.assertNotEqual(hash(xb), hash(yb))
        self.assertEqual(hash(xb), hash(IOBuf(x)))

    def test_empty(self) -> None:
        x = b""
        xb = IOBuf(x)
        self.assertEqual(memoryview(xb), x)
        self.assertEqual(bytes(xb), x)
        self.assertFalse(xb)
        self.assertEqual(len(xb), len(x))

    def test_iter(self) -> None:
        x = b"testtest"
        xb = IOBuf(x)
        self.assertEqual(b"".join(iter(xb)), x)

    def test_bytes(self) -> None:
        x = b"omgwtfbbq"
        xb = IOBuf(x)
        self.assertEqual(bytes(xb), x)

    def test_cmp(self) -> None:
        x = IOBuf(b"abc")
        y = IOBuf(b"def")
        z = IOBuf(b"abc")
        self.assertEqual(x, z)
        self.assertNotEqual(x, y)
        self.assertLess(x, y)
        self.assertLessEqual(x, y)
        self.assertLessEqual(x, z)
        self.assertGreater(y, x)
        self.assertGreaterEqual(y, x)

    def test_typed(self) -> None:
        # pyre-fixme[6]: Expected `Union[IOBuf, bytearray, bytes, memoryview]` for
        #  1st param but got `array[int]`.
        x = IOBuf(array.array("l", [1, 2, 3, 4, 5]))
        self.assertEqual(x.chain_size(), 5 * struct.calcsize("l"))

    def test_unshaped(self) -> None:
        x = IOBuf(memoryview(b"a").cast("B", shape=[]))
        self.assertEqual(x.chain_size(), 1)

    def test_multidimensional(self) -> None:
        x = IOBuf(memoryview(b"abcdef").cast("B", shape=[3, 2]))
        self.assertEqual(x.chain_size(), 6)

    def test_conversion_from_python_to_cpp(self) -> None:
        iobuf = make_chain(
            [
                IOBuf(memoryview(b"abc")),
                IOBuf(memoryview(b"def")),
                IOBuf(memoryview(b"ghi")),
            ]
        )
        self.assertEqual(to_uppercase_string(iobuf), "ABCDEFGHI")
        self.assertEqual(to_uppercase_string_heap(iobuf), "ABCDEFGHI")

    def test_conversion_from_python_to_cpp_with_wrong_type(self) -> None:
        not_an_iobuf = [1, 2, 3]
        with self.assertRaises(TypeError):
            to_uppercase_string(not_an_iobuf)
        with self.assertRaises(TypeError):
            to_uppercase_string_heap(not_an_iobuf)

    def test_buffer_read_fail(self) -> None:
        finish = bytearray(b"1234567890123456")
        xb = IOBuf(finish)
        buf = memoryview(xb)

        try:
            self.assertEqual(bytes(buf[17]), None)
            self.fail("Expected exception for reading out of bounds")
        except IndexError as e:
            self.assertEqual(
                str(e),
                "index out of bounds on dimension 1",
            )


class WritableIOBufTests(unittest.TestCase):
    def test_bytes_writable(self) -> None:
        x = bytearray(b"omgwtfbbq")
        xb = WritableIOBuf(x)
        self.assertEqual(bytes(xb), x)
        self.assertEqual(xb.writable(), True)

    def test_buffer_overwrite(self) -> None:
        start = b"omgwtfbbq"
        finish = b"123456789"
        x = bytearray(start)
        xb = WritableIOBuf(x)
        buf = memoryview(xb)

        self.assertEqual(bytes(buf), start)
        buf[:] = finish
        self.assertEqual(bytes(buf), finish)

        self.assertNotEqual(bytes(buf), start)

    def test_buffer_write_empty(self) -> None:
        start = bytearray(9)
        finish = b"123456789"
        xb = WritableIOBuf(bytearray(9))
        buf = memoryview(xb)

        self.assertEqual(bytes(buf), start)
        buf[:] = finish
        self.assertEqual(bytes(buf), finish)
        self.assertEqual(memoryview(xb), finish)

        self.assertNotEqual(bytes(buf), start)

    def test_buffer_update_in_place(self) -> None:
        x = bytearray(b"123")
        xb = WritableIOBuf(x)
        memoryview(xb)[:] = b"456"
        self.assertEqual(x, b"456")  # xb wrapped x, so by mutating xb we mutated x

    def test_buffer_write_out_of_bounds(self) -> None:
        start = bytearray(9)
        finish = b"1234567890"
        xb = WritableIOBuf(bytearray(9))
        buf = memoryview(xb)

        self.assertEqual(bytes(buf), start)
        try:
            buf[:] = finish
            self.fail("Expected exception for writing out of bounds")
        except ValueError as e:
            self.assertEqual(
                str(e),
                "memoryview assignment: lvalue and rvalue have different structures",
            )
        self.assertEqual(bytes(buf), start)

    def test_buffer_read_out_of_bounds(self) -> None:
        start = bytearray(b"1234567890")
        xb = WritableIOBuf(start)
        buf = memoryview(xb)

        self.assertEqual(bytes(buf), start)
        try:
            self.assertEqual(bytes(buf[10]), None)
            self.fail("Expected exception for reading out of bounds")
        except IndexError as e:
            self.assertEqual(
                str(e),
                "index out of bounds on dimension 1",
            )

    def test_buffer_write_empty_pieces(self) -> None:
        start = bytearray(9)
        builder = bytearray(9)
        piece_1 = b"123"
        piece_2 = b"456"
        piece_3 = b"789"
        xb = WritableIOBuf(start)

        buf = memoryview(xb)
        buf[0:3] = piece_1
        for i in range(3):
            builder[i] = piece_1[i]
        self.assertEqual(bytes(buf), builder)

        buf = memoryview(xb)
        buf[3:6] = piece_2
        for i in range(3):
            builder[i + 3] = piece_2[i]
        self.assertEqual(bytes(buf), builder)

        buf = memoryview(xb)
        buf[6:9] = piece_3
        for i in range(3):
            builder[i + 6] = piece_3[i]
        self.assertEqual(bytes(buf), builder)

    def test_empty_writable_chain(self) -> None:
        ebuf = get_empty_writable_chain()
        self.assertFalse(ebuf)
        self.assertTrue(ebuf.is_chained)
        self.assertEqual(len(ebuf), 0)
        self.assertEqual(ebuf.chain_size(), 0)
        self.assertEqual(ebuf.chain_count(), 8)
        self.assertEqual(b"".join(ebuf), b"")
        self.assertEqual(b"", bytes(ebuf))

    def test_appendable_writable_chain(self) -> None:
        x = bytearray(b"omgwtfbbq")
        xb = WritableIOBuf(x)
        self.assertFalse(xb.is_chained)

        y = bytearray(b"wtfbbqomg")
        yb = WritableIOBuf(y)
        xb.append_to_chain(yb)

        self.assertTrue(xb.is_chained)
        self.assertEqual(len(xb), 9)
        self.assertEqual(xb.chain_size(), 18)
        self.assertEqual(xb.chain_count(), 2)
        self.assertEqual(b"".join(xb), b"".join([x, y]))

    def test_appendable_writable_chain_overwrite(self) -> None:
        start = bytearray(9)
        x = bytearray(b"omgwtfbbq")
        for i in range(9):
            start[i] = x[i]

        xb = WritableIOBuf(x)
        self.assertFalse(xb.is_chained)

        y = bytearray(b"wtfbbqomg")
        yb = WritableIOBuf(y)
        xb.append_to_chain(yb)

        finish = b"123456789"
        buf = memoryview(xb)
        buf[:] = finish

        self.assertTrue(xb.is_chained)
        self.assertEqual(len(xb), 9)
        self.assertEqual(xb.chain_size(), 18)
        self.assertEqual(xb.chain_count(), 2)
        self.assertEqual(b"".join(xb), b"".join([x, y]))
        self.assertEqual(bytes(buf), x)

        self.assertNotEqual(b"".join(xb), b"".join([start, y]))

    def test_appendable_writable_chain_coalesce(self) -> None:
        x = bytearray(b"omgwtfbbq")
        xb = WritableIOBuf(x)
        self.assertFalse(xb.is_chained)

        y = bytearray(b"wtfbbqomg")
        yb = WritableIOBuf(y)
        xb.append_to_chain(yb)

        xb.coalesce()
        self.assertFalse(xb.is_chained)
        self.assertEqual(len(xb), 18)
        self.assertEqual(xb.chain_size(), 18)
        self.assertEqual(xb.chain_count(), 1)

        buf = memoryview(xb)
        self.assertEqual(bytes(buf), b"".join([x, y]))

    def test_appendable_writable_chain_coalesce_exception(self) -> None:
        x = bytearray(b"omgwtfbbq")
        xb = WritableIOBuf(x)
        self.assertFalse(xb.is_chained)

        y = bytearray(b"wtfbbqomg")
        yb = WritableIOBuf(y)
        xb.append_to_chain(yb)
        test = xb

        self.assertEqual(test, xb)
        self.assertEqual(sys.getrefcount(xb), 3)
        try:
            xb.coalesce()
            self.fail("Expected exception for too many references")
        except RuntimeError as e:
            self.assertEqual(
                str(e), "Cannot coalesce IOBuf with more than one reference"
            )

    def test_buffer_creation_with_size_write(self) -> None:
        finish = bytearray(b"123456789012345")
        xb = WritableIOBuf.create_unitialized(17)
        buf = memoryview(xb)

        self.assertEqual(len(xb), 0)
        self.assertEqual(len(buf), 0)
        self.assertEqual(bytes(buf), b"")
        self.assertEqual(xb.length(), 0)
        xb.append(15)
        buf = memoryview(xb)
        self.assertNotEqual(bytes(buf), b"")
        buf[0:15] = finish
        self.assertEqual(len(xb), 15)
        self.assertEqual(len(memoryview(xb)), 15)
        self.assertEqual(memoryview(xb), finish)
        self.assertEqual(buf, finish)
        self.assertEqual(xb.length(), 15)
        self.assertEqual(bytes(xb), b"123456789012345")

    def test_buffer_creation_with_size_write_fail(self) -> None:
        finish = bytearray(b"12345678901234567")
        xb = WritableIOBuf.create_unitialized(16)
        xb.append(16)
        buf = memoryview(xb)

        self.assertEqual(xb.capacity(), 16)
        try:
            buf[:] = finish
            self.fail("Expected exception for writing out of bounds")
        except ValueError as e:
            self.assertEqual(
                str(e),
                "memoryview assignment: lvalue and rvalue have different structures",
            )

    def test_buffer_creation_with_size_read_fail(self) -> None:
        finish = bytearray(b"1234567890123456")
        xb = WritableIOBuf.create_unitialized(16)
        xb.append(16)
        buf = memoryview(xb)

        buf[:] = finish
        try:
            self.assertEqual(bytes(buf[17]), None)
            self.fail("Expected exception for reading out of bounds")
        except IndexError as e:
            self.assertEqual(
                str(e),
                "index out of bounds on dimension 1",
            )

    def test_buffer_creation_with_size_append(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(16)
        self.assertEqual(xb.length(), 16)

    def test_buffer_creation_with_size_append_fail(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        try:
            xb.append(17)
            self.fail("Expected exception for appending more than capacity")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot append more than capacity")

    def test_buffer_creation_with_size_append_fail_negative(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        try:
            xb.append(-1)
            self.fail("Expected exception for negative amount")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot append, amount must be positive")

    def test_buffer_creation_with_size_multiple_append(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(15)
        xb.append(1)
        self.assertEqual(xb.length(), 16)

    def test_buffer_creation_with_size_multiple_append_fail(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(16)
        try:
            xb.append(1)
            self.fail("Expected exception for appending more than capacity")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot append more than capacity")

    def test_buffer_creation_with_size_trim_start(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(5)
        self.assertEqual(xb.length(), 0)

    def test_buffer_creation_with_size_write_trim_start(self) -> None:
        finish = bytearray(b"1234567890123456")
        xb = WritableIOBuf.create_unitialized(16)
        xb.append(16)
        buf = memoryview(xb)

        buf[0:16] = finish
        self.assertEqual(memoryview(xb), finish)
        self.assertEqual(buf, finish)
        self.assertEqual(xb.length(), 16)
        self.assertEqual(bytes(xb), b"1234567890123456")
        xb.trim_start(6)
        self.assertEqual(bytes(xb), b"7890123456")
        self.assertEqual(xb.length(), 10)
        xb.trim_start(6)
        self.assertEqual(bytes(xb), b"3456")
        self.assertEqual(xb.length(), 4)

    def test_buffer_creation_with_size_trim_start_fail(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        try:
            xb.trim_start(6)
            self.fail("Expected exception for trimming more than length")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim more than length")

    def test_buffer_creation_with_size_trim_start_multiple(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(4)
        xb.trim_start(1)
        self.assertEqual(xb.length(), 0)

    def test_buffer_creation_with_size_trim_start_fail_multiple(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(5)
        try:
            xb.trim_start(1)
            self.fail("Expected exception for trimming more than length")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim more than length")

    def test_buffer_creation_with_size_trim_start_fail_negative(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)

        try:
            xb.trim_start(-1)
            self.fail("Expected exception for negative amount")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim start, amount must be positive")

    def test_buffer_creation_with_size_trim_end(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_end(5)
        self.assertEqual(xb.length(), 0)

    def test_buffer_creation_with_size_write_trim_end(self) -> None:
        finish = bytearray(b"1234567890123456")
        xb = WritableIOBuf.create_unitialized(16)
        xb.append(16)
        buf = memoryview(xb)

        buf[0:16] = finish
        self.assertEqual(memoryview(xb), finish)
        self.assertEqual(buf, finish)
        self.assertEqual(xb.length(), 16)
        self.assertEqual(bytes(xb), b"1234567890123456")
        xb.trim_end(6)
        self.assertEqual(bytes(xb), b"1234567890")
        self.assertEqual(xb.length(), 10)
        xb.trim_end(6)
        self.assertEqual(bytes(xb), b"1234")
        self.assertEqual(xb.length(), 4)

    def test_buffer_creation_with_size_trim_end_fail(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        try:
            xb.trim_end(6)
            self.fail("Expected exception for trimming more than length")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim more than length")

    def test_buffer_creation_with_size_trim_end_multiple(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_end(4)
        xb.trim_end(1)
        self.assertEqual(xb.length(), 0)

    def test_buffer_creation_with_size_trim_end_fail_multiple(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_end(5)
        try:
            xb.trim_end(1)
            self.fail("Expected exception for trimming more than length")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim more than length")

    def test_buffer_creation_with_size_trim_end_fail_negative(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)

        try:
            xb.trim_end(-1)
            self.fail("Expected exception for negative amount")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot trim end, amount must be positive")

    def test_buffer_creation_with_size_prepend(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(5)
        self.assertEqual(xb.length(), 0)
        xb.prepend(5)
        self.assertEqual(xb.length(), 5)

    def test_buffer_creation_with_size_write_prepend(self) -> None:
        finish = bytearray(b"1234567890123456")
        xb = WritableIOBuf.create_unitialized(16)
        xb.append(16)
        buf = memoryview(xb)

        buf[0:16] = finish
        self.assertEqual(memoryview(xb), finish)
        self.assertEqual(buf, finish)
        self.assertEqual(xb.length(), 16)
        self.assertEqual(bytes(xb), b"1234567890123456")
        xb.trim_start(6)
        self.assertEqual(bytes(xb), b"7890123456")
        self.assertEqual(xb.length(), 10)
        xb.prepend(6)
        self.assertEqual(bytes(xb), b"1234567890123456")
        self.assertEqual(xb.length(), 16)

    def test_buffer_creation_with_size_prepend_fail(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(5)
        self.assertEqual(xb.length(), 0)
        try:
            xb.prepend(6)
            self.fail("Expected exception for prepending more than headroom")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot prepend more than headroom")

    def test_buffer_creation_with_size_prepend_fail_multiple(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)
        xb.append(5)
        self.assertEqual(xb.length(), 5)
        xb.trim_start(5)
        self.assertEqual(xb.length(), 0)
        xb.prepend(5)
        try:
            xb.prepend(1)
            self.fail("Expected exception for prepending more than headroom")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot prepend more than headroom")

    def test_buffer_creation_with_size_prepend_fail_negative(self) -> None:
        xb = WritableIOBuf.create_unitialized(16)
        self.assertEqual(xb.length(), 0)
        self.assertEqual(xb.capacity(), 16)

        try:
            xb.prepend(-1)
            self.fail("Expected exception for negative amount")
        except ValueError as e:
            self.assertEqual(str(e), "Cannot prepend, amount must be positive")