chromium/components/safe_browsing/content/resources/real_time_url_checks_allowlist/store_real_time_url_allowlist_prefixes.py

#!/usr/bin/env python3
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
 Fetch the real time URL allowlist hash prefixes using the SBv4 API. Once
 we've validated the response, save these concatenated prefixes as the
 url_hashes in the real_time_url_allowlist.asciipb file.
"""

import base64
import json
import hashlib
import optparse
import os
import re
import subprocess
import sys
import traceback
import urllib.parse
import urllib.request

from validation_utils import (
    HASH_PREFIX_SIZE, CheckHashPrefixesAreValid, CheckHashPrefixesHaveChanged
)

sys.path.append(
    os.path.abspath(os.path.join(os.path.abspath(__file__),
                                 *[os.path.pardir] * 6 + ['google_apis'])))
import google_api_keys

def FetchSafeBrowsingAllowlistHashes():
    """ Return a byte sequence of concatenated 16 byte URL hash prefixes,
    fetched from the Safe Browsing API.
    """
    url = "https://safebrowsing.googleapis.com/v4/threatListUpdates:fetch?key="
    body = {
        'list_update_requests': [
            {
                'threat_type': 'HIGH_CONFIDENCE_ALLOWLIST',
                'platform_type': 'ANDROID',
                'threat_entry_type': 'URL'
            }
        ]
    }
    headers = {'content-type': 'application/json'}
    req = urllib.request.Request(
        url + google_api_keys.GetAPIKey(),
        method='POST',
        data=bytes(json.dumps(body), encoding="utf-8"),
        headers=headers
    )

    # Parse the response - should contain concatenated allowlist hash prefixes
    res = json.loads(urllib.request.urlopen(req).read())
    update_responses = res.get('listUpdateResponses')
    assert len(update_responses) > 0, (
        "Bad response from SB API - no listUpdateResponses key")
    additions = update_responses[0].get('additions')
    assert len(additions) > 0,(
        "Bad response from SB API - no additions found in listUpdateResponses")
    allowlist_hashes = additions[0].get('rawHashes').get('rawHashes')
    prefix_size = additions[0].get('rawHashes').get('prefixSize')
    assert prefix_size >= HASH_PREFIX_SIZE, (
        "Bad response from SB API - the returned prefixes are too small. "
        "They should be at least " + str(HASH_PREFIX_SIZE) + " bytes and they "
        "are " + str(prefix_size) + " bytes.")
    decoded_allowlist_hashes = base64.b64decode(allowlist_hashes)

    # Create string of concatenated 16 byte URL hash prefixes for pb file
    new_hash_prefixes = bytearray(b'')
    for i in range(0, len(decoded_allowlist_hashes), prefix_size):
        new_hash_prefixes += decoded_allowlist_hashes[i:i+HASH_PREFIX_SIZE]
    return bytes(new_hash_prefixes)

def WriteHashesToFile(hash_prefixes):
    """ Write provided hashes to the real_time_url_allowlist.asciipb file """
    outfile = os.path.join(
        os.getcwd(),
        'real_time_url_allowlist.asciipb'
    )
    # Read the ASCII
    with open(outfile, 'r') as ifile:
        ascii_pb_str = ifile.read()
    # New contents should keep version_id and scheme_id then replace url_hashes
    new_contents = (
        ascii_pb_str.split('url_hashes')[0] +
        'url_hashes: ' +
        str(hash_prefixes)[1:]
    )
    # Write new ASCII contents
    with open(outfile, 'w') as ofile:
        ofile.write(new_contents)


class StoreRealTimeUrlAllowlistPrefixes:
  def Run(self):
    parser = optparse.OptionParser()
    parser.add_option('-p', '--buildpath',
                      help='File path of the out build directory.')

    (opts, args) = parser.parse_args()
    if opts.buildpath is None:
      parser.print_help()
      return 1

    try:
      hash_prefixes = FetchSafeBrowsingAllowlistHashes()
      CheckHashPrefixesAreValid(hash_prefixes)
      assert CheckHashPrefixesHaveChanged(hash_prefixes), (
        "The URL hash prefixes have not changed, so we won't update the file"
      )
      WriteHashesToFile(hash_prefixes)
    except Exception as e:
      print("ERROR: Failed to receive valid response from SB API:\n  %s\n%s" %
            (str(e), traceback.format_exc()))
      return 1


def main():
    return StoreRealTimeUrlAllowlistPrefixes().Run()


if __name__ == '__main__':
    sys.exit(main())