cpython/Tools/c-analyzer/c_common/tables.py

from collections import namedtuple
import csv
import re
import textwrap

from . import NOT_SET, strutil, fsutil


# Canonical cell values that fix_row() substitutes for any value found
# in its "empty" and "unknown" marker sets, respectively.
EMPTY = '-'
UNKNOWN = '???'


def parse_markers(markers, default=None):
    """Normalize a "markers" argument.

    * NOT_SET -> "default"
    * any other false value -> None
    * a non-str object -> returned unchanged
    * a str made of one repeated character (e.g. '---') -> [markers]
    * any other str -> a list of its individual characters
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is not str:
        return markers
    # A run of a single repeated character counts as one marker;
    # anything else is treated as a set of one-character markers.
    if len(set(markers)) == 1:
        return [markers]
    return list(markers)


def fix_row(row, **markers):
    """Return the row as a tuple with its values normalized.

    False values become None.  A value found in the "empty" markers
    (default: '-') becomes EMPTY and one found in the "unknown" markers
    (default: '???') becomes UNKNOWN; the "empty" check wins if both
    match.  Both keyword arguments go through parse_markers().

    A str row is rejected with NotImplementedError.
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))

    def normalize(value):
        if not value:
            value = None
        if empty and value in empty:
            return EMPTY
        if unknown and value in unknown:
            return UNKNOWN
        return value

    return tuple(normalize(value) for value in row)


def _fix_read_default(row):
    for value in row:
        yield value.strip()


def _fix_write_default(row, empty=''):
    for value in row:
        yield empty if value is None else str(value)


def _normalize_fix_read(fix):
    if fix is None:
        fix = ''
    if callable(fix):
        def fix_row(row):
            values = fix(row)
            return _fix_read_default(values)
    elif isinstance(fix, str):
        def fix_row(row):
            values = _fix_read_default(row)
            return (None if v == fix else v
                    for v in values)
    else:
        raise NotImplementedError(fix)
    return fix_row


def _normalize_fix_write(fix, empty=''):
    if fix is None:
        fix = empty
    if callable(fix):
        def fix_row(row):
            values = fix(row)
            return _fix_write_default(values, empty)
    elif isinstance(fix, str):
        def fix_row(row):
            return _fix_write_default(row, fix)
    else:
        raise NotImplementedError(fix)
    return fix_row


def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given delimiter-separated (e.g. tab) file.

    "infile" is either a filename (opened with "_open") or an
    already-open source of lines accepted by
    strutil._iter_significant_lines().

    The first significant line, stripped, must equal "header", which is
    given either as a string or as a sequence of column names that get
    joined with "sep".  Otherwise ValueError is raised.  A missing
    header line is treated as an empty one.

    A false "sep" means tab, both for joining the header and for
    splitting the rows.

    "fix" is handled by _normalize_fix_read().  Each row is yielded as
    a tuple.
    """
    # Resolve the delimiter once so that header validation and the CSV
    # reader always agree.  (Previously a false "sep" fell back to tab
    # only for the reader, while a sequence header was still joined
    # with the raw value.)
    sep = sep or '\t'

    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    # The remaining lines are the actual rows.
    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))


def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given delimiter-separated (e.g. tab) file.

    "outfile" is either a filename (opened for writing with "_open") or
    an already-open file object.  "header" is either a string, which
    gets split on "sep", or a sequence of column names; it is written
    as the first row.  A false "sep" means tab.

    If "backup" is true then fsutil.create_backup() is called with it
    (once) before anything is written.

    "fix" is handled by _normalize_fix_write().
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup was already made above.  Opening the file
                # for writing has truncated it, so asking for another
                # backup now would be made from the emptied file.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    delimiter = sep or '\t'
    if isinstance(header, str):
        header = header.split(delimiter)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=delimiter)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )


def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line from strutil.parse_entries().

    "header" and "sep" are first normalized by
    _normalize_table_file_props(); ValueError is raised if no separator
    results.

    For lines that come with a filename, when a header is given the
    first line seen for each new filename must be that header (it is
    skipped); otherwise NotImplementedError is raised.  Lines without a
    filename are split on "rawsep" instead of "sep" when "rawsep" is
    given and "sep" does not appear in the line.

    With "strict", every row must have as many columns as the header
    (or as the first row, if there is no header); see _parse_row() for
    how "default" is used to pad short rows.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    expected = len(header.split(sep)) if header and strict else None
    current = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        rowsep = sep
        if not filename:
            if rawsep and sep not in line:
                rowsep = rawsep
        elif header and current != filename:
            current = filename
            if line.strip() != header:
                # We expected the header.
                raise NotImplementedError((header, line))
            # Skip the first line since it's the header.
            continue

        row = _parse_row(line, rowsep, expected, default)
        if strict and not expected:
            expected = len(row)
        yield row, filename


def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split the line on "sep" into a tuple of stripped values.

    A false "sep" raises ValueError.  "ncols" and "default" are passed
    through to _parse_row().
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')


def _parse_row(line, sep, ncols, default):
    row = tuple(v.strip() for v in line.split(sep))
    if (ncols or 0) > 0:
        diff = ncols - len(row)
        if diff:
            if default is NOT_SET or diff < 0:
                raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
            row += (default,) * diff
    return row


def _normalize_table_file_props(header, sep):
    if not header:
        return None, sep

    if not isinstance(header, str):
        if not sep:
            raise NotImplementedError(header)
        header = sep.join(header)
    elif not sep:
        for sep in ('\t', ',', ' '):
            if sep in header:
                break
        else:
            sep = None
    return header, sep


##################################
# stdout tables

# Fallback column width used by _resolve_width() when neither the column
# spec nor the caller-supplied default provides one.
WIDTH = 20


def resolve_columns(specs):
    """Return a list with a ColumnSpec for each of the given raw specs.

    "specs" may be a single string of specs separated by commas and/or
    whitespace, or an iterable of anything ColumnSpec.from_raw() accepts.
    """
    if isinstance(specs, str):
        specs = specs.replace(',', ' ').split()
    return [ColumnSpec.from_raw(raw) for raw in specs]


def build_table(specs, *, sep=' ', defaultwidth=None):
    """Return the (header, divider, row format) strings for the column specs.

    The specs are resolved with resolve_columns() and the strings are
    built by _build_table().
    """
    return _build_table(
        resolve_columns(specs),
        sep=sep,
        defaultwidth=defaultwidth,
    )


class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')):
    """The field name, header label, and format of one table column.

    The string form of a spec (see REGEX) is:

      [<label>]<field>
      [<label>]<field>:<align>[<width>]
      [<label>]<field>[:<width>][:<fmt>]

    where the bracketed label prefix is optional, <align> is one
    of "<", "^", ">", and <fmt> is anything _parse_fmt() understands.
    """

    REGEX = re.compile(textwrap.dedent(r'''
        ^
        (?:
            \[
            (
                (?: [^\s\]] [^\]]* )?
                [^\s\]]
            )  # <label>
            ]
        )?
        ( [-\w]+ )  # <field>
        (?:
            (?:
                :
                ( [<^>] )  # <align>
                ( \d+ )?  # <width1>
            )
            |
            (?:
                (?:
                    :
                    ( \d+ )  # <width2>
                )?
                (?:
                    :
                    ( .*? )  # <fmt>
                )?
            )
        )?
        $
    '''), re.VERBOSE)

    @classmethod
    def from_raw(cls, raw):
        """Return the ColumnSpec corresponding to the given raw value.

        "raw" may be a ColumnSpec (returned as-is), a spec string, or a
        sequence of 2-4 items (see _normalize()).  ValueError is raised
        if it is empty or cannot be parsed as a spec.
        """
        if not raw:
            raise ValueError('missing column spec')
        elif isinstance(raw, cls):
            return raw

        if isinstance(raw, str):
            parsed = cls._parse(raw)
        else:
            parsed = cls._normalize(raw)
        # _parse() returns None for a string that doesn't match REGEX,
        # so this must be checked before unpacking.
        if parsed is None:
            raise ValueError(f'unsupported column spec {raw!r}')
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def parse(cls, specstr):
        """Return the ColumnSpec for the spec string (None if unsupported)."""
        parsed = cls._parse(specstr)
        if not parsed:
            return None
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def _parse(cls, specstr):
        """Return (field, label, fmt, width) for the spec string.

        None is returned if the string does not match REGEX.  The label
        defaults to the field name.  NotImplementedError is raised for
        a format that _parse_fmt() does not understand or whose width
        disagrees with an explicitly given one.
        """
        m = cls.REGEX.match(specstr)
        if not m:
            return None
        (label, field,
         align, width1,
         width2, fmt,
         ) = m.groups()
        if not label:
            label = field
        if fmt:
            assert not align and not width1, (specstr,)
            _parsed = _parse_fmt(fmt)
            if not _parsed:
                raise NotImplementedError(specstr)
            # Always bind the width so the return below cannot hit an
            # unbound local when no explicit width was given.
            width, _ = _parsed
            if width2 and width != int(width2):
                raise NotImplementedError(specstr)
        elif width2:
            fmt = width2
            width = int(width2)
        else:
            assert not fmt, (fmt, specstr)
            if align:
                # Without an explicit width, fit the label.
                width = int(width1) if width1 else len(label)
                fmt = f'{align}{width}'
            else:
                width = None
        return field, label, fmt, width

    @classmethod
    def _normalize(cls, spec):
        """Return (field, label, fmt, width) for a non-string raw spec.

        Supported shapes:

        * (label, field, width, fmt)
        * (label, field, fmt)
        * (field, fmt) or (label, field)

        The items are combined into a spec string, which is then handed
        to _parse().  Any other length raises NotImplementedError.
        """
        if len(spec) == 1:
            raise NotImplementedError(spec)

        if len(spec) == 4:
            label, field, width, fmt = spec
            if width:
                if not fmt:
                    fmt = str(width)
                else:
                    # An unparseable fmt is left for _parse() to reject.
                    parsed = _parse_fmt(fmt)
                    if parsed and parsed[0] != width:
                        raise ValueError(f'width mismatch in {spec}')
        elif len(spec) == 3:
            label, field, fmt = spec
            if not field:
                label, field = None, label
            elif not isinstance(field, str) or not field.isidentifier():
                # XXX This doesn't seem right...
                fmt = f'{field}:{fmt}' if fmt else field
                label, field = None, label
        elif len(spec) == 2:
            label = None
            field, fmt = spec
            if not field:
                field, fmt = fmt, None
            elif (not field.isidentifier()
                  or (isinstance(fmt, str) and fmt.isidentifier())):
                # It's actually (label, field), so there is no format.
                label, field, fmt = field, fmt, None
        else:
            raise NotImplementedError

        fmt = f':{fmt}' if fmt else ''
        if label:
            return cls._parse(f'[{label}]{field}{fmt}')
        else:
            return cls._parse(f'{field}{fmt}')

    @property
    def width(self):
        """The width given by the format (None if there isn't one)."""
        if not self.fmt:
            return None
        parsed = _parse_fmt(self.fmt)
        if not parsed:
            return None
        width, _ = parsed
        return width

    def resolve_width(self, default=None):
        """Return the width to use for the column (see _resolve_width())."""
        return _resolve_width(self.width, self.fmt, self.label, default)


def _parse_fmt(fmt):
    if fmt.startswith(tuple('<^>')):
        align = fmt[0]
        width = fmt[1:]
        if width.isdigit():
            return int(width), align
    elif fmt.isdigit():
        return int(fmt), '<'
    return None


def _resolve_width(width, fmt, label, default):
    if width:
        if not isinstance(width, int):
            raise NotImplementedError
        return width
    elif fmt:
        parsed = _parse_fmt(fmt)
        if parsed:
            width, _ = parsed
            if width:
                return width

    if not default:
        return WIDTH
    elif hasattr(default, 'get'):
        defaults = default
        default = defaults.get(None) or WIDTH
        return defaults.get(label) or default
    else:
        return default or WIDTH


def _build_table(columns, *, sep=' ', defaultwidth=None):
    header = []
    div = []
    rowfmt = []
    for spec in columns:
        width = spec.resolve_width(defaultwidth)
        colfmt = spec.fmt
        colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'

        header.append(f' {{:^{width}}} '.format(spec.label))
        div.append('-' * (width + 2))
        rowfmt.append(f' {{{spec.field}{colfmt}}} ')
    return (
        sep.join(header),
        sep.join(div),
        sep.join(rowfmt),
    )