#!/usr/bin/env python3 """ The main build script for CC: Tweaked. This sets up work trees for each branch, ensures they are up-to-date with the remotes and then merges the primary branch (mc-1.15.x) into each other branch. We then build each branch, push changes, and upload all versions. """ import argparse import dataclasses import enum import http.client import itertools import json import os import os.path import subprocess import sys import urllib import urllib.request import urllib.response from typing import Any, TypedDict, assert_never @dataclasses.dataclass(frozen=True, slots=True) class Branch: name: str parent: str | None = None BRANCHES: list[Branch] = [ Branch("mc-1.20.x"), Branch("mc-1.21.x", parent="mc-1.20.x"), Branch("mc-1.21.y", parent="mc-1.21.x"), ] def log(msg: str, *args) -> None: """Print a format string to the console""" print("\033[32m" + msg % tuple(args) + "\033[0m") def git_in(branch: Branch, *cmd: str, **kwargs) -> subprocess.CompletedProcess: """Run a git command on a specific branch.""" return subprocess.run(["git", "-C", branch.name, *cmd], **kwargs) def run_in(branch: Branch, *cmd: str, **kwargs) -> subprocess.CompletedProcess: """Run a command within the context of a specific branch.""" return subprocess.run(cmd, cwd=branch.name, **kwargs) def cmd_output(*cmd: str) -> str: """Run a command, returning its output as a string.""" return subprocess.check_output(cmd, encoding="utf-8") class Change(enum.Enum): ADD = enum.auto() REMOVE = enum.auto() CHANGE = enum.auto() def diff_dicts[K, V](old: dict[K, V], new: dict[K, V]) -> dict[K, Change]: """Compute the difference between two dictionaries.""" changes: dict[K, Change] = {} # Short circuit if there are no changes if old == new: return changes for key, old_value in old.items(): if key not in new: changes[key] = Change.REMOVE elif old_value != new[key]: changes[key] = Change.CHANGE for key in new: if key not in old: changes[key] = Change.ADD return changes ################################################################################ # Crowdin Translation support ################################################################################ CROWDIN_PROJECT = 710005 EN_US = "projects/common/src/generated/resources/assets/computercraft/lang/en_us.json" class CrowdinResponse[T](TypedDict): """ The root Crowdin response. This includes the actual response data, and some (currently ignored) pagination info. """ data: T pagination: object class CrowdinData[T](TypedDict): data: T class ProjectFile(TypedDict): """A file in a Crowdin project.""" id: int name: str path: str revisionId: int class FileDownloadResult(TypedDict): """A link to downloading a file in the project.""" url: str class StorageUploadResult(TypedDict): id: int def request_json(request: str | urllib.request.Request) -> Any: """Make a HTTP request, and then decode the response as JSON.""" response: http.client.HTTPResponse with urllib.request.urlopen(request) as response: if response.status not in (200, 201): url = request if isinstance(request, str) else request.full_url raise ValueError(f"Bad response to {url}: {response.status} {response.reason}") return json.load(response) def request_crowdin[ T ]( path: str, ty: type[T], *, method: str = "GET", body: str | None = None, headers: dict[str, str] | None = None ) -> CrowdinResponse[T]: """Make a request to the Crowdin API""" request = urllib.request.Request( f"https://api.crowdin.com/api/v2{path}", data=body.encode() if body is not None else None, method=method, headers={"Accept": "application/json"}, ) if (token := os.getenv("CROWDIN_TOKEN")) is not None: request.add_header("Authorization", f"Bearer {token}") else: log("No crowdin token available") sys.exit(1) if headers is not None: request.headers.update(headers) return request_json(request) class TranslationLoader: """ Loads translations across multiple branches. Different branches of CC: Tweaked may have different translations. For instance: - New feature on later versions of the game may require their own translations. - We may change translations on trunk, but have not merged them into other branches. To upload translations (`upload_translations`), we want to find translations that are available in any branches. However, doing a naive union of all branches is a little over-eager, as it does not account for the case where translations have been deleted but not yet merged. Instead, we simulate the result of a `git merge` on each branch, and *then* take the union of all translations. This is definitely overkill, but I can't stop myself! """ def __init__(self) -> None: self._ref_contents: dict[str, dict[str, str]] = {} """A map of git references to their file contents.""" self._branch_contents: dict[str, dict[str, str]] = {} """A map of branches to their effective contents.""" def _get_ref_contents(self, ref: str) -> dict[str, str]: if (content := self._ref_contents.get(ref)) is not None: return content content = self._ref_contents[ref] = json.loads(cmd_output("git", "show", f"{ref}:{EN_US}")) return content def get_effective_translation(self, branch: Branch) -> dict[str, str]: """Simulate a merge on a branch, and compute the resulting translations after that merge.""" if (content := self._branch_contents.get(branch.name)) is not None: return content content = self._branch_contents[branch.name] = self._get_effective_translation(branch) return content def _get_effective_translation(self, branch: Branch) -> dict[str, str]: current: dict[str, str] = self._get_ref_contents(branch.name) if branch.parent is None: return current base_ref = cmd_output("git", "merge-base", branch.parent, branch.name).strip() base: dict[str, str] = self._get_ref_contents(base_ref) parent = self._branch_contents[branch.parent] differences = diff_dicts(base, current) # Apply the diff between the base and parent. for key, change in diff_dicts(base, parent).items(): if key in differences: log(f"Warning '{key}' has changed in both {branch.parent} and {branch}") match change: case Change.ADD | Change.CHANGE: current[key] = parent[key] case Change.REMOVE: del current[key] case _: assert_never(change) return current def upload_translations() -> None: """ Upload our en_us.json translations to Crowdin. """ # Fetch the current translations from Crowdin. raw_files = request_crowdin(f"/projects/{CROWDIN_PROJECT}/files", list[CrowdinData[ProjectFile]])["data"] files = [file["data"] for file in raw_files if file["data"]["name"] == "en_us.json"] if len(files) != 1: log(f"Found {len(files)} matching translations") sys.exit(1) remote_dl = request_crowdin(f"/projects/{CROWDIN_PROJECT}/files/{files[0]["id"]}/download", FileDownloadResult) remote_strings: dict[str, str] = request_json(remote_dl["data"]["url"]) # Compute our local translations across our branches. local_strings: dict[str, str] = {} translations = TranslationLoader() for branch in BRANCHES: for key, value in translations.get_effective_translation(branch).items(): if (current := local_strings.get(key)) is not None and current != value: log(f"Warning '{key}' is translated as:") log(f" - {current!r}") log(f" - {value!r}") else: local_strings[key] = value # Take the difference diff = diff_dicts(remote_strings, local_strings) if len(diff) == 0: log("No changes found. Exiting.") return for key, change in diff.items(): match change: case Change.ADD: print(f"\033[32m+ {key}: {local_strings[key]!r}\033[0m") case Change.REMOVE: print(f"\033[31m- {key}: {remote_strings[key]!r}\033[0m") case Change.CHANGE: print(f"\033[31m- {key}: {remote_strings[key]!r}\033[0m") print(f"\033[32m+ {key}: {local_strings[key]!r}\033[0m") check = input("Upload the above translations? [y/N]").lower().strip() if check != "y": sys.exit(1) # And upload. storage = request_crowdin( "/storages", StorageUploadResult, method="POST", body=json.dumps(local_strings, indent=4) + "\n", headers={"Crowdin-API-FileName": "en_us.json", "Content-Type": "application/json"}, )["data"]["id"] request_crowdin( f"/projects/{CROWDIN_PROJECT}/files/{files[0]["id"]}", object, method="PUT", body=json.dumps({"storageId": storage}), headers={"Content-Type": "application/json"}, ) log("Translations updated!") ################################################################################ # Git and Gradle Commands ################################################################################ def setup() -> None: """ Setup the repository suitable for working with multiple versions. """ # Update git remote. log("Updating from remotes") subprocess.check_call(["git", "fetch", "origin"]) # Setup git repository. for branch in BRANCHES: if not os.path.isdir(branch.name): log(f"Creating worktree for {branch.name}") subprocess.check_call(["git", "worktree", "add", branch.name, branch.name]) def check_git() -> None: """ Check all worktrees are in a sensible state prior to merging. """ setup() # Ensure every worktree is on the right branch, has no uncommited changes and is # up-to-date with the remote. ok = True for branch in BRANCHES: status = git_in(branch, "status", "--porcelain", check=True, stdout=subprocess.PIPE).stdout if len(status.strip()) > 0: log(f"{branch.name} has changes. Build will not continue.") ok = False continue actual_branch = ( git_in(branch, "rev-parse", "--abbrev-ref", "HEAD", check=True, stdout=subprocess.PIPE) .stdout.decode("utf-8") .strip() ) if actual_branch != branch.name: log(f"{branch.name} is actually on {actual_branch} right now") ok = False continue if git_in(branch, "merge-base", "--is-ancestor", f"origin/{branch.name}", branch.name).returncode != 0: log(f"{branch.name} is not up-to-date with remote.") ok = False continue if not ok: sys.exit(1) def build() -> None: """ Merge in parent branches, then build all branches. """ check_git() # Merge each branch into the next one. for branch in BRANCHES: if ( branch.parent is not None and git_in(branch, "merge-base", "--is-ancestor", branch.parent, branch.name).returncode != 0 ): log(f"{branch.name} is not up-to-date with {branch.parent}.") ret = git_in(branch, "merge", "--no-edit", branch.parent).returncode if ret != 0: log(f"Merge {branch.parent} -> {branch.name} failed. Aborting.") sys.exit(ret) log("Git state is up-to-date. Preparing to build - you might want to make a cup of tea.") for branch in BRANCHES: log(f"Building {branch.name}") ret = run_in(branch, "./gradlew", "build", "--no-daemon").returncode if ret != 0: log(f"Build failed") sys.exit(ret) def release() -> None: """Publish releases for each version.""" build() check = ( input("Are you sure you want to release? Make sure you've performed some manual checks first! [y/N]") .lower() .strip() ) if check != "y": sys.exit(1) subprocess.check_call(["git", "push", "origin", *(b.name for b in BRANCHES)]) for branch in BRANCHES: log(f"Uploading {branch.name}") ret = run_in(branch, "./gradlew", "publish", "--no-daemon").returncode if ret != 0: log(f"Upload failed. Good luck in recovering from this!") sys.exit(ret) log(f"Finished. Well done, you may now enjoy your tea!") def main() -> None: # Validate arguments. parser = argparse.ArgumentParser(description="Build scripts for CC: Tweaked") subparsers = parser.add_subparsers( description="The subcommand to run. Subcommands implicitly run their dependencies.", dest="subcommand", required=True, ) subparsers.add_parser("setup", help="Setup the git repository and build environment.").set_defaults(func=setup) subparsers.add_parser("check-git", help="Check the git worktrees are in a state ready for merging.").set_defaults( func=check_git ) subparsers.add_parser("build", help="Merge and build all branches.").set_defaults(func=build) subparsers.add_parser("release", help="Publish a release.").set_defaults(func=release) subparsers.add_parser("upload-translations", help="Upload crowdin translations.").set_defaults( func=upload_translations ) parser.parse_args().func() if __name__ == "__main__": main()