folly/folly/support/gdb.py

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import itertools
import socket

import gdb
import gdb.printing
import gdb.types


def escape_byte(b):
    if b == 10:
        return "\\n"
    elif b == 34:
        return '\\"'
    elif b == 92:
        return "\\\\"
    elif 32 <= b < 127:
        return chr(b)
    else:
        # Escape non-printable bytes with octal, which is what GDB
        # uses when natively printing strings.
        return "\\{0:03o}".format(b)


def repr_string(v, length):
    # GDB API does not support retrieving the value as a byte string,
    # so we need to round-trip through str (unicode) using surrogate
    # escapes for non-decodable sequences.
    decoded = v.string(encoding="utf8", errors="surrogateescape", length=length)
    byte_string = decoded.encode("utf8", errors="surrogateescape")
    return '"' + "".join(escape_byte(b) for b in byte_string) + '"'


class FBStringPrinter:
    """Print an FBString."""

    def __init__(self, val):
        self.val = val

    def to_string(self):
        size_of_size_t = gdb.lookup_type("size_t").sizeof
        catmask = 0xC0000000 if (size_of_size_t == 4) else 0xC000000000000000
        cap = self.val["store_"]["ml_"]["capacity_"]
        category = cap & catmask

        if category == 0:  # small-string-optimized
            v = self.val["store_"]["small_"]
            sz = v.type.sizeof
            length = sz - v[sz - 1]
            return repr_string(v, length - 1)
        else:
            v = self.val["store_"]["ml_"]
            length = v["size_"]
            return repr_string(v["data_"], length)

    def display_hint(self):
        return "fbstring"


class StringPiecePrinter:
    """Print a (Mutable)StringPiece"""

    def __init__(self, val):
        self.val = val

    def to_string(self):
        ptr = self.val["b_"]
        length = self.val["e_"] - ptr
        return repr_string(ptr, length)

    def display_hint(self):
        return "folly::StringPiece"


class RangePrinter:
    """Print a Range"""

    def __init__(self, val):
        self.val = val

    def children(self):
        count = 0
        item = self.val["b_"]
        end = self.val["e_"]
        while item != end:
            yield "[%d]" % count, item.dereference()
            count += 1
            item += 1

    def to_string(self):
        length = self.val["e_"] - self.val["b_"]
        value_type = self.val.type.template_argument(0)
        return "folly::Range<%s> of length %d" % (value_type, length)

    def display_hint(self):
        return "folly::Range"


class DynamicPrinter:
    """Print a folly::dynamic."""

    def __init__(self, val):
        self.val = val

    def to_string(self):
        d = gdb.types.make_enum_dict(self.val["type_"].type)
        names = {v: k for k, v in d.items()}
        type_ = names[int(self.val["type_"])]

        u = self.val["u_"]
        if type_ == "folly::dynamic::NULLT":
            return "NULL"
        elif type_ == "folly::dynamic::ARRAY":
            return u["array"]
        elif type_ == "folly::dynamic::BOOL":
            return u["boolean"]
        elif type_ == "folly::dynamic::DOUBLE":
            return u["doubl"]
        elif type_ == "folly::dynamic::INT64":
            return u["integer"]
        elif type_ == "folly::dynamic::STRING":
            return u["string"]
        elif type_ == "folly::dynamic::OBJECT":
            t = gdb.lookup_type("folly::dynamic::ObjectImpl").pointer()
            raw_v = u["objectBuffer"]["__data"]
            ptr = raw_v.address.reinterpret_cast(t)
            return ptr.dereference()
        else:
            return "{unknown folly::dynamic type}"

    def display_hint(self):
        return "folly::dynamic"


class IPAddressPrinter:
    """Print a folly::IPAddress."""

    def __init__(self, val):
        self.val = val

    def to_string(self):
        result = ""
        if self.val["family_"] == socket.AF_INET:
            addr = self.val["addr_"]["ipV4Addr"]["addr_"]["bytes_"]["_M_elems"]
            for i in range(0, 4):
                result += "{:d}.".format(int(addr[i]))
        elif self.val["family_"] == socket.AF_INET6:
            addr = self.val["addr_"]["ipV6Addr"]["addr_"]["bytes_"]["_M_elems"]
            for i in range(0, 8):
                result += "{:02x}{:02x}:".format(int(addr[2 * i]), int(addr[2 * i + 1]))
        else:
            return "unknown address family {}".format(self.val["family_"])
        return result[:-1]

    def display_hint(self):
        return "folly::IPAddress"


class SocketAddressPrinter:
    """Print a folly::SocketAddress."""

    def __init__(self, val):
        self.val = val

    def to_string(self):
        result = ""
        if self.val["external_"] != 0:
            return "unix address, printer TBD"
        else:
            ipPrinter = IPAddressPrinter(self.val["storage_"]["addr"])
            if self.val["storage_"]["addr"]["family_"] == socket.AF_INET6:
                result += "[" + ipPrinter.to_string() + "]"
            else:
                result += ipPrinter.to_string()
            result += ":{}".format(self.val["port_"])
        return result

    def display_hint(self):
        return "folly::SocketAddress"


# For Node and Value containers, i.e., kEnableItemIteration == false
class F14HashtableIterator:
    def __init__(self, ht, is_node_container):
        item_type = gdb.lookup_type("{}::{}".format(ht.type.name, "Item"))
        self.item_ptr_type = item_type.pointer()
        self.chunk_ptr = ht["chunks_"]
        pair = ht["sizeAndChunkShiftAndPackedBegin_"]["sizeAndChunkShift_"]
        size_of_size_t = gdb.lookup_type("size_t").sizeof
        if size_of_size_t == 4:
            chunk_count = 1 << pair["chunkShift_"]
        else:
            chunk_count = 1 << (pair["packedSizeAndChunkShift_"] & ((1 << 8) - 1))
        self.chunk_end = self.chunk_ptr + chunk_count
        self.current_chunk = self.chunk_iter(self.chunk_ptr)
        self.is_node_container = is_node_container

    def __iter__(self):
        return self

    # generator to enumerate items in a given chunk
    def chunk_iter(self, chunk_ptr):
        chunk = chunk_ptr.dereference()
        tags = chunk["tags_"]["_M_elems"]
        raw_items = chunk["rawItems_"]["_M_elems"]

        # Enumerate over slots in the chunk
        for i in range(tags.type.sizeof):
            # full items have the top bit set in the tag
            if tags[i] & 0x80:
                item_ptr = raw_items[i]["__data"].cast(self.item_ptr_type)
                item = item_ptr.dereference()
                # node containers stores a pointer to value_type whereas value
                # containers store values inline
                yield item.dereference() if self.is_node_container else item

    def __next__(self):
        while True:
            try:
                # exhaust the current chunk
                return next(self.current_chunk)
            except StopIteration:
                # find the next chunk
                self.chunk_ptr += 1
                if self.chunk_ptr == self.chunk_end:
                    raise StopIteration
                self.current_chunk = self.chunk_iter(self.chunk_ptr)
                pass


# For Vector containers, i.e., kEnableItemIteration == true
class F14HashtableItemIterator:
    def __init__(self, start, size):
        self.item = start
        self.end = self.item + size

    def __iter__(self):
        return self

    def __next__(self):
        # To iterator the first item, gdb will call __next__
        if self.item == self.end:
            raise StopIteration
        val = self.item.dereference()
        self.item = self.item + 1
        return val


class F14Printer:
    """Print an F14 hash map or hash set"""

    @staticmethod
    def get_container_type_name(type):
        name = type.unqualified().strip_typedefs().name
        # strip template arguments
        template_start = name.find("<")
        if template_start < 0:
            return name
        return name[:template_start]

    def __init__(self, val):
        self.val = val
        self.type = val.type
        self.short_typename = self.get_container_type_name(self.type)
        self.is_map = "Map" in self.short_typename

        iter_type = gdb.lookup_type(
            f"{self.type.unqualified().strip_typedefs()}::iterator"
        )
        self.is_node_container = "NodeContainer" in iter_type.name
        self.enable_item_iteration = "VectorContainer" in iter_type.name

    def hashtable(self):
        return self.val["table_"]

    def size(self):
        pair = self.hashtable()["sizeAndChunkShiftAndPackedBegin_"][
            "sizeAndChunkShift_"
        ]
        size_of_size_t = gdb.lookup_type("size_t").sizeof
        if size_of_size_t == 4:
            return pair["size_"]
        else:
            return pair["packedSizeAndChunkShift_"] >> 8

    def to_string(self):
        return "%s with %d elements" % (
            self.short_typename,
            self.size(),
        )

    @staticmethod
    def format_one_map(elt):
        return (elt["first"], elt["second"])

    @staticmethod
    def format_count(i):
        return "[%d]" % i

    @staticmethod
    def flatten(list):
        for elt in list:
            for i in elt:
                yield i

    def children(self):
        counter = map(self.format_count, itertools.count())
        iter = (
            F14HashtableItemIterator(self.hashtable()["values_"], self.size())
            if self.enable_item_iteration
            else F14HashtableIterator(self.hashtable(), self.is_node_container)
        )
        data = self.flatten(map(self.format_one_map, iter)) if self.is_map else iter
        return zip(counter, data)

    def display_hint(self):
        return "map" if self.is_map else "array"


# Iterator for folly::small_vector
class SmallVectorIterator:
    def __init__(self, data, size):
        self.iter = F14HashtableItemIterator(data, size)
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        count = self.count
        self.count += 1
        return ("[%d]" % count, self.iter.__next__())


class SmallVectorPrinter:
    """Printer for folly::small_vector"""

    def __init__(self, val):
        self.val = val
        size = self.val["size_"]
        self.is_inline = (size >> (size.type.sizeof * 8 - 1)) == 0

    def children(self):
        heapPtr = self.val["u"]["pdata_"]["heap_"]
        inlinePtr = self.val["u"]["storage_"]["__data"].address
        ptr = inlinePtr if self.is_inline else heapPtr

        value_type = self.val.type.template_argument(0)
        typedPtr = ptr.reinterpret_cast(value_type.pointer())
        return SmallVectorIterator(typedPtr, self.size())

    def capacity(self):
        inlineCap = self.val.type.template_argument(1)
        outCap = self.val["u"]["pdata_"]["capacity_"]
        return inlineCap if self.is_inline else outCap

    def size(self):
        size = self.val["size_"]
        # Clear off potential high bit flags.
        if size.type.sizeof == 8:
            return size & 0x3FFF_FFFF_FFFF_FFFF
        else:
            return size & 0x3FFF_FFFF

    def to_string(self):
        return "folly::small_vector of length %d, capacity %d" % (
            self.size(),
            self.capacity(),
        )

    def display_hint(self):
        return "array"


class ConcurrentHashMapIterator:
    """Iterator for folly's ConcurrentHashMap"""

    def __init__(self, hashmap):
        self._segments = hashmap["segments_"]
        # Iterator state across segments
        self.segment_idx = 0
        self.segment = None
        self.num_segments = int(hashmap["NumShards"])
        # Iterator state across buckets within a segment
        self.bucket_idx = -1
        self.segment_buckets = None
        self.segment_num_buckets = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.segment_idx >= self.num_segments:
            raise StopIteration
        # Update cached segment
        if self.segment is None:
            self.bucket_idx = -1
            _segment_ptr = self._segments[self.segment_idx]["_M_b"]["_M_p"]
            # Check if the segment has been initialized or setp over it
            if int(_segment_ptr) == 0:
                self.segment_idx += 1
                return self.__next__()
            self.segment = _segment_ptr.dereference()["impl_"]
            _buckets = self.segment["buckets_"]["_M_b"]["_M_p"]
            # Can this even happen?
            if int(_buckets) == 0:
                self.segment_idx += 1
                return self.__next__()
            self.segment_buckets = _buckets.dereference()["buckets_"]
            self.segment_num_buckets = int(self.segment["bucket_count_"]["_M_i"])

        self.bucket_idx += 1
        # Check if we're done with this segment
        if self.bucket_idx >= self.segment_num_buckets:
            self.segment_idx += 1
            self.segment = None
            return self.__next__()

        # Skip null items
        ptr = self.segment_buckets[self.bucket_idx]["link_"]["_M_b"]["_M_p"]
        return self.__next__() if int(ptr) == 0 else ptr["item_"]["item_"]


class ConcurrentHashMapPrinter:
    """Print a ConcurrentHashMap"""

    def __init__(self, val):
        self.val = val
        self.key_type = val.type.template_argument(0)
        self.val_type = val.type.template_argument(1)

    def to_string(self):
        # DEBUG: consume iterator to get map size and print it
        # mapsize = len(list(self.children()))
        return f"folly::ConcurrentHashMap<{self.key_type}, {self.val_type}, ...>"

    def children(self):
        "Returns an iterator of tuple(name, value)"
        return (
            (f'[{str(item["kv_"]["first"])}]', item["kv_"]["second"])
            for idx, item in enumerate(ConcurrentHashMapIterator(self.val))
        )

    def display_hint(self):
        return "folly::ConcurrentHashMap"


def build_pretty_printer():
    pp = gdb.printing.RegexpCollectionPrettyPrinter("folly")
    pp.add_printer("fbstring", "^folly::basic_fbstring<char,.*$", FBStringPrinter)
    pp.add_printer(
        "StringPiece", r"^folly::Range<(\w+ )*char( \w+)*\*>$", StringPiecePrinter
    )
    pp.add_printer("Range", r"^folly::Range<.*", RangePrinter)
    pp.add_printer("dynamic", "^folly::dynamic$", DynamicPrinter)
    pp.add_printer("IPAddress", "^folly::IPAddress$", IPAddressPrinter)
    pp.add_printer("SocketAddress", "^folly::SocketAddress$", SocketAddressPrinter)

    pp.add_printer("F14NodeMap", "^folly::F14NodeMap<.*$", F14Printer)
    pp.add_printer("F14ValueMap", "^folly::F14ValueMap<.*$", F14Printer)
    pp.add_printer("F14VectorMap", "^folly::F14VectorMap<.*$", F14Printer)
    pp.add_printer("F14FastMap", "^folly::F14FastMap<.*$", F14Printer)

    pp.add_printer("F14NodeSet", "^folly::F14NodeSet<.*$", F14Printer)
    pp.add_printer("F14ValueSet", "^folly::F14ValueSet<.*$", F14Printer)
    pp.add_printer("F14VectorSet", "^folly::F14VectorSet<.*$", F14Printer)
    pp.add_printer("F14FastSet", "^folly::F14FastSet<.*$", F14Printer)

    pp.add_printer(
        "ConcurrentHashMap", "^folly::ConcurrentHashMap<.*$", ConcurrentHashMapPrinter
    )

    pp.add_printer("small_vector", "^folly::small_vector<.*$", SmallVectorPrinter)
    return pp


def load():
    gdb.printing.register_pretty_printer(gdb, build_pretty_printer())


def info():
    return "Pretty printers for folly containers"