#!/usr/bin/env python3
# Copyright 2012 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Support for formatting a data pack file used for platform agnostic resource
files.
"""
import collections
import os
import struct
import sys
if __name__ == '__main__':
  sys.path.append(os.path.join(os.path.dirname(__file__), '../..'))

from grit import util
from grit.node import include
from grit.node import message
from grit.node import structure
PACK_FILE_VERSION = 5
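# Text encodings understood by the pack format; the chosen value is stored in
# the header's encoding byte (0 = BINARY, 1 = UTF8, 2 = UTF16).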
BINARY, UTF8, UTF16 = range(3)
GrdInfoItem = collections.namedtuple('GrdInfoItem',
                                     ['textual_id', 'id', 'path'])


class WrongFileVersion(Exception):
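  """Raised when a data pack's version is not one this module understands."""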
  pass


class CorruptDataPack(Exception):
  pass


class DataPackSizes:
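  """Byte counts of each section of a data pack; total must equal file size."""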
  def __init__(self, header, id_table, alias_table, data):
    self.header = header
    self.id_table = id_table
    self.alias_table = alias_table
    self.data = data

  @property
  def total(self):
    return sum(v for v in self.__dict__.values())

  def __iter__(self):
    yield ('header', self.header)
    yield ('id_table', self.id_table)
    yield ('alias_table', self.alias_table)
    yield ('data', self.data)

  def __eq__(self, other):
    return self.__dict__ == other.__dict__

  def __repr__(self):
    return self.__class__.__name__ + repr(self.__dict__)


class DataPackContents:
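  """Parsed contents of a data pack, as returned by ReadDataPack()."""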
  def __init__(self, resources, encoding, version, aliases, sizes):
    # Map of resource_id -> str.
    self.resources = resources
    # Encoding (int).
    self.encoding = encoding
    # Version (int).
    self.version = version
    # Map of resource_id -> canonical_resource_id.
    self.aliases = aliases
    # DataPackSizes instance.
    self.sizes = sizes


def Format(root, lang='en', output_dir='.'):
"""Writes out the data pack file format (platform agnostic resource file)."""
id_map = root.GetIdMap()
data = {}
root.info = []
for node in root.ActiveDescendants():
with node:
if isinstance(node, (include.IncludeNode, message.MessageNode,
structure.StructureNode)):
value = node.GetDataPackValue(lang, util.BINARY)
if value is not None:
resource_id = id_map[node.GetTextualIds()[0]]
data[resource_id] = value
root.info.append('{},{},{}'.format(
node.attrs.get('name'), resource_id, node.source))
return WriteDataPackToString(data, UTF8)
def ReadDataPack(input_file):
  return ReadDataPackFromString(util.ReadFile(input_file, util.BINARY))


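# Layout of a version 5 data pack, as parsed below and produced by
# WriteDataPackToString():
#   header      - uint32 version, uint8 encoding, 3 padding bytes,
#                 uint16 resource_count, uint16 alias_count (12 bytes total).
#   id table    - (resource_count + 1) entries of (uint16 id, uint32 offset);
#                 the extra trailing entry marks the end of the last payload.
#   alias table - alias_count entries of (uint16 id, uint16 id-table index).
#   data        - the concatenated resource payloads.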
def ReadDataPackFromString(data):
"""Reads a data pack file and returns a dictionary."""
# Read the header.
version = struct.unpack('<I', data[:4])[0]
if version == 4:
resource_count, encoding = struct.unpack('<IB', data[4:9])
alias_count = 0
header_size = 9
elif version == 5:
encoding, resource_count, alias_count = struct.unpack('<BxxxHH', data[4:12])
header_size = 12
else:
raise WrongFileVersion('Found version: ' + str(version))
resources = {}
kIndexEntrySize = 2 + 4 # Each entry is a uint16 and a uint32.
def entry_at_index(idx):
offset = header_size + idx * kIndexEntrySize
return struct.unpack('<HI', data[offset:offset + kIndexEntrySize])
prev_resource_id, prev_offset = entry_at_index(0)
for i in range(1, resource_count + 1):
resource_id, offset = entry_at_index(i)
resources[prev_resource_id] = data[prev_offset:offset]
prev_resource_id, prev_offset = resource_id, offset
id_table_size = (resource_count + 1) * kIndexEntrySize
# Read the alias table.
kAliasEntrySize = 2 + 2 # uint16, uint16
def alias_at_index(idx):
offset = header_size + id_table_size + idx * kAliasEntrySize
return struct.unpack('<HH', data[offset:offset + kAliasEntrySize])
aliases = {}
for i in range(alias_count):
resource_id, index = alias_at_index(i)
aliased_id = entry_at_index(index)[0]
aliases[resource_id] = aliased_id
resources[resource_id] = resources[aliased_id]
alias_table_size = kAliasEntrySize * alias_count
sizes = DataPackSizes(
header_size, id_table_size, alias_table_size,
len(data) - header_size - id_table_size - alias_table_size)
assert sizes.total == len(data), 'original={} computed={}'.format(
len(data), sizes.total)
return DataPackContents(resources, encoding, version, aliases, sizes)
def WriteDataPackToString(resources, encoding):
"""Returns bytes with a map of id=>data in the data pack format."""
ret = []
# Compute alias map.
resource_ids = sorted(resources)
# Use reversed() so that for duplicates lower IDs clobber higher ones.
id_by_data = {resources[k]: k for k in reversed(resource_ids)}
# Map of resource_id -> resource_id, where value < key.
alias_map = {k: id_by_data[v] for k, v in resources.items()
if id_by_data[v] != k}
# Write file header.
resource_count = len(resources) - len(alias_map)
# Padding bytes added for alignment.
ret.append(struct.pack('<IBxxxHH', PACK_FILE_VERSION, encoding,
resource_count, len(alias_map)))
HEADER_LENGTH = 4 + 4 + 2 + 2
# Each main table entry is: uint16 + uint32 (and an extra entry at the end).
# Each alias table entry is: uint16 + uint16.
data_offset = HEADER_LENGTH + (resource_count + 1) * 6 + len(alias_map) * 4
# Write main table.
index_by_id = {}
deduped_data = []
index = 0
for resource_id in resource_ids:
if resource_id in alias_map:
continue
data = resources[resource_id]
if isinstance(data, str):
data = data.encode('utf-8')
index_by_id[resource_id] = index
ret.append(struct.pack('<HI', resource_id, data_offset))
data_offset += len(data)
deduped_data.append(data)
index += 1
assert index == resource_count
# Add an extra entry at the end.
ret.append(struct.pack('<HI', 0, data_offset))
# Write alias table.
for resource_id in sorted(alias_map):
index = index_by_id[alias_map[resource_id]]
ret.append(struct.pack('<HH', resource_id, index))
# Write data.
ret.extend(deduped_data)
return b''.join(ret)
def WriteDataPack(resources, output_file, encoding):
"""Writes a map of id=>data into output_file as a data pack."""
content = WriteDataPackToString(resources, encoding)
with open(output_file, 'wb') as file:
file.write(content)
def ReadGrdInfo(grd_file):
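  """Reads a '.info' file and returns a map of int resource id -> GrdInfoItem.

  Each line of |grd_file| + '.info' is expected to be 'textual_id,id,path'.
  """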
  info_dict = {}
  with open(grd_file + '.info') as f:
    for line in f:
      item = GrdInfoItem._make(line.strip().split(','))
      info_dict[int(item.id)] = item
  return info_dict


def RePack(output_file,
           input_files,
           allowlist_file=None,
           suppress_removed_key_output=False,
           output_info_filepath=None):
"""Write a new data pack file by combining input pack files.
Args:
output_file: path to the new data pack file.
input_files: a list of paths to the data pack files to combine.
allowlist_file: path to the file that contains the list of resource IDs
that should be kept in the output file or None to include
all resources.
suppress_removed_key_output: allows the caller to suppress the output from
RePackFromDataPackStrings.
output_info_file: If not None, specify the output .info filepath.
Raises:
KeyError: if there are duplicate keys or resource encoding is
inconsistent.
"""
input_data_packs = [ReadDataPack(filename) for filename in input_files]
input_info_files = [filename + '.info' for filename in input_files]
allowlist = None
if allowlist_file:
lines = util.ReadFile(allowlist_file, 'utf-8').strip().splitlines()
if not lines:
raise Exception('Allowlist file should not be empty')
allowlist = {int(x) for x in lines}
inputs = [(p.resources, p.encoding) for p in input_data_packs]
resources, encoding = RePackFromDataPackStrings(inputs, allowlist,
suppress_removed_key_output)
WriteDataPack(resources, output_file, encoding)
if output_info_filepath is None:
output_info_filepath = output_file + '.info'
with open(output_info_filepath, 'w') as output_info_file:
for filename in input_info_files:
with open(filename) as info_file:
output_info_file.writelines(info_file.readlines())
def RePackFromDataPackStrings(inputs,
                              allowlist,
                              suppress_removed_key_output=False):
"""Combines all inputs into one.
Args:
inputs: a list of (resources_by_id, encoding) tuples to be combined.
allowlist: a list of resource IDs that should be kept in the output string
or None to include all resources.
suppress_removed_key_output: Do not print removed keys.
Returns:
Returns (resources_by_id, encoding).
Raises:
KeyError: if there are duplicate keys or resource encoding is
inconsistent.
"""
resources = {}
encoding = None
for input_resources, input_encoding in inputs:
# Make sure we have no dups.
duplicate_keys = set(input_resources.keys()) & set(resources.keys())
if duplicate_keys:
raise KeyError(
'Duplicate resource IDs: ' + str(list(duplicate_keys)) + '. '
'This is likely because the reserved ID ranges defined in ' +
'tools/gritsettings/resource_ids.spec have been exhausted.')
# Make sure encoding is consistent.
if encoding in (None, BINARY):
encoding = input_encoding
elif input_encoding not in (BINARY, encoding):
raise KeyError('Inconsistent encodings: ' + str(encoding) +
' vs ' + str(input_encoding))
if allowlist:
allowlisted_resources = {key: input_resources[key]
for key in input_resources.keys()
if key in allowlist}
resources.update(allowlisted_resources)
removed_keys = [
key for key in input_resources.keys() if key not in allowlist
]
if not suppress_removed_key_output:
for key in removed_keys:
print('RePackFromDataPackStrings Removed Key:', key)
else:
resources.update(input_resources)
# Encoding is 0 for BINARY, 1 for UTF8 and 2 for UTF16
if encoding is None:
encoding = BINARY
return resources, encoding
def main():
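  """Writes a couple of sample .pak files to the current directory."""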
  # Write a simple file.
  data = {1: '', 4: 'this is id 4', 6: 'this is id 6', 10: ''}
  WriteDataPack(data, 'datapack1.pak', UTF8)
  data2 = {1000: 'test', 5: 'five'}
  WriteDataPack(data2, 'datapack2.pak', UTF8)
  print('wrote datapack1 and datapack2 to current directory.')


if __name__ == '__main__':
  main()