chromium/chrome/test/enterprise/e2e/connector/reporting_connector_chronicle/chronicle_api_service.py

# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from datetime import datetime, timedelta
import json
from google.oauth2 import service_account
from googleapiclient import _auth
from .. import Verifyable, VerifyContent


class ChronicleApiService(Verifyable):
  """This class handles retrieving and verifying chronicle messages"""
  messages = []
  serviceAccountInfo = None

  def __init__(self, credentials):
    """Construct a ChronicleApiService instance.

    Args:
      credentials: the account key details.
    """
    try:
      self.serviceAccountInfo = json.loads(credentials)
    except json.JSONDecodeError:
      print('Decoding PubsubApiService credentials JSON has failed')

  def TryVerify(self, content: VerifyContent) -> bool:
    """This method will be called repeatedly until
        success or timeout. Returns boolean"""
    self.loadEvents(content.timestamp, content.device_id)
    return content.device_id in json.dumps(self.messages)

  def _datetimeToIso(self, date):
    return date.strftime('%Y-%m-%dT%H:%M:%S.%f%zZ')

  def loadEvents(self, testStartTime, deviceId):
    """Calls Chronicle API to fetch events

    Args:
      deviceId: A GUID device id that made the action.
      testStartTime: A datetime of the start time for the events
    """
    # Create a credential using Google Developer Service
    # Account Credential and Chronicle API scope.
    SCOPES = ['https://www.googleapis.com/auth/chronicle-backstory']
    credentials = service_account.Credentials.from_service_account_info(
        self.serviceAccountInfo, scopes=SCOPES)

    # Build an HTTP client that can make authorized OAuth requests.
    http_client = _auth.authorized_http(credentials)

    # Construct the URL
    RESULT_EVENTS_KEY = 'events'
    BACKSTORY_API_V1_URL = 'https://backstory.googleapis.com/v1'
    start_time = self._datetimeToIso(testStartTime - timedelta(minutes=15))
    end_time = self._datetimeToIso(datetime.utcnow())
    list_event_url = (
        '{}/events:udmSearch?query=principal.resource.id+%3D+%22{}%22'
        '&time_range.start_time={}&time_range.end_time={}'
        '&limit=100').format(BACKSTORY_API_V1_URL, deviceId, start_time,
                             end_time)

    # Make a request
    print('GET', list_event_url)
    response = http_client.request(list_event_url, 'GET')

    # Parse the response
    if response[0].status == 200:
      events = response[1]
      # List of events returned for further processing
      try:
        events = json.loads(events.decode("utf-8"))
      except json.JSONDecodeError:
        print('Decoding chronicle events JSON response has failed')

      self.messages = events.get(RESULT_EVENTS_KEY, [])
    else:
      # Something went wrong, please see the response detail
      err = response[1]
      print(err)