chromium/chrome/test/chromedriver/log_replay/client_replay_unittest.py

#!/usr/bin/env python3
# Copyright 2018 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for the client_replay.CommandSequence class."""

import io
import optparse
import os
import sys
import unittest

import client_replay

_THIS_DIR = os.path.abspath(os.path.dirname(__file__))
_PARENT_DIR = os.path.join(_THIS_DIR, os.pardir)
_TEST_DIR = os.path.join(_PARENT_DIR, "test")
# pylint: disable=g-import-not-at-top
sys.path.insert(1, _TEST_DIR)
import unittest_util
sys.path.remove(_TEST_DIR)

sys.path.insert(1, _PARENT_DIR)
import util
sys.path.insert(1, _PARENT_DIR)
# pylint: enable=g-import-not-at-top

_SESSION_ID = "b15232d5497ec0d8300a5a1ea56f33ce"
_SESSION_ID_ALT = "a81dc5521092a5ba132b9c0b6cf6e84f"

_NO_PARAMS = ("[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] "
              "COMMAND GetTitle {\n\n}\n"
              "[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] "
              "RESPONSE GetTitle\n")
_WITH_PARAMS = ('[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                'COMMAND GetTitle {\n"param1": 7\n}\n'
                '[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                'RESPONSE GetTitle {\n"param2": 42\n}\n')
_COMMAND_ONLY = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                 'COMMAND GetTitle {\n"param1": 7\n}\n')
_RESPONSE_ONLY = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                  'RESPONSE GetTitle {\n"param2": 42\n}\n')
_PAYLOAD_SCRIPT = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]'
                   ' RESPONSE GetTitle {\n"param2": "function(){func()}"\n}\n')
_PAYLOAD_READABLE_TIME_LINUX = (
    '[08-12-2019 15:45:34.824002][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]'
    ' RESPONSE GetTitle {\n"param2": "function(){func()}"\n}\n')
_PAYLOAD_READABLE_TIME_WINDOWS = (
    '[08-12-2019 15:45:34.824][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]'
    ' RESPONSE GetTitle {\n"param2": "function(){func()}"\n}\n')
_BAD_SCRIPT = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]'
               ' RESPONSE GetTitle {\n"param2": "))}\\})}/{)}({(})}"\n}\n')
_MULTI_SESSION = ('[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                  'COMMAND GetSessions {\n\n}\n'
                  '[1531428669.535][INFO]: [a81dc5521092a5ba132b9c0b6cf6e84f] '
                  'COMMAND GetSessions {\n\n}\n'
                  '[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] '
                  'RESPONSE GetSessions {\n"param2": 42\n}\n'
                  '[1531428670.535][INFO]: [a81dc5521092a5ba132b9c0b6cf6e84f] '
                  'RESPONSE GetSessions {\n"param2": 42\n}\n' + _COMMAND_ONLY)

_WINDOW_ID_1 = "11111111111111111111111111111111"
_WINDOW_ID_2 = "22222222222222222222222222222222"
_WINDOW_IDS = [
    _WINDOW_ID_1,
    _WINDOW_ID_2,
    "other thing",  # Random string not in the targetID format.
    "1234567890123456789012345678901",  # Too short string.
    "123456789012345678901234567890123",  # Too long string.
    "GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG",  # String with not allowed symbol.
    "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",  # String with not allowed symbol.
]
_ELEMENT_ID = {"element-6066-11e4-a52e-4f735466cecf": "0.87-1"}
_ELEMENT_IDS = [{"element-6066-11e4-a52e-4f735466cecf": "0.87-1"},
                {"element-6066-11e4-a52e-4f735466cecf": "0.87-2"}]


class ChromeDriverClientReplayUnitTest(unittest.TestCase):
  """base class for test cases"""

  def __init__(self, *args, **kwargs):
    super(ChromeDriverClientReplayUnitTest, self).__init__(*args, **kwargs)

  def testNextCommandEmptyParams(self):
    string_buffer = io.StringIO(_NO_PARAMS)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    command = command_sequence.NextCommand(None)
    response = command_sequence._last_response

    self.assertEqual(command.name, "GetTitle")
    self.assertEqual(command.GetPayloadPrimitive(), {"sessionId": _SESSION_ID})
    self.assertEqual(command.session_id, _SESSION_ID)

    self.assertEqual(response.name, "GetTitle")
    self.assertEqual(response.GetPayloadPrimitive(), None)
    self.assertEqual(response.session_id, _SESSION_ID)

  def testNextCommandWithParams(self):
    string_buffer = io.StringIO(_WITH_PARAMS)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    command = command_sequence.NextCommand(None)
    response = command_sequence._last_response

    self.assertEqual(command.name, "GetTitle")
    self.assertEqual(command.GetPayloadPrimitive(), {"param1": 7,
                                                     "sessionId": _SESSION_ID})
    self.assertEqual(command.session_id, _SESSION_ID)

    self.assertEqual(response.name, "GetTitle")
    self.assertEqual(response.GetPayloadPrimitive(), {"param2": 42})
    self.assertEqual(response.session_id, _SESSION_ID)

  def testParserGetNext(self):
    string_buffer = io.StringIO(_WITH_PARAMS)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    command = command_sequence._parser.GetNext()

    self.assertEqual(command.name, "GetTitle")
    self.assertEqual(command.GetPayloadPrimitive(), {"param1": 7})
    self.assertEqual(command.session_id, _SESSION_ID)

  def testGetNextClientHeaderLine(self):
    string_buffer = io.StringIO(_PAYLOAD_SCRIPT)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertEqual(command_sequence._parser._GetNextClientHeaderLine(),
        ("[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]"
            " RESPONSE GetTitle {\n"))

  def testGetNextClientHeaderLine_readableTimeLinux(self):
    string_buffer = io.StringIO(_PAYLOAD_READABLE_TIME_LINUX)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertEqual(command_sequence._parser._GetNextClientHeaderLine(),
        ("[08-12-2019_15:45:34.824002][INFO]:"
         " [b15232d5497ec0d8300a5a1ea56f33ce] RESPONSE GetTitle {\n"))

  def testGetNextClientHeaderLine_readableTimeWindows(self):
    string_buffer = io.StringIO(_PAYLOAD_READABLE_TIME_WINDOWS)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertEqual(command_sequence._parser._GetNextClientHeaderLine(),
        ("[08-12-2019_15:45:34.824][INFO]:"
         " [b15232d5497ec0d8300a5a1ea56f33ce] RESPONSE GetTitle {\n"))

  def testIngestLoggedResponse(self):
    string_buffer = io.StringIO(_RESPONSE_ONLY)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    response = command_sequence._parser.GetNext()

    self.assertEqual(response.name, "GetTitle")
    self.assertEqual(response.GetPayloadPrimitive(), {"param2": 42})
    self.assertEqual(response.session_id, _SESSION_ID)

  def testIngestRealResponseInitSession(self):
    real_resp = {'value': {
        'sessionId': 'b15232d5497ec0d8300a5a1ea56f33ce',
        'capabilities': {
            'browserVersion': '76.0.3809.100',
            'browserName': 'chrome',
        }
    }}

    command_sequence = client_replay.CommandSequence()
    command_sequence._staged_logged_session_id = _SESSION_ID_ALT
    command_sequence._IngestRealResponse(real_resp)

    self.assertEqual(
        command_sequence._id_map[_SESSION_ID_ALT], _SESSION_ID)
    self.assertEqual(command_sequence._staged_logged_session_id, None)

  def testIngestRealResponseNone(self):
    real_resp = {'value': None}

    command_sequence = client_replay.CommandSequence()
    command_sequence._IngestRealResponse(real_resp)

    self.assertEqual(command_sequence._last_response, None)

  def testIngestRealResponseInt(self):
    real_resp = {'value': 1}

    command_sequence = client_replay.CommandSequence()
    command_sequence._IngestRealResponse(real_resp)

    #last response is not changed by IngestRealResponse,
    #but we want to verify that int response content does not
    #cause error.
    self.assertEqual(command_sequence._last_response, None)

  def testGetPayload_simple(self):
    string_buffer = io.StringIO(_RESPONSE_ONLY)
    header = string_buffer.readline()
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    payload_string = command_sequence._parser._GetPayloadString(header)
    self.assertEqual(payload_string, '{"param2": 42\n}\n')

  def testGetPayload_script(self):
    string_buffer = io.StringIO(_PAYLOAD_SCRIPT)
    header = string_buffer.readline()
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    payload_string = command_sequence._parser._GetPayloadString(header)
    self.assertEqual(payload_string, '{"param2": "function(){func()}"\n}\n')

  def testGetPayload_badscript(self):
    string_buffer = io.StringIO(_BAD_SCRIPT)
    header = string_buffer.readline()
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    payload_string = command_sequence._parser._GetPayloadString(header)
    self.assertEqual(payload_string, '{"param2": "))}\\})}/{)}({(})}"\n}\n')

  def testSubstitutePayloadIds_element(self):
    id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"}
    substituted = {"ELEMENT": "0.78-1"}
    client_replay._ReplaceWindowAndElementIds(substituted, id_map)
    self.assertEqual(substituted, {"ELEMENT": "0.00-0"})

  def testSubstitutePayloadIds_elements(self):
    id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"}
    substituted = [{"ELEMENT": "0.78-1"}, {"ELEMENT": "0.78-2"}]
    client_replay._ReplaceWindowAndElementIds(substituted, id_map)
    self.assertEqual(substituted,
                     [{"ELEMENT": "0.00-0"}, {"ELEMENT": "0.00-1"}])

  def testSubstitutePayloadIds_windows(self):
    id_map = {_WINDOW_ID_2: _WINDOW_ID_1}
    substituted = [_WINDOW_ID_2]
    client_replay._ReplaceWindowAndElementIds(substituted, id_map)
    self.assertEqual(substituted, [_WINDOW_ID_1])

  def testSubstitutePayloadIds_recursion(self):
    id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"}
    substituted = {"args": [{"1": "0.78-1", "2": "0.78-2"}]}
    client_replay._ReplaceWindowAndElementIds(substituted, id_map)
    self.assertEqual(substituted, {"args": [{"1": "0.00-0", "2": "0.00-1"}]})

  def testGetAnyElementids_window(self):
    ids = client_replay._GetAnyElementIds(_WINDOW_IDS)
    self.assertEqual(ids, [_WINDOW_ID_1, _WINDOW_ID_2])

  def testGetAnyElementids_element(self):
    ids = client_replay._GetAnyElementIds(_ELEMENT_ID)
    self.assertEqual(ids, ["0.87-1"])

  def testGetAnyElementids_elements(self):
    ids = client_replay._GetAnyElementIds(_ELEMENT_IDS)
    self.assertEqual(ids, ["0.87-1", "0.87-2"])

  def testGetAnyElementids_string(self):
    ids = client_replay._GetAnyElementIds("true")
    self.assertEqual(ids, None)

  def testGetAnyElementids_invalid(self):
    ids = client_replay._GetAnyElementIds("[ gibberish ]")
    self.assertEqual(ids, None)

  def testCountChar_positive(self):
    self.assertEqual(client_replay._CountChar("{;{;{)]", "{", "}"), 3)

  def testCountChar_onebrace(self):
    self.assertEqual(client_replay._CountChar("{", "{", "}"), 1)

  def testCountChar_nothing(self):
    self.assertEqual(client_replay._CountChar("", "{", "}"), 0)

  def testCountChar_negative(self):
    self.assertEqual(client_replay._CountChar("}){((}{(/)}=}", "{", "}"), -2)

  def testCountChar_quotes(self):
    self.assertEqual(
        client_replay._CountChar('[[][]"[[]]]]]"[[]', "[", "]"), 2)

  def testReplaceUrl_simple(self):
    base_url = "https://base.url.test.com:0000"
    payload = {"url": "https://localhost:12345/"}
    client_replay._ReplaceUrl(payload, base_url)
    self.assertEqual(payload, {"url": "https://base.url.test.com:0000/"})

  def testReplaceUrl_nothing(self):
    payload = {"url": "https://localhost:12345/"}
    client_replay._ReplaceUrl(payload, None)
    self.assertEqual(payload, {"url": "https://localhost:12345/"})

  def testReplaceBinary(self):
    payload_dict = {
        "desiredCapabilities": {
            "goog:chromeOptions": {
                "binary": "/path/to/logged binary/with spaces/"
            },
            "other_things": ["some", "uninteresting", "strings"]
        }
    }
    payload_replaced = {
        "desiredCapabilities": {
            "goog:chromeOptions": {
                "binary": "replacement_binary"
            },
            "other_things": ["some", "uninteresting", "strings"]
        }
    }
    client_replay._ReplaceBinary(payload_dict, "replacement_binary")
    self.assertEqual(payload_replaced, payload_dict)

  def testReplaceBinary_none(self):
    payload_dict = {
        "desiredCapabilities": {
            "goog:chromeOptions": {
                "binary": "/path/to/logged binary/with spaces/"
            },
            "other_things": ["some", "uninteresting", "strings"]
        }
    }
    payload_replaced = {
        "desiredCapabilities": {
            "goog:chromeOptions": {},
            "other_things": ["some", "uninteresting", "strings"]
        }
    }
    client_replay._ReplaceBinary(payload_dict, None)
    self.assertEqual(payload_replaced, payload_dict)

  def testReplaceBinary_nocapabilities(self):
    payload_dict = {"desiredCapabilities": {}}
    payload_replaced = {
        "desiredCapabilities": {
            "goog:chromeOptions": {
                "binary": "replacement_binary"
            }
        }
    }
    client_replay._ReplaceBinary(payload_dict, "replacement_binary")
    self.assertEqual(payload_replaced, payload_dict)

  def testGetCommandName(self):
    self.assertEqual(client_replay._GetCommandName(_PAYLOAD_SCRIPT),
        "GetTitle")

  def testGetSessionId(self):
    self.assertEqual(client_replay._GetSessionId(_PAYLOAD_SCRIPT),
                     _SESSION_ID)

  def testParseCommand_true(self):
    string_buffer = io.StringIO(_COMMAND_ONLY)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertTrue(command_sequence._parser.GetNext().IsCommand())

  def testParseCommand_false(self):
    string_buffer = io.StringIO(_RESPONSE_ONLY)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertFalse(command_sequence._parser.GetNext().IsCommand())

  def testParseResponse_true(self):
    string_buffer = io.StringIO(_RESPONSE_ONLY)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertTrue(command_sequence._parser.GetNext().IsResponse())

  def testParseResponse_false(self):
    string_buffer = io.StringIO(_COMMAND_ONLY)
    command_sequence = client_replay.CommandSequence()
    command_sequence._parser = client_replay._Parser(string_buffer)
    self.assertFalse(command_sequence._parser.GetNext().IsResponse())

  def testHandleGetSessions(self):
    string_buffer = io.StringIO(_MULTI_SESSION)
    command_sequence = client_replay.CommandSequence(string_buffer)
    first_command = command_sequence._parser.GetNext()
    command = command_sequence._HandleGetSessions(
        first_command)
    responses = command_sequence._last_response

    self.assertEqual(command.name, "GetSessions")
    self.assertEqual(command.GetPayloadPrimitive(), {})
    self.assertEqual(command.session_id, _SESSION_ID)

    self.assertEqual(responses.name, "GetSessions")
    self.assertEqual(responses.GetPayloadPrimitive(), [
        {
            "capabilities": {"param2": 42},
            "id": _SESSION_ID
        }, {
            "capabilities": {"param2": 42},
            "id": _SESSION_ID_ALT
        }
    ])
    self.assertEqual(responses.session_id, "")
    self.assertEqual(command_sequence._parser._saved_log_entry.name, "GetTitle")


def main():
  parser = optparse.OptionParser()
  parser.add_option(
      "", "--filter", type="string", default="*",
      help=('Filter for specifying what tests to run, "*" will run all. E.g., '
            "*testReplaceUrl_nothing"))
  parser.add_option(
      "", "--isolated-script-test-output",
      help="JSON output file used by swarming")
  # this option is ignored
  parser.add_option("--isolated-script-test-perf-output", type=str)

  options, _ = parser.parse_args()

  all_tests_suite = unittest.defaultTestLoader.loadTestsFromModule(
      sys.modules[__name__])
  test_suite = unittest_util.FilterTestSuite(all_tests_suite, options.filter)
  result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)\
          .run(test_suite)

  if options.isolated_script_test_output:
    util.WriteResultToJSONFile([test_suite], [result],
                               options.isolated_script_test_output)

  sys.exit(len(result.failures) + len(result.errors))

if __name__ == "__main__":
  main()