Benoit Marty
2 months ago
committed by
GitHub
3 changed files with 427 additions and 4 deletions
@ -0,0 +1,227 @@
@@ -0,0 +1,227 @@
|
||||
#!/usr/bin/python3 |
||||
# encoding: utf-8 |
||||
# SPDX-FileCopyrightText: 2024 FC (Fay) Stegerman <flx@obfusk.net> |
||||
# SPDX-License-Identifier: GPL-3.0-or-later |
||||
|
||||
import hashlib |
||||
import os |
||||
import re |
||||
import struct |
||||
import zipfile |
||||
import zlib |
||||
|
||||
from binascii import hexlify |
||||
from typing import Any, Dict, Match, Tuple |
||||
|
||||
# Magic bytes at the start of every .dex file; the regex additionally
# captures the 3-digit dex format version that follows the magic.
DEX_MAGIC = b"dex\n"
DEX_MAGIC_RE = re.compile(rb"dex\n(\d{3})\x00")

# Magic and version bytes of the baseline profile file format.
PROF_MAGIC = b"pro\x00"
PROF_010_P = b"010\x00"

# APK entries that embed the pg-map-id / dex checksums: the dex files
# themselves and the baseline profile asset.
CLASSES_DEX_RE = re.compile(r"classes\d*\.dex")
ASSET_PROF = "assets/dexopt/baseline.prof"

# R8 marker string embedded in the dex; group 2 is the 7-hex-digit
# pg-map-id value that gets replaced.
PG_MAP_ID_RE = re.compile(rb'(~~R8{"backend":"dex".*?"pg-map-id":")([0-9a-f]{7})(")')

# zipfile.ZipInfo attributes copied verbatim when rebuilding an APK entry.
ATTRS = ("compress_type", "create_system", "create_version", "date_time",
         "external_attr", "extract_version", "flag_bits")
# zlib compression levels tried when detecting the original compresslevel.
LEVELS = (9, 6, 4, 1)
||||
|
||||
|
||||
class Error(RuntimeError):
    """Base exception raised for all failures in this script."""
    pass
||||
|
||||
|
||||
# FIXME: is there a better alternative?
class ReproducibleZipInfo(zipfile.ZipInfo):
    """Reproducible ZipInfo hack.

    Wraps an existing ZipInfo, copying all of its slot values, while letting
    selected attributes be forced via keyword arguments; forced values take
    precedence in __getattribute__.
    """

    # Sanity check: the hack relies on ZipInfo carrying a per-entry
    # compression level slot.  NOTE(review): the nested check for
    # "compress_level" presumably handles a rename of the private
    # "_compresslevel" slot in newer Python versions -- confirm.
    if "_compresslevel" not in zipfile.ZipInfo.__slots__:  # type: ignore[attr-defined]
        if "compress_level" not in zipfile.ZipInfo.__slots__:  # type: ignore[attr-defined]
            raise Error("zipfile.ZipInfo has no ._compresslevel")

    _compresslevel: int
    _override: Dict[str, Any] = {}  # attribute name -> forced value

    def __init__(self, zinfo: zipfile.ZipInfo, **override: Any) -> None:
        # pylint: disable=W0231
        if override:
            # Build a fresh dict so the class-level default stays empty.
            self._override = {**self._override, **override}
        for k in self.__slots__:
            if hasattr(zinfo, k):
                setattr(self, k, getattr(zinfo, k))

    def __getattribute__(self, name: str) -> Any:
        # Forced attributes win; everything else falls through to the
        # values copied from the wrapped ZipInfo.
        if name != "_override":
            try:
                return self._override[name]
            except KeyError:
                pass
        return object.__getattribute__(self, name)
||||
|
||||
|
||||
def fix_pg_map_id(input_dir: str, output_dir: str, map_id: str) -> None:
    """Fix the pg-map-id in the dex files (and the baseline profile, when
    present) found in input_dir, writing the fixed files to output_dir.

    The baseline profile is optional: _fix_pg_map_id() only processes it
    when present, so we must not fail here when the file does not exist
    (the original unconditionally prepended ASSET_PROF to the read list,
    raising FileNotFoundError for APK contents without a profile).
    """
    file_data = {}
    prof_path = os.path.join(input_dir, *ASSET_PROF.split("/"))
    # Read the profile first (when it exists), then the dex files in order.
    names = ([ASSET_PROF] if os.path.exists(prof_path) else []) + sorted(os.listdir(input_dir))
    for filename in names:
        if re.fullmatch(CLASSES_DEX_RE, filename) or filename == ASSET_PROF:
            print(f"reading {filename!r}...")
            with open(os.path.join(input_dir, *filename.split("/")), "rb") as fh:
                file_data[filename] = fh.read()
    _fix_pg_map_id(file_data, map_id)
    for filename, data in file_data.items():
        print(f"writing {filename!r}...")
        if "/" in filename:
            # Create intermediate directories for nested entries.
            os.makedirs(os.path.join(output_dir, *filename.split("/")[:-1]), exist_ok=True)
        with open(os.path.join(output_dir, *filename.split("/")), "wb") as fh:
            fh.write(data)
||||
|
||||
|
||||
def fix_pg_map_id_apk(input_apk: str, output_apk: str, map_id: str) -> None:
    """Fix the pg-map-id in the dex files and baseline profile inside
    input_apk, writing a reproducible copy of the APK to output_apk."""
    with open(input_apk, "rb") as fh_raw:
        with zipfile.ZipFile(input_apk) as zf_in:
            with zipfile.ZipFile(output_apk, "w") as zf_out:
                # First pass: load the dex files + baseline profile so their
                # pg-map-id and checksums can be fixed in memory.
                file_data = {}
                for info in zf_in.infolist():
                    if re.fullmatch(CLASSES_DEX_RE, info.filename) or info.filename == ASSET_PROF:
                        print(f"reading {info.filename!r}...")
                        file_data[info.filename] = zf_in.read(info)
                _fix_pg_map_id(file_data, map_id)
                # Second pass: copy every entry to the output APK, preserving
                # metadata (ATTRS) and the original compression level.
                for info in zf_in.infolist():
                    attrs = {attr: getattr(info, attr) for attr in ATTRS}
                    zinfo = ReproducibleZipInfo(info, **attrs)
                    if info.compress_type == 8:
                        # Deflated entry: detect the level originally used by
                        # recompressing at each candidate level and comparing
                        # CRC32s of the compressed streams.
                        fh_raw.seek(info.header_offset)
                        # Local file header is 30 bytes; file name length (n)
                        # and extra field length (m) sit at offsets 26 & 28.
                        n, m = struct.unpack("<HH", fh_raw.read(30)[26:30])
                        # Seek past the header to the raw compressed data.
                        fh_raw.seek(info.header_offset + 30 + m + n)
                        # CRC32 of the original compressed bytes.
                        ccrc = 0
                        size = info.compress_size
                        while size > 0:
                            ccrc = zlib.crc32(fh_raw.read(min(size, 4096)), ccrc)
                            size -= 4096
                        with zf_in.open(info) as fh_in:
                            # wbits=-15: raw deflate stream (no zlib header),
                            # as stored inside ZIP archives.
                            comps = {lvl: zlib.compressobj(lvl, 8, -15) for lvl in LEVELS}
                            ccrcs = {lvl: 0 for lvl in LEVELS}
                            while True:
                                data = fh_in.read(4096)
                                if not data:
                                    break
                                for lvl in LEVELS:
                                    ccrcs[lvl] = zlib.crc32(comps[lvl].compress(data), ccrcs[lvl])
                            for lvl in LEVELS:
                                if ccrc == zlib.crc32(comps[lvl].flush(), ccrcs[lvl]):
                                    zinfo._compresslevel = lvl
                                    break
                            else:
                                raise Error(f"Unable to determine compresslevel for {info.filename!r}")
                    elif info.compress_type != 0:
                        # Only deflated (8) and stored (0) entries are handled.
                        raise Error(f"Unsupported compress_type {info.compress_type}")
                    if re.fullmatch(CLASSES_DEX_RE, info.filename) or info.filename == ASSET_PROF:
                        # Write the fixed in-memory data for dex/profile...
                        print(f"writing {info.filename!r}...")
                        zf_out.writestr(zinfo, file_data[info.filename])
                    else:
                        # ...and stream every other entry through unchanged.
                        with zf_in.open(info) as fh_in:
                            with zf_out.open(zinfo, "w") as fh_out:
                                while True:
                                    data = fh_in.read(4096)
                                    if not data:
                                        break
                                    fh_out.write(data)
||||
|
||||
|
||||
def _fix_pg_map_id(file_data: Dict[str, bytes], map_id: str) -> None:
    """Fix the pg-map-id in all dex files in file_data (in place), then fix
    the corresponding dex checksums in the baseline profile if present."""
    map_id_bytes = map_id.encode()
    crcs: Dict[str, int] = {}
    for name, raw in list(file_data.items()):
        if not re.fullmatch(CLASSES_DEX_RE, name):
            continue
        print(f"fixing {name!r}...")
        fixed = _fix_dex_id_checksum(raw, map_id_bytes)
        file_data[name] = fixed
        # Remember each dex file's new CRC32 for the profile fix-up below.
        crcs[name] = zlib.crc32(fixed)
    if ASSET_PROF in file_data:
        print(f"fixing {ASSET_PROF!r}...")
        file_data[ASSET_PROF] = _fix_prof_checksum(file_data[ASSET_PROF], crcs)
||||
|
||||
|
||||
def _fix_dex_id_checksum(data: bytes, map_id: bytes) -> bytes:
    """Replace the pg-map-id in the R8 marker of a dex file and recompute
    the dex header's SHA-1 signature and Adler-32 checksum.

    Returns the original data unchanged when no marker is found.
    """
    def repl(m: Match[bytes]) -> bytes:
        # Keep the surrounding marker text; swap only the 7-hex-digit id.
        print(f"fixing pg-map-id: {m.group(2)!r} -> {map_id!r}")
        return m.group(1) + map_id + m.group(3)

    # Dex header layout: magic (8 bytes) + adler32 checksum (4 bytes,
    # little-endian) + sha1 signature (20 bytes), then the rest of the file.
    magic = data[:8]
    if magic[:4] != DEX_MAGIC or not DEX_MAGIC_RE.fullmatch(magic):
        raise Error(f"Unsupported magic {magic!r}")
    print(f"dex version={int(magic[4:7]):03d}")
    checksum, signature = struct.unpack("<I20s", data[8:32])
    fixed_data = re.sub(PG_MAP_ID_RE, repl, data[32:])
    if fixed_data == data[32:]:
        # No pg-map-id marker present: nothing to fix.
        print("(not modified)")
        return data
    # The signature covers everything after the signature field; the
    # checksum covers everything after the checksum field (i.e. including
    # the new signature).
    fixed_sig = hashlib.sha1(fixed_data).digest()
    print(f"fixing signature: {hexlify(signature).decode()} -> {hexlify(fixed_sig).decode()}")
    fixed_data = fixed_sig + fixed_data
    fixed_checksum = zlib.adler32(fixed_data)
    print(f"fixing checksum: 0x{checksum:x} -> 0x{fixed_checksum:x}")
    return magic + int.to_bytes(fixed_checksum, 4, "little") + fixed_data
||||
|
||||
|
||||
def _fix_prof_checksum(data: bytes, crcs: Dict[str, int]) -> bytes:
    """Validate the baseline profile magic/version and fix the embedded dex
    checksums; only the "010 P" profile format is supported."""
    magic, data = _split(data, 4)
    version, data = _split(data, 4)
    # Guard clauses instead of nested if/else: reject unknown formats early.
    if magic != PROF_MAGIC:
        raise Error(f"Unsupported magic {magic!r}")
    if version != PROF_010_P:
        raise Error(f"Unsupported prof version {version!r}")
    print("prof version=010 P")
    return PROF_MAGIC + PROF_010_P + _fix_prof_010_p_checksum(data, crcs)
||||
|
||||
|
||||
def _fix_prof_010_p_checksum(data: bytes, crcs: Dict[str, int]) -> bytes:
    """Fix the per-dex checksums inside a version "010 P" profile body.

    The body is: number of dex files (1 byte), uncompressed size (4),
    compressed size (4), then a zlib-compressed payload of per-dex headers
    followed by profile data.
    """
    num_dex_files, uncompressed_data_size, compressed_data_size, data = _unpack("<BII", data)
    dex_data_headers = []
    if len(data) != compressed_data_size:
        raise Error("Compressed data size does not match")
    data = zlib.decompress(data)
    if len(data) != uncompressed_data_size:
        raise Error("Uncompressed data size does not match")
    for i in range(num_dex_files):
        # Per-dex header, followed by the profile key (the dex file name).
        profile_key_size, num_type_ids, hot_method_region_size, \
            dex_checksum, num_method_ids, data = _unpack("<HHIII", data)
        profile_key, data = _split(data, profile_key_size)
        filename = profile_key.decode()
        # Replace the stored checksum with the CRC32 of the fixed dex.
        # NOTE: raises KeyError if the profile references a dex not in crcs.
        fixed_checksum = crcs[filename]
        if fixed_checksum != dex_checksum:
            print(f"fixing {filename!r} checksum: 0x{dex_checksum:x} -> 0x{fixed_checksum:x}")
        dex_data_headers.append(struct.pack(
            "<HHIII", profile_key_size, num_type_ids, hot_method_region_size,
            fixed_checksum, num_method_ids) + profile_key)
    # Reassemble: fixed headers + remaining payload, recompressed at level 1
    # (the level used by the original writer, so sizes stay reproducible).
    fixed_data = b"".join(dex_data_headers) + data
    fixed_cdata = zlib.compress(fixed_data, 1)
    fixed_hdr = struct.pack("<BII", num_dex_files, uncompressed_data_size, len(fixed_cdata))
    return fixed_hdr + fixed_cdata
||||
|
||||
|
||||
def _unpack(fmt: str, data: bytes) -> Any: |
||||
assert all(c in "<BHI" for c in fmt) |
||||
size = fmt.count("B") + 2 * fmt.count("H") + 4 * fmt.count("I") |
||||
return struct.unpack(fmt, data[:size]) + (data[size:],) |
||||
|
||||
|
||||
def _split(data: bytes, size: int) -> Tuple[bytes, bytes]: |
||||
return data[:size], data[size:] |
||||
|
||||
|
||||
if __name__ == "__main__":
    import argparse
    # CLI: input and output (each either a directory or an APK file), plus
    # the new pg-map-id value.
    parser = argparse.ArgumentParser(prog="fix-pg-map-id.py")
    for name in ("input_dir_or_apk", "output_dir_or_apk", "pg_map_id"):
        parser.add_argument(name, metavar=name.upper())
    args = parser.parse_args()
    # A directory input selects the directory-based fixer; otherwise the
    # input is treated as an APK.
    fix = fix_pg_map_id if os.path.isdir(args.input_dir_or_apk) else fix_pg_map_id_apk
    fix(args.input_dir_or_apk, args.output_dir_or_apk, args.pg_map_id)
||||
|
||||
# vim: set tw=80 sw=4 sts=4 et fdm=marker : |
@ -0,0 +1,199 @@
@@ -0,0 +1,199 @@
|
||||
#!/usr/bin/python3 |
||||
# encoding: utf-8 |
||||
# SPDX-FileCopyrightText: 2024 FC (Fay) Stegerman <flx@obfusk.net> |
||||
# SPDX-License-Identifier: GPL-3.0-or-later |
||||
|
||||
import argparse |
||||
import os |
||||
import shutil |
||||
import subprocess |
||||
import sys |
||||
import tempfile |
||||
|
||||
from typing import Optional, Tuple |
||||
|
||||
# Commands accepted by inplace_fix(); each maps to a sibling script named
# <command>.py (dashes possibly replaced by underscores, see _script_cmd).
COMMANDS = (
    "fix-compresslevel",
    "fix-files",
    "fix-newlines",
    "fix-pg-map-id",
    "rm-files",
    "sort-apk",
    "sort-baseline",
)

# Build-tools versions whose zipalign is skipped as broken; the minimum
# build-tools version whose zipalign supports the -P page-size option; and
# the environment variables searched for the Android SDK location.
BUILD_TOOLS_WITH_BROKEN_ZIPALIGN = ("31.0.0", "32.0.0")
BUILD_TOOLS_WITH_PAGE_SIZE_FROM = "35.0.0-rc1"
SDK_ENV = ("ANDROID_HOME", "ANDROID_SDK", "ANDROID_SDK_ROOT")
||||
|
||||
|
||||
def _zipalign_cmd(page_align: bool, page_size: Optional[int]) -> Tuple[str, ...]: |
||||
if page_align: |
||||
if page_size is not None: |
||||
return ("zipalign", "-P", str(page_size), "4") |
||||
return ("zipalign", "-p", "4") |
||||
return ("zipalign", "4") |
||||
|
||||
|
||||
# Precomputed default argument tuples: plain 4-byte alignment, and 4-byte
# alignment with the page-align (-p) option.
ZIPALIGN = _zipalign_cmd(page_align=False, page_size=None)
ZIPALIGN_P = _zipalign_cmd(page_align=True, page_size=None)
||||
|
||||
|
||||
class Error(RuntimeError):
    """Base exception raised for all failures in this script."""
    pass
||||
|
||||
|
||||
def inplace_fix(command: str, input_file: str, *args: str,
                zipalign: bool = False, page_align: bool = False,
                page_size: Optional[int] = None, internal: bool = False) -> None:
    """Run command on input_file (via its sibling script), optionally
    zipalign the result, and replace input_file in place.

    Raises Error for unknown commands or failing subcommands.
    """
    if command not in COMMANDS:
        raise Error(f"Unknown command {command}")
    exe, script = _script_cmd(command)
    ext = os.path.splitext(input_file)[1]
    with tempfile.TemporaryDirectory() as tdir:
        # The script writes its output to a temp file with the same extension.
        fixed = os.path.join(tdir, "fixed" + ext)
        run_command(exe, script, input_file, fixed, *args, trim=2)
        result = fixed
        if zipalign:
            # Align into a second temp file; that becomes the final result.
            result = os.path.join(tdir, "aligned" + ext)
            zac = zipalign_cmd(page_align=page_align, page_size=page_size, internal=internal)
            run_command(*zac, fixed, result, trim=2)
        print(f"[MOVE] {result} to {input_file}")
        shutil.move(result, input_file)
||||
|
||||
|
||||
def zipalign_cmd(page_align: bool = False, page_size: Optional[int] = None,
                 internal: bool = False) -> Tuple[str, ...]:
    """
    Find zipalign command using $PATH or $ANDROID_HOME etc.

    >>> zipalign_cmd()
    ('zipalign', '4')
    >>> zipalign_cmd(page_align=True)
    ('zipalign', '-p', '4')
    >>> zipalign_cmd(page_align=True, page_size=16)
    ('zipalign', '-P', '16', '4')
    >>> cmd = zipalign_cmd(page_align=True, page_size=16, internal=True)
    >>> [x.split("/")[-1] for x in cmd]
    ['python3', 'zipalign.py', '-P', '16', '4']
    >>> os.environ["PATH"] = ""
    >>> for k in SDK_ENV:
    ...     os.environ[k] = ""
    >>> cmd = zipalign_cmd()
    >>> [x.split("/")[-1] for x in cmd]
    ['python3', 'zipalign.py', '4']
    >>> os.environ["ANDROID_HOME"] = "test/fake-sdk"
    >>> zipalign_cmd()
    [SKIP BROKEN] 31.0.0
    [FOUND] test/fake-sdk/build-tools/30.0.3/zipalign
    ('test/fake-sdk/build-tools/30.0.3/zipalign', '4')
    >>> cmd = zipalign_cmd(page_align=True, page_size=16)
    [SKIP TOO OLD] 31.0.0
    [SKIP TOO OLD] 30.0.3
    [SKIP TOO OLD] 26.0.2
    >>> [x.split("/")[-1] for x in cmd]
    ['python3', 'zipalign.py', '-P', '16', '4']
    >>> os.environ["ANDROID_HOME"] = "test/fake-sdk-2"
    >>> zipalign_cmd(page_align=True, page_size=16)
    [FOUND] test/fake-sdk-2/build-tools/35.0.0-rc1/zipalign
    ('test/fake-sdk-2/build-tools/35.0.0-rc1/zipalign', '-P', '16', '4')

    """
    cmd, *args = _zipalign_cmd(page_align, page_size)
    if not internal:
        # 1. A zipalign found on $PATH wins.
        if shutil.which(cmd):
            return (cmd, *args)
        # 2. Otherwise search the SDK build-tools dirs, newest version first.
        for k in SDK_ENV:
            if home := os.environ.get(k):
                tools = os.path.join(home, "build-tools")
                if os.path.exists(tools):
                    for vsn in sorted(os.listdir(tools), key=_vsn, reverse=True):
                        # -P needs a recent enough zipalign.
                        if page_size and _vsn(vsn) < _vsn(BUILD_TOOLS_WITH_PAGE_SIZE_FROM):
                            print(f"[SKIP TOO OLD] {vsn}")
                            continue
                        # Skip versions with a known-broken zipalign.
                        for s in BUILD_TOOLS_WITH_BROKEN_ZIPALIGN:
                            if vsn.startswith(s):
                                print(f"[SKIP BROKEN] {vsn}")
                                break
                        else:
                            c = os.path.join(tools, vsn, cmd)
                            if shutil.which(c):
                                print(f"[FOUND] {c}")
                                return (c, *args)
    # 3. Fall back to the bundled zipalign.py script.
    return (*_script_cmd(cmd), *args)
||||
|
||||
|
||||
def _vsn(v: str) -> Tuple[int, ...]: |
||||
""" |
||||
>>> vs = "31.0.0 32.1.0-rc1 34.0.0-rc3 34.0.0 35.0.0-rc1".split() |
||||
>>> for v in sorted(vs, key=_vsn, reverse=True): |
||||
... (_vsn(v), v) |
||||
((35, 0, 0, 0, 1), '35.0.0-rc1') |
||||
((34, 0, 0, 1, 0), '34.0.0') |
||||
((34, 0, 0, 0, 3), '34.0.0-rc3') |
||||
((32, 1, 0, 0, 1), '32.1.0-rc1') |
||||
((31, 0, 0, 1, 0), '31.0.0') |
||||
""" |
||||
if "-rc" in v: |
||||
v = v.replace("-rc", ".0.", 1) |
||||
else: |
||||
v = v + ".1.0" |
||||
return tuple(int(x) if x.isdigit() else -1 for x in v.split(".")) |
||||
|
||||
|
||||
def _script_cmd(command: str) -> Tuple[str, str]:
    """Locate the sibling script implementing command.

    Returns (python executable, script path); tries both the dashed and the
    underscored script name.  Raises Error when neither exists.
    """
    here = os.path.dirname(__file__)
    for candidate in (command, command.replace("-", "_")):
        path = os.path.join(here, candidate + ".py")
        if os.path.exists(path):
            # sys.executable can be empty (embedded interpreters); fall back.
            return (sys.executable or "python3", path)
    raise Error(f"Script for {command} not found")
||||
|
||||
|
||||
def run_command(*args: str, trim: int = 1) -> None:
    """Print and execute args as a subprocess, wrapping failures in Error.

    The first trim arguments are shown as basenames only (to keep the
    printed command line short).
    """
    shown = [os.path.basename(a) for a in args[:trim]]
    shown.extend(args[trim:])
    print(f"[RUN] {' '.join(shown)}")
    try:
        subprocess.run(args, check=True)
    except subprocess.CalledProcessError as e:
        raise Error(f"{args[0]} command failed") from e
    except FileNotFoundError as e:
        raise Error(f"{args[0]} command not found") from e
||||
|
||||
|
||||
def main() -> None:
    """Parse CLI arguments and run the requested in-place fix."""
    prog = os.path.basename(sys.argv[0])
    # Two-line usage string; the second line is indented to align under
    # "usage: <prog>".
    usage = (f"{prog} [-h] [--zipalign] [--page-align] [--page-size N] [--internal]\n"
             f"{len('usage: ' + prog) * ' '} COMMAND INPUT_FILE [...]")
    epilog = f"Commands: {', '.join(COMMANDS)}."
    parser = argparse.ArgumentParser(usage=usage, epilog=epilog)
    parser.add_argument("--zipalign", action="store_true",
                        help="run zipalign after COMMAND")
    parser.add_argument("--page-align", action="store_true",
                        help="run zipalign w/ -p option (implies --zipalign)")
    parser.add_argument("--page-size", metavar="N", type=int,
                        help="run zipalign w/ -P N option (implies --page-align)")
    parser.add_argument("--internal", action="store_true",
                        help="use zipalign.py instead of searching $PATH/$ANDROID_HOME/etc.")
    parser.add_argument("command", metavar="COMMAND")
    parser.add_argument("input_file", metavar="INPUT_FILE")
    # parse_known_args: unrecognised arguments are passed through to COMMAND.
    args, rest = parser.parse_known_args()
    try:
        # --page-size implies --page-align implies --zipalign.
        inplace_fix(args.command, args.input_file, *rest,
                    zipalign=bool(args.zipalign or args.page_align or args.page_size),
                    page_align=bool(args.page_align or args.page_size),
                    page_size=args.page_size, internal=args.internal)
    except Error as e:
        print(f"Error: {e}.", file=sys.stderr)
        sys.exit(1)
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point.
    main()
||||
|
||||
# vim: set tw=80 sw=4 sts=4 et fdm=marker : |
Loading…
Reference in new issue