"""Reader for training log.
See lib/Analysis/TrainingLogger.cpp for a description of the format.
"""
import ctypes
import dataclasses
import io
import json
import math
import sys
from typing import List, Optional
_element_types = {
"float": ctypes.c_float,
"double": ctypes.c_double,
"int8_t": ctypes.c_int8,
"uint8_t": ctypes.c_uint8,
"int16_t": ctypes.c_int16,
"uint16_t": ctypes.c_uint16,
"int32_t": ctypes.c_int32,
"uint32_t": ctypes.c_uint32,
"int64_t": ctypes.c_int64,
"uint64_t": ctypes.c_uint64,
}
@dataclasses.dataclass(frozen=True)
class TensorSpec:
name: str
port: int
shape: List[int]
element_type: type
@staticmethod
def from_dict(d: dict):
name = d["name"]
port = d["port"]
shape = [int(e) for e in d["shape"]]
element_type_str = d["type"]
if element_type_str not in _element_types:
raise ValueError(f"uknown type: {element_type_str}")
return TensorSpec(
name=name,
port=port,
shape=shape,
element_type=_element_types[element_type_str],
)
class TensorValue:
def __init__(self, spec: TensorSpec, buffer: bytes):
self._spec = spec
self._buffer = buffer
self._view = ctypes.cast(self._buffer, ctypes.POINTER(self._spec.element_type))
self._len = math.prod(self._spec.shape)
def spec(self) -> TensorSpec:
return self._spec
def __len__(self) -> int:
return self._len
def __getitem__(self, index):
if index < 0 or index >= self._len:
raise IndexError(f"Index {index} out of range [0..{self._len})")
return self._view[index]
def read_tensor(fs: io.BufferedReader, ts: TensorSpec) -> TensorValue:
size = math.prod(ts.shape) * ctypes.sizeof(ts.element_type)
data = fs.read(size)
return TensorValue(ts, data)
def pretty_print_tensor_value(tv: TensorValue):
print(f'{tv.spec().name}: {",".join([str(v) for v in tv])}')
def read_header(f: io.BufferedReader):
header = json.loads(f.readline())
tensor_specs = [TensorSpec.from_dict(ts) for ts in header["features"]]
score_spec = TensorSpec.from_dict(header["score"]) if "score" in header else None
advice_spec = TensorSpec.from_dict(header["advice"]) if "advice" in header else None
return tensor_specs, score_spec, advice_spec
def read_one_observation(
context: Optional[str],
event_str: str,
f: io.BufferedReader,
tensor_specs: List[TensorSpec],
score_spec: Optional[TensorSpec],
):
event = json.loads(event_str)
if "context" in event:
context = event["context"]
event = json.loads(f.readline())
observation_id = int(event["observation"])
features = []
for ts in tensor_specs:
features.append(read_tensor(f, ts))
f.readline()
score = None
if score_spec is not None:
score_header = json.loads(f.readline())
assert int(score_header["outcome"]) == observation_id
score = read_tensor(f, score_spec)
f.readline()
return context, observation_id, features, score
def read_stream(fname: str):
with io.BufferedReader(io.FileIO(fname, "rb")) as f:
tensor_specs, score_spec, _ = read_header(f)
context = None
while True:
event_str = f.readline()
if not event_str:
break
context, observation_id, features, score = read_one_observation(
context, event_str, f, tensor_specs, score_spec
)
yield context, observation_id, features, score
def main(args):
last_context = None
for ctx, obs_id, features, score in read_stream(args[1]):
if last_context != ctx:
print(f"context: {ctx}")
last_context = ctx
print(f"observation: {obs_id}")
for fv in features:
pretty_print_tensor_value(fv)
if score:
pretty_print_tensor_value(score)
if __name__ == "__main__":
main(sys.argv)