chromium/build/android/pylib/symbols/expensive_line_transformer.py

# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from abc import ABC, abstractmethod
import logging
import subprocess
import threading
import time
import uuid

from devil.utils import reraiser_thread


class ExpensiveLineTransformer(ABC):
  """Pipes batches of lines through a long-running external process.

  Subclasses provide `command` (the argv passed to subprocess.Popen) and
  `name` (used as a prefix in log messages). start() must be called, after
  subclass initialization, to launch the process. TransformLines() writes a
  batch of lines plus a unique sentinel line to the process's stdin and reads
  stdout until the sentinel comes back; it assumes the process passes that
  sentinel through unchanged.
  """

  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
    """Initializes the transformer without starting its process.

    Args:
      process_start_timeout: Extra seconds granted to a batch for the process
        to start up, counted from the call to start().
      minimum_timeout: Lower bound, in seconds, on a batch's timeout.
      per_line_timeout: Seconds allowed per input line of a batch.
    """
    self._process_start_timeout = process_start_timeout
    self._minimum_timeout = minimum_timeout
    self._per_line_timeout = per_line_timeout
    self._started = False
    # Allow only one thread to call TransformLines() at a time.
    self._lock = threading.Lock()
    # Ensure that only one thread attempts to kill self._proc in Close().
    self._close_lock = threading.Lock()
    self._closed_called = False
    # Assign to None so that attribute exists if Popen() throws.
    self._proc = None
    # Set by start(); TransformLines() uses it to grant whatever remains of
    # process_start_timeout on top of the regular batch timeout.
    self._proc_start_time = None

  def start(self):
    """Launches the external process.

    Deferred out of __init__ so that descendant classes can finish their own
    initialization first. Logs an error and returns without starting if the
    process was already started or if `command` is empty.
    """
    if self._started:
      logging.error('%s: Trying to start an already started command', self.name)
      return

    # Record the start time before launching so the start-up allowance covers
    # Popen() itself.
    self._proc_start_time = time.time()

    if not self.command:
      logging.error('%s: No command available', self.name)
      return

    self._proc = subprocess.Popen(self.command,
                                  bufsize=1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  close_fds=True)
    self._started = True

  def IsClosed(self):
    """Returns True if the process is not running or Close() was called.

    Uses Popen.poll() instead of reading Popen.returncode directly: returncode
    is only refreshed by poll()/wait(), so without polling a process that
    exited on its own would never be reported as closed.
    """
    return (not self._started or self._closed_called
            or self._proc.poll() is not None)

  def IsBusy(self):
    """Returns True while some thread is inside TransformLines()."""
    return self._lock.locked()

  def IsReady(self):
    """Returns True if the process is running and not currently in use."""
    return self._started and not self.IsClosed() and not self.IsBusy()

  def TransformLines(self, lines):
    """Transforms the given lines by piping them through the process.

    If anything goes wrong (process crashes, timeout, etc), returns |lines|.

    Args:
      lines: A list of strings without trailing newlines.

    Returns:
      A list of strings without trailing newlines.
    """
    if not lines:
      return []

    # The transformed output may contain more lines than the input (e.g.
    # symbolized stack traces get added). To account for the extra output
    # lines, keep reading until this unique eof_line token is echoed back.
    eof_line = self.getEofLine()
    out_lines = []
    # Set by the reader once eof_line is seen, i.e. the output is complete.
    # Reaching EOF on stdout without it means the process died mid-batch.
    reached_eof_line = threading.Event()

    def _reader():
      while True:
        line = self._proc.stdout.readline()
        # readline() returns an empty string at EOF (process exited or closed
        # its stdout).
        if not line:
          break
        # Strip only the newline; a final line before EOF may not have one.
        if line.endswith('\n'):
          line = line[:-1]
        if line == eof_line:
          reached_eof_line.set()
          break
        out_lines.append(line)

    if self.IsBusy():
      logging.warning('%s: Having to wait for transformation.', self.name)

    # Allow only one thread to operate at a time.
    with self._lock:
      if self.IsClosed():
        if self._started and not self._closed_called:
          logging.warning('%s: Process exited with code=%d.', self.name,
                          self._proc.returncode)
          self.Close()
        return lines

      reader_thread = reraiser_thread.ReraiserThread(_reader)
      reader_thread.start()

      try:
        self._proc.stdin.write('\n'.join(lines))
        self._proc.stdin.write('\n{}\n'.format(eof_line))
        self._proc.stdin.flush()
        time_since_proc_start = time.time() - self._proc_start_time
        # Remaining start-up allowance plus a budget that scales with the
        # batch size (but never below minimum_timeout).
        timeout = (max(0, self._process_start_timeout - time_since_proc_start) +
                   max(self._minimum_timeout,
                       len(lines) * self._per_line_timeout))
        reader_thread.join(timeout)
        if self._closed_called:
          logging.warning('%s: Close() called by another thread during join().',
                          self.name)
          return lines
        if reader_thread.is_alive():
          logging.error('%s: Timed out after %f seconds with input:', self.name,
                        timeout)
          for l in lines:
            logging.error(l)
          logging.error(eof_line)
          logging.error('%s: End of timed out input.', self.name)
          logging.error('%s: Timed out output was:', self.name)
          for l in out_lines:
            logging.error(l)
          logging.error('%s: End of timed out output.', self.name)
          self.Close()
          return lines
        if not reached_eof_line.is_set():
          # stdout hit EOF before the sentinel: the output is incomplete.
          logging.warning('%s: Process exited before finishing transformation.',
                          self.name)
          self.Close()
          return lines
        return out_lines
      except IOError:
        logging.exception('%s: Exception during transformation', self.name)
        self.Close()
        return lines

  def Close(self):
    """Stops the process and releases its stdin pipe.

    Only the first call does any work; later calls (from any thread) return
    immediately. Never raises because the process has already exited.
    """
    with self._close_lock:
      needs_closing = self._proc is not None and not self._closed_called
      self._closed_called = True

    if not needs_closing:
      return
    try:
      # close() flushes buffered input first, which fails with a broken pipe
      # if the process is already gone (e.g. after a failed write).
      self._proc.stdin.close()
    except OSError:
      pass
    # Only signal a process that is still running.
    if self._proc.poll() is None:
      self._proc.kill()
    self._proc.wait()

  def __del__(self):
    # self._proc is None when Popen() fails.
    if not self._closed_called and self._proc:
      logging.error('%s: Forgot to Close()', self.name)
      self.Close()

  @property
  @abstractmethod
  def name(self):
    """Identifier used as a prefix in log messages."""
    ...

  @property
  @abstractmethod
  def command(self):
    """Argument list passed to subprocess.Popen(); falsy means unavailable."""
    ...

  @staticmethod
  def getEofLine():
    """Returns a unique sentinel line marking the end of a batch."""
    # Use a format that will be considered a "useful line" without modifying its
    # output by third_party/android_platform/development/scripts/stack_core.py
    return "Generic useful log header: '{}'".format(uuid.uuid4().hex)


class ExpensiveLineTransformerPool(ABC):
  """A fixed-size pool of transformers created by CreateTransformer().

  TransformLines() prefers a ready transformer, rotates the pool between
  calls, and replaces transformers that report IsClosed(). Once the number of
  replacements reaches |max_restarts| the pool is treated as broken: it then
  returns lines unchanged if |passthrough_on_failure| is set, and raises
  otherwise.
  """

  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    self._max_restarts = max_restarts
    self._pool = [self.CreateTransformer() for _ in range(pool_size)]
    self._passthrough_on_failure = passthrough_on_failure
    # Allow only one thread to select from the pool at a time.
    self._lock = threading.Lock()
    self._num_restarts = 0

  def __enter__(self):
    # Return the pool so that `with Pool(...) as pool:` binds it.
    return self

  def __exit__(self, *args):
    self.Close()

  def TransformLines(self, lines):
    """Transforms |lines| using one of the pooled transformers.

    Returns:
      The transformed lines, or |lines| unchanged if the pool is broken and
      passthrough_on_failure is set.

    Raises:
      Exception: if the pool is broken and passthrough_on_failure is not set.
    """
    with self._lock:
      assert self._pool, 'TransformLines() called on a closed Pool.'

      # Transformation is broken.
      if self._num_restarts >= self._max_restarts:
        return self._HandleBroken(lines)

      # Replace any closed transformer with a fresh one.
      for i, d in enumerate(self._pool):
        if d.IsClosed():
          logging.warning('%s: Restarting closed instance.', self.name)
          # Release whatever the dead instance still holds before dropping it.
          d.Close()
          self._pool[i] = self.CreateTransformer()
          self._num_restarts += 1
          if self._num_restarts >= self._max_restarts:
            logging.warning('%s: MAX_RESTARTS reached.', self.name)
            return self._HandleBroken(lines)

      # Prefer a ready transformer; otherwise fall back to the first one.
      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
      # Rotate the order so that next caller will not choose the same one.
      self._pool.remove(selected)
      self._pool.append(selected)

    # Transform outside the pool lock so other callers can pick other
    # transformers concurrently.
    return selected.TransformLines(lines)

  def _HandleBroken(self, lines):
    """Returns |lines| if passthrough_on_failure is set, else raises."""
    if self._passthrough_on_failure:
      return lines
    raise Exception('%s is broken.' % self.name)

  def Close(self):
    """Closes every pooled transformer. Safe to call more than once."""
    with self._lock:
      if self._pool is None:
        return
      for d in self._pool:
        d.Close()
      self._pool = None

  @abstractmethod
  def CreateTransformer(self):
    """Returns a new transformer instance to add to the pool."""
    ...

  @property
  @abstractmethod
  def name(self):
    """Identifier used in log and error messages."""
    ...