chromium/chrome/updater/test/service/win/rpc_handler.py

# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import subprocess
import sys

import impersonate
import proc_util
import uac


class UpdaterTestRPCHandler():
    """RPC entry points offered by the updater test service.

    Each public method is a self-contained operation: it takes plain values,
    performs the action on the machine hosting the service and returns a
    plain tuple (or the echoed message).
    """

    def echo(self, message):
        """Test method to check if server is reachable.

        Args:
            message: Any value.

        Returns:
            `message`, unchanged.
        """
        return message

    def RunAsSystem(self, command, env=None, cwd=None, timeout=30):
        """Runs the command as a child of this service and captures output.

        The child is created with subprocess.Popen() directly, so it runs
        under the same account as the process hosting this handler
        (presumably SYSTEM when hosted by the test service).

        Args:
            command: The command to run. This argument will be forwarded to
                subprocess.Popen().
            env: Environment variables to pass to command.
            cwd: Working directory for the command.
            timeout: Seconds to wait for the command to finish. None waits
                forever.

        Returns:
            (pid, exit_code, stdout, stderr) tuple. If the command could not
            be started, or did not finish within `timeout` (in which case it
            is killed), returns (None, None, None, None).
        """
        process = None
        try:
            process = subprocess.Popen(command,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       env=env,
                                       cwd=cwd)

            # `timeout` must be passed by keyword: the first positional
            # parameter of communicate() is `input`, not `timeout`.
            stdout, stderr = process.communicate(timeout=timeout)
        except (OSError, subprocess.TimeoutExpired) as err:
            logging.exception(err)
            # communicate() does not kill the child when the timeout expires.
            # Kill and reap it here so that neither the process nor its pipes
            # outlive this call.
            if process is not None and process.poll() is None:
                try:
                    process.kill()
                    process.communicate()
                except OSError as cleanup_err:
                    logging.exception(cleanup_err)
            return (None, None, None, None)

        logging.info('Command %s stdout:\n %s', command, stdout)
        if stderr:
            logging.error('Command %s stderr:\n %s', command, stderr)

        return (process.pid, process.returncode, stdout, stderr)

    def RunAsStandardUser(self, command_line, env=None, cwd=None, timeout=30):
        """Runs the command as the non-elevated logon user on default desktop.

        This is a thin wrapper that forwards all arguments to
        impersonate.RunAsStandardUser().

        Args:
            command_line: The command line string, includes all arguments.
            env: Environment variables to pass to command.
            cwd: Working directory for the command.
            timeout: How long to wait for the child process before timeout.

        Returns:
            (pid, exit_code, stdout, stderr) tuple, as produced by
            impersonate.RunAsStandardUser().
        """
        return impersonate.RunAsStandardUser(command_line, env, cwd, timeout)

    def AnswerUpcomingUACPrompt(self,
                                actions,
                                timeout=10,
                                wait_child=False,
                                source=''):
        """Answers upcoming UAC prompt that does not require username/password.

        Args:
            actions: Actions to take in string, such as 'AADDA', 'A' to accept,
                     'D' to deny.
            timeout: How long the child process should wait for each UAC click.
            wait_child: Whether this thread should wait the completion of child
                        proc.
            source: Optional name of the source that triggers this action
                    (for logging and debugging purposes).

        Returns:
            (pid, exit_code) of the created UAC-answering process.
            If the sub-process is not created, or did not finish in wait time,
            returns (None, None).
        """
        uac_tool = os.path.join(os.path.dirname(__file__), 'answer_uac.py')
        # The command line carries the per-click timeout; the wait time for
        # the whole child process is derived from it below.
        command = ('python %s --actions=%s --timeout=%d --source=%s' %
                   (uac_tool, actions, timeout, source))
        logging.info('Running command: %s', command)

        if not wait_child:
            timeout = 0  # no wait
        elif timeout > 0:
            # Each button click could take `timeout` seconds, and add
            # 1 second extra for child process to finish.
            timeout = timeout * len(actions) + 1
        # Otherwise the timeout is passed through untouched: negative values
        # have special meanings, such as win32event.INFINITE.

        # There could be multiple winlogon.exe instances when there are multiple
        # login sessions. For example, when there's remote desktop session.
        # In this case, find the active session where the UAC prompt is supposed
        # to display.
        winlogon_pids = proc_util.GetPIDsWithName(
            'winlogon.exe', proc_util.GetActiveSessionID())
        if not winlogon_pids:
            logging.error(
                'Unexpected: no active session or no winlogon.exe in it.')
            return (None, None)
        if len(winlogon_pids) > 1:
            logging.warning(
                'Unexpected multiple winlogon.exe instances within '
                'active session, the first instance will be used.')

        # Must spawn child process on the same desktop as the one that UAC
        # prompts, otherwise the child process will not be able to find the UAC
        # dialog. Please note that there is a slight race condition here as user
        # could change UAC desktop at any time. But we can tolerate this for the
        # testing purpose.
        desktop = 'winlogon' if uac.IsPromptingOnSecureDesktop() else 'default'

        logging.info('Spawn process [%s] for UAC on desktop [%s].', command,
                     desktop)
        # NOTE: 'RunAsPidOnDeskstop' is spelled the way this file has always
        # called the helper in the impersonate module.
        pid, exit_code, stdout, stderr = impersonate.RunAsPidOnDeskstop(
            command, winlogon_pids[0], desktop=desktop, timeout=timeout)
        logging.info('Process [%s] is created to answer UAC, exit_code: %s',
                     pid, exit_code)
        if stdout and stdout.strip():
            logging.info('STDOUT: [%s]', stdout)
        if stderr and stderr.strip():
            logging.error('STDERR: [%s]', stderr)
        return (pid, exit_code)