chromium/net/data/ssl/scripts/crlsetutil.py

#!/usr/bin/env python3
# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This utility takes a JSON input that describes a CRLSet and produces a
CRLSet from it.

The input is taken on stdin and is a dict with the following keys:
  - BlockedBySPKI: An array of strings, where each string is a filename
      containing a PEM certificate, from which an SPKI will be extracted.
  - BlockedByHash: A dict of string to an array of strings. The dict key is
      a filename containing a PEM certificate, representing the issuer cert,
      while the array of strings contain the filenames of PEM format
      certificates whose serials are blocked.
  - LimitedSubjects: A dict of string to an array of strings, where the key is
      a filename containing a PEM format certificate, and the strings are the
      filenames of PEM format certificates. Certificates that share a Subject
      with the key will be restricted to the set of SPKIs extracted from the
      files in the values.
  - Sequence: An optional integer sequence number to use for the CRLSet. If
      not present, defaults to 1.

For example:

{
  "BlockedBySPKI": ["/tmp/blocked-certificate"],
  "BlockedByHash": {
    "/tmp/intermediate-certificate": [1, 2, 3]
  },
  "LimitedSubjects": {
    "/tmp/limited-certificate": [
        "/tmp/limited-certificate",
        "/tmp/limited-certificate2"
    ]
  },
  "Sequence": 23
}
"""

import base64
import collections
import hashlib
import json
import optparse
import six
import struct
import sys


def _pem_cert_to_binary(pem_filename):
  """Decodes the first PEM-encoded certificate in a given file into binary

  Args:
    pem_filename: A filename that contains a PEM-encoded certificate. It may
        contain additional data (keys, textual representation) which will be
        ignored

  Returns:
    A byte array containing the decoded certificate data
  """
  pem_data = ""
  started = False

  with open(pem_filename, 'r') as pem_file:
    for line in pem_file:
      if not started:
        if line.startswith('-----BEGIN CERTIFICATE'):
          started = True
      else:
        if line.startswith('-----END CERTIFICATE'):
          break
        pem_data += line[:-1].strip()

  return base64.b64decode(pem_data)


def _parse_asn1_element(der_bytes):
  """Parses a DER-encoded tag/Length/Value into its component parts

  Args:
    der_bytes: A DER-encoded ASN.1 data type

  Returns:
    A tuple of the ASN.1 tag value, the length of the ASN.1 header that was
    read, the sequence of bytes for the value, and then any data from der_bytes
    that was not part of the tag/Length/Value.
  """
  tag = six.indexbytes(der_bytes, 0)
  length = six.indexbytes(der_bytes, 1)
  header_length = 2

  if length & 0x80:
    num_length_bytes = length & 0x7f
    length = 0
    for i in range(2, 2 + num_length_bytes):
      length <<= 8
      length += six.indexbytes(der_bytes, i)
    header_length = 2 + num_length_bytes

  contents = der_bytes[:header_length + length]
  rest = der_bytes[header_length + length:]

  return (tag, header_length, contents, rest)


class ASN1Iterator(object):
  """Iterator that parses and iterates through a ASN.1 DER structure"""

  def __init__(self, contents):
    self._tag = 0
    self._header_length = 0
    self._rest = None
    self._contents = contents
    self.step_into()

  def step_into(self):
    """Begins processing the inner contents of the next ASN.1 element"""
    (self._tag, self._header_length, self._contents, self._rest) = (
        _parse_asn1_element(self._contents[self._header_length:]))

  def step_over(self):
    """Skips/ignores the next ASN.1 element"""
    (self._tag, self._header_length, self._contents, self._rest) = (
        _parse_asn1_element(self._rest))

  def tag(self):
    """Returns the ASN.1 tag of the current element"""
    return self._tag

  def contents(self):
    """Returns the raw data of the current element"""
    return self._contents

  def encoded_value(self):
    """Returns the encoded value of the current element (i.e. without header)"""
    return self._contents[self._header_length:]


def _der_cert_to_spki(der_bytes):
  """Returns the subjectPublicKeyInfo of a DER-encoded certificate

  Args:
    der_bytes: A DER-encoded certificate (RFC 5280)

  Returns:
    A byte array containing the subjectPublicKeyInfo
  """
  iterator = ASN1Iterator(der_bytes)
  iterator.step_into()  # enter certificate structure
  iterator.step_into()  # enter TBSCertificate
  iterator.step_over()  # over version
  iterator.step_over()  # over serial
  iterator.step_over()  # over signature algorithm
  iterator.step_over()  # over issuer name
  iterator.step_over()  # over validity
  iterator.step_over()  # over subject name
  return iterator.contents()


def der_cert_to_spki_hash(der_cert):
  """Gets the SHA-256 hash of the subjectPublicKeyInfo of a DER encoded cert

  Args:
    der_cert: A string containing the DER-encoded certificate

  Returns:
    The SHA-256 hash of the certificate, as a byte sequence
  """
  return hashlib.sha256(_der_cert_to_spki(der_cert)).digest()


def pem_cert_file_to_spki_hash(pem_filename):
  """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file

  Args:
    pem_filename: A file containing a PEM-encoded certificate.

  Returns:
    The SHA-256 hash of the first certificate in the file, as a byte sequence
  """
  return der_cert_to_spki_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_subject_hash(der_bytes):
  """Returns SHA256(subject) of a DER-encoded certificate

  Args:
    der_bytes: A DER-encoded certificate (RFC 5280)

  Returns:
    The SHA-256 hash of the certificate's subject.
  """
  iterator = ASN1Iterator(der_bytes)
  iterator.step_into()  # enter certificate structure
  iterator.step_into()  # enter TBSCertificate
  iterator.step_over()  # over version
  iterator.step_over()  # over serial
  iterator.step_over()  # over signature algorithm
  iterator.step_over()  # over issuer name
  iterator.step_over()  # over validity
  return hashlib.sha256(iterator.contents()).digest()


def pem_cert_file_to_subject_hash(pem_filename):
  """Gets the SHA-256 hash of the subject of a cert in a file

  Args:
    pem_filename: A file containing a PEM-encoded certificate.

  Returns:
    The SHA-256 hash of the subject of the first certificate in the file, as a
    byte sequence
  """
  return der_cert_to_subject_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_serial(der_bytes):
  """Gets the serial of a DER-encoded certificate, omitting leading 0x00

  Args:
    der_bytes: A DER-encoded certificates (RFC 5280)

  Returns:
    The encoded serial number value (omitting tag and length), and omitting
    any leading 0x00 used to indicate it is a positive INTEGER.
  """
  iterator = ASN1Iterator(der_bytes)
  iterator.step_into()  # enter certificate structure
  iterator.step_into()  # enter TBSCertificate
  iterator.step_over()  # over version
  raw_serial = iterator.encoded_value()
  if six.indexbytes(raw_serial, 0) == 0x00 and len(raw_serial) > 1:
    raw_serial = raw_serial[1:]
  return raw_serial


def pem_cert_file_to_serial(pem_filename):
  """Gets the DER-encoded serial of a cert in a file, omitting leading 0x00

  Args:
    pem_filename: A file containing a PEM-encoded certificate.

  Returns:
    The DER-encoded serial as a byte sequence
  """
  return der_cert_to_serial(_pem_cert_to_binary(pem_filename))


def main():
  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option('-o', '--output',
                    help='Specifies the output file. The default is stdout.')
  options, _ = parser.parse_args()
  outfile = sys.stdout
  if options.output and options.output != '-':
    outfile = open(options.output, 'wb')

  config = json.load(sys.stdin)
  blocked_spkis = [
      base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii')
      for pem_file in config.get('BlockedBySPKI', [])
  ]
  parents = {
      pem_cert_file_to_spki_hash(pem_file): [
          pem_cert_file_to_serial(issued_cert_file)
          for issued_cert_file in issued_certs
      ]
      for pem_file, issued_certs in config.get('BlockedByHash', {}).items()
  }
  limited_subjects = {
      base64.b64encode(pem_cert_file_to_subject_hash(pem_file)).decode('ascii'):
      [
          base64.b64encode(pem_cert_file_to_spki_hash(filename)).decode('ascii')
          for filename in allowed_pems
      ]
      for pem_file, allowed_pems in config.get('LimitedSubjects', {}).items()
  }
  known_interception_spkis = [
      base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii')
      for pem_file in config.get('KnownInterceptionSPKIs', [])
  ]
  blocked_interception_spkis = [
      base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii')
      for pem_file in config.get('BlockedInterceptionSPKIs', [])
  ]
  header_json = {
      'Version': 0,
      'ContentType': 'CRLSet',
      'Sequence': int(config.get("Sequence", 1)),
      'NumParents': len(parents),
      'BlockedSPKIs': blocked_spkis,
      'LimitedSubjects': limited_subjects,
      'KnownInterceptionSPKIs': known_interception_spkis,
      'BlockedInterceptionSPKIs': blocked_interception_spkis
  }
  header = json.dumps(header_json)
  outfile.write(struct.pack('<H', len(header)))
  outfile.write(header.encode('utf-8'))
  for spki, serials in sorted(parents.items()):
    outfile.write(spki)
    outfile.write(struct.pack('<I', len(serials)))
    for serial in serials:
      raw_serial = []
      if not serial:
        raw_serial = b'\x00'
      else:
        raw_serial = serial

      outfile.write(struct.pack('<B', len(raw_serial)))
      outfile.write(raw_serial)
  return 0


if __name__ == '__main__':
  sys.exit(main())