# chromium/chrome/test/enterprise/e2e/policy/cloud_reporting_enabled/cloud_reporting_enabled.py

# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from datetime import datetime
import json
import logging
import os
from urllib.parse import urlencode

from google.oauth2 import service_account
from google.auth.transport.requests import Request
import requests

from infra import ChromeEnterpriseTestCase
from chrome_ent_test.infra.core import before_all
from chrome_ent_test.infra.core import category
from chrome_ent_test.infra.core import environment
from chrome_ent_test.infra.core import test


@category("chrome_only")
@environment(file="../policy_test.asset.textpb")
class CloudReportingEnabledTest(ChromeEnterpriseTestCase):
  """Test the CloudReportingEnabled policy.

  https://cloud.google.com/docs/chrome-enterprise/policies/?policy=CloudReportingEnabled

  The test sets an enrollment token policy, runs a WebDriver script on the
  client, and then queries the CBCM API to check the timestamps and policy
  count recorded for the reported browser.
  """

  # NOTE(review): this value looks like an e-mail-obfuscation artifact rather
  # than a real address - confirm the intended admin account.
  ADMIN_USER_EMAIL = '[email protected]'
  SCOPES = [
      'https://www.googleapis.com/auth/admin.directory.device.chromebrowsers'
  ]
  # Upper bound, in seconds, for the CBCM API request so that a stalled
  # connection fails the test instead of blocking it forever.
  REQUEST_TIMEOUT_SECONDS = 60

  @before_all
  def setup(self):
    """Installs Chrome and WebDriver on the 'client' machine."""
    self.InstallChrome(self.win_config['client'])
    self.InstallWebDriver(self.win_config['client'])

  def GetAccessToken(self, service_account_info, admin_user_email, scopes):
    """Obtains an access token using service account key and impersonation.

    Args:
      service_account_info: Parsed service account key (JSON object as dict).
      admin_user_email: Account passed as the credentials' `subject`.
      scopes: OAuth scopes requested for the token.

    Returns:
      The `token` attribute of the credentials after an optional refresh.
    """
    credentials = service_account.Credentials.from_service_account_info(
        service_account_info, scopes=scopes, subject=admin_user_email)

    # Refresh the token if it's invalid
    if not credentials.valid:
      credentials.refresh(Request())
      logging.info("Access Token refreshed.")
    return credentials.token

  def GetTakeoutUrl(self, device_id):
    """Builds the chromebrowsers query URL for `device_id`.

    Args:
      device_id: Value used as the `query` parameter of the request.

    Returns:
      The URL including the `projection`, `query` and `orgUnitPath`
      parameters.
    """
    base_url = ('https://www.googleapis.com/admin/directory/v1.1beta1/'
                'customer/my_customer/devices/chromebrowsers')
    # Percent-encode the values so that a device id containing reserved
    # characters cannot corrupt the query string. '/' is kept literal so the
    # result is unchanged for ordinary ids.
    query = urlencode(
        {
            'projection': 'FULL',
            'query': device_id,
            'orgUnitPath': '/CBCM-enrollment',
        },
        safe='/')
    return f'{base_url}?{query}'

  @staticmethod
  def _ParseTimestamp(value):
    """Parses an ISO 8601 timestamp string into a datetime.

    datetime.fromisoformat() only accepts a trailing 'Z' on Python 3.11+, so
    the suffix is rewritten to the equivalent '+00:00' offset first.
    """
    if value.endswith('Z'):
      value = value[:-1] + '+00:00'
    return datetime.fromisoformat(value)

  @test
  def test_report_and_fetch(self):
    """Enrolls the browser and verifies the data reported to the CBCM API."""
    # Domain: chromepizzatest.com / OrgUnit: CBCM-enrollment
    # CloudReporting is enabled
    cmd = f'gsutil cat gs://{self.gsbucket}/secrets/enrollToken'
    token = self.RunCommand(self.win_config['dc'], cmd).rstrip().decode()

    self.SetPolicy(self.win_config['dc'], r'CloudManagementEnrollmentToken',
                   token, 'String')
    self.RunCommand(self.win_config['client'], 'gpupdate /force')

    local_dir = os.path.dirname(os.path.abspath(__file__))
    # Run test on client which triggers enrollment, policy fetch, and a report
    output = self.RunWebDriverTest(self.win_config['client'],
                                   os.path.join(local_dir, '../cbcm_enroll.py'))

    # Get the device id of the browser. str.find() returns -1 when the marker
    # is missing; 0 is a valid match at the very start of the output.
    marker = 'DEVICE_ID='
    index = output.find(marker)
    self.assertNotEqual(index, -1, 'DEVICE_ID= not found in the test output')
    fields = output[index + len(marker):].split()
    self.assertTrue(fields, 'No value follows DEVICE_ID= in the test output')
    device_id = fields[0]

    # Get OAuth2 access token
    key = self.GetFileFromGCSBucket('secrets/cbcmapi.json')
    service_account_info = json.loads(key)
    access_token = self.GetAccessToken(service_account_info,
                                       self.ADMIN_USER_EMAIL, self.SCOPES)
    headers = {'Authorization': f'Bearer {access_token}'}

    # Query CBCM API with virtualDeviceId
    url = self.GetTakeoutUrl(device_id)
    logging.info('Querying CBCM API for browser data...')
    response = requests.get(
        url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)
    logging.info('server response = %s', response)
    # Fail with the server's message rather than a KeyError further down.
    self.assertEqual(response.status_code, 200, response.text)
    json_object = response.json()
    logging.info(json_object)

    browsers = json_object.get('browsers')
    self.assertTrue(browsers, 'CBCM API returned no browsers for the device id')
    browser = browsers[0]

    last_registration_time = self._ParseTimestamp(
        browser['lastRegistrationTime'])
    last_policy_fetch_time = self._ParseTimestamp(
        browser['lastPolicyFetchTime'])
    last_status_report_time = self._ParseTimestamp(
        browser['lastStatusReportTime'])

    self.assertGreater(int(browser['policyCount']), 0)
    self.assertGreater(last_policy_fetch_time, last_registration_time)
    self.assertGreater(last_status_report_time, last_registration_time)