#!/usr/bin/python3
# encoding: utf-8
# SPDX-FileCopyrightText: 2024 FC (Fay) Stegerman <flx@obfusk.net>
# SPDX-License-Identifier: GPL-3.0-or-later

"""Fix the R8 "pg-map-id" embedded in classes*.dex files of an APK (or an
unpacked directory) and fix up the dex checksums/signatures plus the matching
dex checksums stored in assets/dexopt/baseline.prof, so two builds that differ
only in proguard map hash become bit-identical."""

import hashlib
import os
import re
import struct
import zipfile
import zlib

from binascii import hexlify
from typing import Any, Dict, Match, Tuple

DEX_MAGIC = b"dex\n"
DEX_MAGIC_RE = re.compile(rb"dex\n(\d{3})\x00")
PROF_MAGIC = b"pro\x00"
PROF_010_P = b"010\x00"
CLASSES_DEX_RE = re.compile(r"classes\d*\.dex")
ASSET_PROF = "assets/dexopt/baseline.prof"
# R8 marker string embedded in the dex: ~~R8{"backend":"dex",...,"pg-map-id":"xxxxxxx"}
PG_MAP_ID_RE = re.compile(rb'(~~R8{"backend":"dex".*?"pg-map-id":")([0-9a-f]{7})(")')
# ZipInfo attributes copied verbatim so the output APK entries are reproducible.
ATTRS = ("compress_type", "create_system", "create_version", "date_time",
         "external_attr", "extract_version", "flag_bits")
# Compression levels tried (in order) when detecting how an entry was deflated.
LEVELS = (9, 6, 4, 1)


class Error(RuntimeError):
    """Error raised by this script."""


# FIXME: is there a better alternative?
class ReproducibleZipInfo(zipfile.ZipInfo):
    """Reproducible ZipInfo hack.

    Wraps an existing ZipInfo, copying all its slot attributes and letting
    keyword overrides take precedence on attribute access, so entries can be
    re-written bit-identically (including metadata zipfile would normally
    regenerate).
    """

    # sanity check: we poke the private ._compresslevel slot below; newer
    # Pythons renamed it to .compress_level (with a property alias).
    if "_compresslevel" not in zipfile.ZipInfo.__slots__:       # type: ignore[attr-defined]
        if "compress_level" not in zipfile.ZipInfo.__slots__:   # type: ignore[attr-defined]
            raise Error("zipfile.ZipInfo has no ._compresslevel")

    _compresslevel: int
    _override: Dict[str, Any] = {}

    def __init__(self, zinfo: zipfile.ZipInfo, **override: Any) -> None:
        # pylint: disable=W0231
        if override:
            # per-instance copy; the class-level dict stays shared/empty
            self._override = {**self._override, **override}
        for k in self.__slots__:
            if hasattr(zinfo, k):
                setattr(self, k, getattr(zinfo, k))

    def __getattribute__(self, name: str) -> Any:
        # overridden attributes win over whatever was copied from the ZipInfo
        if name != "_override":
            try:
                return self._override[name]
            except KeyError:
                pass
        return object.__getattribute__(self, name)


def fix_pg_map_id(input_dir: str, output_dir: str, map_id: str) -> None:
    """Fix pg-map-id in an unpacked APK directory.

    Reads classes*.dex (top-level) and assets/dexopt/baseline.prof (if
    present) from input_dir, fixes them, and writes the fixed files into
    output_dir (which must already exist; subdirectories are created).
    """
    file_data = {}
    for filename in [ASSET_PROF] + sorted(os.listdir(input_dir)):
        if re.fullmatch(CLASSES_DEX_RE, filename) or filename == ASSET_PROF:
            path = os.path.join(input_dir, *filename.split("/"))
            # FIX: the baseline profile is optional (see _fix_pg_map_id);
            # don't crash when the dump doesn't contain one.
            if filename == ASSET_PROF and not os.path.exists(path):
                continue
            print(f"reading {filename!r}...")
            with open(path, "rb") as fh:
                file_data[filename] = fh.read()
    _fix_pg_map_id(file_data, map_id)
    for filename, data in file_data.items():
        print(f"writing {filename!r}...")
        if "/" in filename:
            os.makedirs(os.path.join(output_dir, *filename.split("/")[:-1]), exist_ok=True)
        with open(os.path.join(output_dir, *filename.split("/")), "wb") as fh:
            fh.write(data)


def fix_pg_map_id_apk(input_apk: str, output_apk: str, map_id: str) -> None:
    """Fix pg-map-id inside an APK, preserving reproducibility.

    Copies every entry of input_apk to output_apk with identical metadata;
    classes*.dex and the baseline profile are replaced by their fixed
    contents.  For deflated entries the original compression level is
    detected (by recompressing at each of LEVELS and comparing the CRC of
    the compressed stream) so the output bytes match the input's.
    """
    with open(input_apk, "rb") as fh_raw:
        with zipfile.ZipFile(input_apk) as zf_in:
            with zipfile.ZipFile(output_apk, "w") as zf_out:
                file_data = {}
                for info in zf_in.infolist():
                    if re.fullmatch(CLASSES_DEX_RE, info.filename) or info.filename == ASSET_PROF:
                        print(f"reading {info.filename!r}...")
                        file_data[info.filename] = zf_in.read(info)
                _fix_pg_map_id(file_data, map_id)
                for info in zf_in.infolist():
                    attrs = {attr: getattr(info, attr) for attr in ATTRS}
                    zinfo = ReproducibleZipInfo(info, **attrs)
                    if info.compress_type == 8:
                        # CRC32 of the raw compressed data in the input; the
                        # local header is 30 bytes + filename (n) + extra (m),
                        # with their lengths at offsets 26 and 28.
                        fh_raw.seek(info.header_offset)
                        n, m = struct.unpack("<HH", fh_raw.read(30)[26:30])
                        fh_raw.seek(info.header_offset + 30 + m + n)
                        ccrc = 0
                        size = info.compress_size
                        while size > 0:
                            ccrc = zlib.crc32(fh_raw.read(min(size, 4096)), ccrc)
                            size -= 4096
                        # recompress at each candidate level; the level whose
                        # compressed-stream CRC matches is the one used.
                        with zf_in.open(info) as fh_in:
                            comps = {lvl: zlib.compressobj(lvl, 8, -15) for lvl in LEVELS}
                            ccrcs = {lvl: 0 for lvl in LEVELS}
                            while True:
                                data = fh_in.read(4096)
                                if not data:
                                    break
                                for lvl in LEVELS:
                                    ccrcs[lvl] = zlib.crc32(comps[lvl].compress(data), ccrcs[lvl])
                            for lvl in LEVELS:
                                if ccrc == zlib.crc32(comps[lvl].flush(), ccrcs[lvl]):
                                    zinfo._compresslevel = lvl
                                    break
                            else:
                                raise Error(f"Unable to determine compresslevel for {info.filename!r}")
                    elif info.compress_type != 0:
                        raise Error(f"Unsupported compress_type {info.compress_type}")
                    if re.fullmatch(CLASSES_DEX_RE, info.filename) or info.filename == ASSET_PROF:
                        print(f"writing {info.filename!r}...")
                        zf_out.writestr(zinfo, file_data[info.filename])
                    else:
                        # copy unmodified entries in 4K chunks
                        with zf_in.open(info) as fh_in:
                            with zf_out.open(zinfo, "w") as fh_out:
                                while True:
                                    data = fh_in.read(4096)
                                    if not data:
                                        break
                                    fh_out.write(data)


def _fix_pg_map_id(file_data: Dict[str, bytes], map_id: str) -> None:
    """Fix pg-map-id in the dex files (in place in file_data) and update the
    dex checksums (ZIP entry CRC32s, which ART uses as the dex location
    checksum) in the baseline profile, if present."""
    crcs = {}
    for filename in file_data:
        if re.fullmatch(CLASSES_DEX_RE, filename):
            print(f"fixing {filename!r}...")
            data = _fix_dex_id_checksum(file_data[filename], map_id.encode())
            file_data[filename] = data
            crcs[filename] = zlib.crc32(data)
    if ASSET_PROF in file_data:
        print(f"fixing {ASSET_PROF!r}...")
        file_data[ASSET_PROF] = _fix_prof_checksum(file_data[ASSET_PROF], crcs)


def _fix_dex_id_checksum(data: bytes, map_id: bytes) -> bytes:
    """Replace the pg-map-id in the R8 marker string of a dex file and
    recompute the dex header's SHA-1 signature and adler32 checksum.

    Dex header layout: 8-byte magic, 4-byte adler32 checksum (little-endian,
    covering everything after it), 20-byte SHA-1 signature (covering
    everything after it).  Raises Error on an unsupported magic.
    """
    def repl(m: Match[bytes]) -> bytes:
        print(f"fixing pg-map-id: {m.group(2)!r} -> {map_id!r}")
        return m.group(1) + map_id + m.group(3)
    magic = data[:8]
    if magic[:4] != DEX_MAGIC or not DEX_MAGIC_RE.fullmatch(magic):
        raise Error(f"Unsupported magic {magic!r}")
    print(f"dex version={int(magic[4:7]):03d}")
    checksum, signature = struct.unpack("<I20s", data[8:32])
    fixed_data = PG_MAP_ID_RE.sub(repl, data[32:])
    fixed_sig = hashlib.sha1(fixed_data).digest()
    print(f"fixing signature: {hexlify(signature).decode()} -> {hexlify(fixed_sig).decode()}")
    fixed_data = fixed_sig + fixed_data
    fixed_checksum = zlib.adler32(fixed_data)
    print(f"fixing checksum: 0x{checksum:x} -> 0x{fixed_checksum:x}")
    return magic + int.to_bytes(fixed_checksum, 4, "little") + fixed_data


def _fix_prof_checksum(data: bytes, crcs: Dict[str, int]) -> bytes:
    """Dispatch on baseline profile magic/version; only 010 P is supported."""
    magic, data = _split(data, 4)
    version, data = _split(data, 4)
    if magic == PROF_MAGIC:
        if version == PROF_010_P:
            print("prof version=010 P")
            return PROF_MAGIC + PROF_010_P + _fix_prof_010_p_checksum(data, crcs)
        raise Error(f"Unsupported prof version {version!r}")
    raise Error(f"Unsupported magic {magic!r}")


def _fix_prof_010_p_checksum(data: bytes, crcs: Dict[str, int]) -> bytes:
    """Update the per-dex checksums in an ART baseline profile (version 010 P).

    Layout after magic+version: num_dex_files (u8), uncompressed size (u32),
    compressed size (u32), then a zlib stream containing one line header per
    dex file -- profile_key_size (u16), num_type_ids (u16),
    hot_method_region_size (u32), dex checksum (u32), num_method_ids (u32),
    followed by the profile key -- and then the per-dex method/class data,
    which is copied through unchanged.
    """
    num_dex_files, uncompressed_data_size, compressed_data_size, data = _unpack("<BII", data)
    if len(data) != compressed_data_size:
        raise Error("Compressed data size does not match")
    data = zlib.decompress(data)
    if len(data) != uncompressed_data_size:
        raise Error("Uncompressed data size does not match")
    dex_data_headers = []
    for _ in range(num_dex_files):
        profile_key_size, num_type_ids, hot_method_region_size, dex_checksum, \
            num_method_ids, data = _unpack("<HHIII", data)
        profile_key, data = _split(data, profile_key_size)
        filename = profile_key.decode()
        if filename not in crcs:
            raise Error(f"Missing checksum for {filename!r}")
        fixed_checksum = crcs[filename]
        print(f"fixing checksum for {filename!r}: 0x{dex_checksum:x} -> 0x{fixed_checksum:x}")
        dex_data_headers.append(struct.pack(
            "<HHIII", profile_key_size, num_type_ids, hot_method_region_size,
            fixed_checksum, num_method_ids) + profile_key)
    fixed_data = b"".join(dex_data_headers) + data
    assert len(fixed_data) == uncompressed_data_size
    # NOTE(review): level 1 (Z_BEST_SPEED) is what ART's profile writer uses;
    # verify against a real baseline.prof that the recompressed bytes match.
    compressed = zlib.compress(fixed_data, 1)
    return struct.pack("<BII", num_dex_files, len(fixed_data), len(compressed)) + compressed


def _unpack(fmt: str, data: bytes) -> Any:
    """Unpack fmt from the head of data; returns the fields + the remainder."""
    assert all(c in "<BHI" for c in fmt)
    size = struct.calcsize(fmt)
    return struct.unpack(fmt, data[:size]) + (data[size:],)


def _split(data: bytes, size: int) -> Tuple[bytes, bytes]:
    """Split data at size: (head, tail)."""
    return data[:size], data[size:]


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(prog="fix-pg-map-id.py")
    parser.add_argument("input_dir_or_apk", metavar="INPUT_DIR_OR_APK")
    parser.add_argument("output_dir_or_apk", metavar="OUTPUT_DIR_OR_APK")
    parser.add_argument("pg_map_id", metavar="PG_MAP_ID")
    args = parser.parse_args()
    if os.path.isdir(args.input_dir_or_apk):
        fix_pg_map_id(args.input_dir_or_apk, args.output_dir_or_apk, args.pg_map_id)
    else:
        fix_pg_map_id_apk(args.input_dir_or_apk, args.output_dir_or_apk, args.pg_map_id)

# vim: set tw=80 sw=4 sts=4 et fdm=marker :