misc.python.materialize.git
Git utilities.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Git utilities."""

import functools
import subprocess
import sys
from pathlib import Path
from typing import TypeVar

from materialize import spawn
from materialize.mz_version import MzVersion, TypedVersionBase
from materialize.util import YesNoOnce

VERSION_TYPE = TypeVar("VERSION_TYPE", bound=TypedVersionBase)

MATERIALIZE_REMOTE_URL = "https://github.com/MaterializeInc/materialize"

# Remotes whose tags `fetch` has already fetched in this process. `None` is
# recorded for a fetch without an explicit remote, "*" for an all-remotes fetch.
fetched_tags_in_remotes: set[str | None] = set()


def rev_count(rev: str) -> int:
    """Count the commits up to a revision.

    Args:
        rev: A Git revision in any format known to the Git CLI.

    Returns:
        count: The number of commits in the Git repository starting from the
            initial commit and ending with the specified commit, inclusive.
    """
    return int(spawn.capture(["git", "rev-list", "--count", rev, "--"]).strip())


def rev_parse(rev: str, *, abbrev: bool = False) -> str:
    """Compute the hash for a revision.

    Args:
        rev: A Git revision in any format known to the Git CLI.
        abbrev: Return a branch or tag name instead of a git sha

    Returns:
        ref: A 40 character hex-encoded SHA-1 hash representing the ID of the
            named revision in Git's object database.

            With "abbrev=True" this will return an abbreviated ref, or throw an
            error if there is no abbrev.

    Raises:
        RuntimeError: If `git rev-parse` printed nothing for `rev`.
    """
    a = ["--abbrev-ref"] if abbrev else []
    out = spawn.capture(["git", "rev-parse", *a, "--verify", rev]).strip()
    if not out:
        raise RuntimeError(f"No parsed rev for {rev}")
    return out


@functools.cache
def _expand_globs_cached(root: Path, *specs: Path | str) -> frozenset[str]:
    """Run the Git queries behind `expand_globs` once per distinct argument list."""
    # The goal here is to find all files in the working tree that are not
    # ignored by .gitignore. Naively using `git ls-files` doesn't work, because
    # it reports files that have been deleted in the working tree if they are
    # still present in the index. Using `os.walk` doesn't work because there
    # is no good way to evaluate .gitignore rules from Python. So we use a
    # combination of `git diff` and `git ls-files`.

    # `git diff` against the empty tree surfaces all tracked files that have
    # not been deleted.
    empty_tree = (
        "4b825dc642cb6eb9a060e54bf8d69288fbee4904"  # git hash-object -t tree /dev/null
    )
    diff_files = spawn.capture(
        ["git", "diff", "--name-only", "-z", "--relative", empty_tree, "--", *specs],
        cwd=root,
    )

    # `git ls-files --others --exclude-standard` surfaces any non-ignored,
    # untracked files, which are not included in the `git diff` output above.
    ls_files = spawn.capture(
        ["git", "ls-files", "--others", "--exclude-standard", "-z", "--", *specs],
        cwd=root,
    )

    # Both commands emit NUL-separated paths (`-z`); drop the empty trailing
    # entries left behind by the terminators.
    return frozenset(
        f for f in (diff_files + ls_files).split("\0") if f.strip() != ""
    )


def expand_globs(root: Path, *specs: Path | str) -> set[str]:
    """Find unignored files within the specified paths.

    Args:
        root: Directory in which the Git commands are run.
        specs: Pathspecs handed to Git after `--`.

    Returns:
        The tracked-and-not-deleted files plus the untracked, non-ignored
        files matching `specs`, as reported by Git relative to `root`. Results
        are cached per argument list; every call returns a fresh set, so
        callers may mutate it without affecting later calls.
    """
    return set(_expand_globs_cached(root, *specs))


def get_version_tags(
    *,
    version_type: type[VERSION_TYPE],
    newest_first: bool = True,
    fetch: bool = True,
    remote_url: str = MATERIALIZE_REMOTE_URL,
) -> list[VERSION_TYPE]:
    """List all the version-like tags in the repo

    Args:
        version_type: The version class used to recognize (via its prefix) and
            parse the tags.
        newest_first: If true, sort the result in descending order, otherwise
            in ascending order.
        fetch: If false, don't automatically fetch the tags from the remote.
        remote_url: URL identifying the remote to fetch the tags from.

    Returns:
        The parsed versions. Tags that carry the prefix but fail to parse are
        reported as a warning on stderr and skipped.
    """
    if fetch:
        _fetch(
            remote=get_remote(remote_url),
            include_tags=YesNoOnce.ONCE,
            force=True,
            only_tags=True,
        )
    tags = []
    for t in spawn.capture(["git", "tag"]).splitlines():
        if not t.startswith(version_type.get_prefix()):
            continue
        try:
            tags.append(version_type.parse(t))
        except ValueError as e:
            print(f"WARN: {e}", file=sys.stderr)

    return sorted(tags, reverse=newest_first)


def get_latest_version(
    version_type: type[VERSION_TYPE], excluded_versions: set[VERSION_TYPE] | None = None
) -> VERSION_TYPE:
    """Return the highest version tag of the given type.

    Args:
        version_type: The version class used to recognize and parse tags.
        excluded_versions: Versions to ignore, if any.

    Raises:
        ValueError: If no version tag remains after applying the exclusions.
    """
    all_version_tags: list[VERSION_TYPE] = get_version_tags(
        version_type=version_type, fetch=True
    )

    if excluded_versions is not None:
        all_version_tags = [v for v in all_version_tags if v not in excluded_versions]

    if not all_version_tags:
        # Same exception type `max` would raise, but with an actionable message.
        raise ValueError(
            f"No version tags of type {version_type.__name__} found"
            " (after applying exclusions)"
        )

    return max(all_version_tags)


def get_tags_of_current_commit(include_tags: YesNoOnce = YesNoOnce.ONCE) -> list[str]:
    """List the tags pointing at HEAD.

    Args:
        include_tags: Whether tags are fetched before listing; see `fetch`.
    """
    # Compare explicitly instead of relying on enum truthiness. For
    # `YesNoOnce.NO`, `fetch(..., only_tags=True)` returns without running git
    # anyway, so skipping the call is equivalent.
    if include_tags != YesNoOnce.NO:
        fetch(include_tags=include_tags, only_tags=True)

    # `splitlines` already yields an empty list for empty output.
    return spawn.capture(["git", "tag", "--points-at", "HEAD"]).splitlines()


def is_ancestor(earlier: str, later: str) -> bool:
    """True if earlier is an ancestor of later

    Any failure of the underlying `git merge-base --is-ancestor` command
    (`subprocess.CalledProcessError`) is reported as False.
    """
    try:
        spawn.capture(["git", "merge-base", "--is-ancestor", earlier, later])
    except subprocess.CalledProcessError:
        return False
    return True


def is_dirty() -> bool:
    """Check if the working directory has modifications to tracked files

    Both unstaged changes (`git diff`) and staged changes (`git diff --cached`)
    count as dirty.
    """
    proc = subprocess.run("git diff --no-ext-diff --quiet --exit-code".split())
    idx = subprocess.run("git diff --cached --no-ext-diff --quiet --exit-code".split())
    return proc.returncode != 0 or idx.returncode != 0


def first_remote_matching(pattern: str) -> str | None:
    """Get the name of the remote that matches the pattern

    The pattern is a plain substring tested against each line of
    `git remote -v`; the first matching line wins. Returns None if no line
    matches.
    """
    remotes = spawn.capture(["git", "remote", "-v"])
    for remote in remotes.splitlines():
        if pattern in remote:
            return remote.split()[0]

    return None


def describe() -> str:
    """Describe the relationship between the current commit and the most recent tag"""
    return spawn.capture(["git", "describe"]).strip()


def fetch(
    remote: str | None = None,
    all_remotes: bool = False,
    include_tags: YesNoOnce = YesNoOnce.NO,
    force: bool = False,
    branch: str | None = None,
    only_tags: bool = False,
    include_submodules: bool = False,
) -> str:
    """Fetch from remotes

    Args:
        remote: Name of the remote to fetch from, if any.
        all_remotes: Pass `--all` to git. Must not be combined with `remote`.
        include_tags: `YES` always passes `--tags`. `ONCE` passes `--tags` only
            if `force` is set or tags have not yet been fetched in this process
            for this remote (or for all remotes). `NO` never passes it.
        force: Pass `--force` to git.
        branch: Branch to fetch; requires `remote` and excludes `only_tags`.
        only_tags: If set and no tag fetch is due, return without running git.
            It does not otherwise change the git command.
        include_submodules: Pass `--recurse-submodules` instead of
            `--no-recurse-submodules`.

    Returns:
        The stripped output captured from `git fetch`, or an empty string if
        the fetch was skipped.

    Raises:
        RuntimeError: On an invalid combination of arguments.
    """

    if remote is not None and all_remotes:
        raise RuntimeError("all_remotes must be false when a remote is specified")

    if branch is not None and remote is None:
        raise RuntimeError("remote must be specified when a branch is specified")

    if branch is not None and only_tags:
        raise RuntimeError("branch must not be specified if only_tags is set")

    tags_already_fetched = (
        remote in fetched_tags_in_remotes or "*" in fetched_tags_in_remotes
    )
    fetch_tags = include_tags == YesNoOnce.YES or (
        include_tags == YesNoOnce.ONCE
        # fetch tags again if used with force (tags might have changed)
        and (force or not tags_already_fetched)
    )

    if only_tags and not fetch_tags:
        return ""

    command = ["git", "fetch"]

    if remote:
        command.append(remote)

    if branch:
        command.append(branch)

    if all_remotes:
        command.append("--all")

    # explicitly specify both cases to be independent of the git config
    if include_submodules:
        command.append("--recurse-submodules")
    else:
        command.append("--no-recurse-submodules")

    if fetch_tags:
        command.append("--tags")

    if force:
        command.append("--force")

    output = spawn.capture(command).strip()

    if fetch_tags:
        fetched_tags_in_remotes.add(remote)

        # Only an all-remotes fetch that included `--tags` may mark every
        # remote's tags as fetched.
        if all_remotes:
            fetched_tags_in_remotes.add("*")

    return output


_fetch = fetch  # renamed because an argument shadows the fetch name in get_tags


def try_get_remote_name_by_url(url: str) -> str | None:
    """Return the name of the remote whose fetch URL is `url`, if any.

    The comparison is case-insensitive and also accepts `url` with a `.git`
    suffix. Returns None if no remote matches.
    """
    wanted = (f"{url} (fetch)".lower(), f"{url}.git (fetch)".lower())
    result = spawn.capture(["git", "remote", "--verbose"])
    for line in result.splitlines():
        remote, tab, desc = line.partition("\t")
        if not tab:
            # Not a "<name>\t<url> (<direction>)" line; nothing to compare.
            continue
        desc = desc.lower()
        # For promisor remotes git appends the configured filter, e.g.
        # "<url> (fetch) [blob:none]"; accept that suffix as well.
        if any(desc == w or desc.startswith(f"{w} [") for w in wanted):
            return remote
    return None


def get_remote(
    url: str = MATERIALIZE_REMOTE_URL,
    default_remote_name: str = "origin",
) -> str:
    """Return the name of the remote for `url`, or `default_remote_name`.

    The URL is looked up as given and, failing that, in the alternative
    `git@github.com:` syntax. When falling back to the default, a notice is
    printed.
    """
    remote = try_get_remote_name_by_url(url) or try_get_remote_name_by_url(
        url.replace("https://github.com/", "git@github.com:")
    )
    if not remote:
        remote = default_remote_name
        print(f"Remote for URL {url} not found, using {remote}")

    return remote


def get_common_ancestor_commit(remote: str, branch: str, fetch_branch: bool) -> str:
    """Return the merge base of HEAD and `<remote>/<branch>`.

    Args:
        remote: Name of the remote.
        branch: Name of the branch on that remote.
        fetch_branch: Whether to fetch the branch from the remote first.
    """
    if fetch_branch:
        fetch(remote=remote, branch=branch)

    command = ["git", "merge-base", "HEAD", f"{remote}/{branch}"]
    return spawn.capture(command).strip()


def is_on_release_version() -> bool:
    """True if any tag on the current commit is a valid `MzVersion` string."""
    git_tags = get_tags_of_current_commit()
    return any(MzVersion.is_valid_version_string(git_tag) for git_tag in git_tags)


def contains_commit(
    commit_sha: str,
    target: str = "HEAD",
    fetch: bool = False,
    remote_url: str = MATERIALIZE_REMOTE_URL,
) -> bool:
    """True if `commit_sha` is an ancestor of `target`.

    Args:
        commit_sha: The commit to look for.
        target: The revision whose history is searched.
        fetch: If true, fetch from the remote for `remote_url` first and check
            against `<remote>/<target>` instead of the local `target`.
        remote_url: URL identifying the remote used when `fetch` is set.
    """
    if fetch:
        remote = get_remote(remote_url)
        _fetch(remote=remote)
        target = f"{remote}/{target}"
    command = ["git", "merge-base", "--is-ancestor", commit_sha, target]
    return_code = spawn.run_and_get_return_code(command)
    return return_code == 0


def get_tagged_release_version(version_type: type[VERSION_TYPE]) -> VERSION_TYPE | None:
    """
    This returns the release version if exactly this commit is tagged.
    If multiple release versions are present, the highest one will be returned.
    None will be returned if the commit is not tagged.
    """
    git_tags = get_tags_of_current_commit()

    versions: list[VERSION_TYPE] = []

    for git_tag in git_tags:
        if version_type.is_valid_version_string(git_tag):
            versions.append(version_type.parse(git_tag))

    if len(versions) == 0:
        return None

    if len(versions) > 1:
        print(
            "Warning! Commit is tagged with multiple release versions! Returning the highest."
        )

    return max(versions)


def get_commit_message(commit_sha: str) -> str | None:
    """Return the subject line of a commit, or None if git cannot show it."""
    try:
        command = ["git", "log", "-1", "--pretty=format:%s", commit_sha]
        return spawn.capture(command, stderr=subprocess.DEVNULL).strip()
    except subprocess.CalledProcessError:
        # Sometimes mz_version() will report a Git SHA that is not available
        # in the current repository
        return None


def get_branch_name() -> str:
    """This may not work on Buildkite; consider using the same function from build_context."""
    command = ["git", "branch", "--show-current"]
    return spawn.capture(command).strip()


# Work tree mutation


def create_branch(name: str) -> None:
    """Create a branch with the given name and check it out"""
    spawn.runv(["git", "checkout", "-b", name])


def checkout(rev: str, path: str | None = None) -> None:
    """Git checkout the rev

    If `path` is given, only that path is checked out from `rev`.
    """
    cmd = ["git", "checkout", rev]
    if path:
        cmd.extend(["--", path])
    spawn.runv(cmd)


def add_file(file: str) -> None:
    """Git add a file"""
    spawn.runv(["git", "add", file])


def commit_all_changed(message: str) -> None:
    """Commit all changed files with the given message"""
    spawn.runv(["git", "commit", "-a", "-m", message])


def tag_annotated(tag: str) -> None:
    """Create an annotated tag on HEAD

    The tag name doubles as the annotation message.
    """
    spawn.runv(["git", "tag", "-a", "-m", tag, tag])
def rev_count(rev: str) -> int:
    """Count the commits up to a revision.

    Args:
        rev: A Git revision in any format known to the Git CLI.

    Returns:
        count: The number of commits in the Git repository starting from the
            initial commit and ending with the specified commit, inclusive.
    """
    # The trailing "--" tells Git that `rev` is a revision, not a path.
    command = ["git", "rev-list", "--count", rev, "--"]
    raw_count = spawn.capture(command)
    return int(raw_count.strip())
Count the commits up to a revision.
Args: rev: A Git revision in any format known to the Git CLI.
Returns: count: The number of commits in the Git repository starting from the initial commit and ending with the specified commit, inclusive.
def rev_parse(rev: str, *, abbrev: bool = False) -> str:
    """Resolve a revision to its hash, or to its abbreviated ref name.

    Args:
        rev: A Git revision in any format known to the Git CLI.
        abbrev: Return a branch or tag name instead of a git sha.

    Returns:
        ref: A 40 character hex-encoded SHA-1 hash representing the ID of the
            named revision in Git's object database.

            With "abbrev=True" this will return an abbreviated ref, or throw an
            error if there is no abbrev.

    Raises:
        RuntimeError: If `git rev-parse` printed nothing for `rev`.
    """
    command = ["git", "rev-parse"]
    if abbrev:
        command.append("--abbrev-ref")
    command += ["--verify", rev]

    parsed = spawn.capture(command).strip()
    if parsed:
        return parsed
    raise RuntimeError(f"No parsed rev for {rev}")
Compute the hash for a revision.
Args: rev: A Git revision in any format known to the Git CLI. abbrev: Return a branch or tag name instead of a git sha
Returns: ref: A 40 character hex-encoded SHA-1 hash representing the ID of the named revision in Git's object database.
With "abbrev=True" this will return an abbreviated ref, or throw an
error if there is no abbrev.
@functools.cache
def _expand_globs_cached(root: Path, *specs: Path | str) -> frozenset[str]:
    """Run the Git queries behind `expand_globs` once per distinct argument list."""
    # The goal here is to find all files in the working tree that are not
    # ignored by .gitignore. Naively using `git ls-files` doesn't work, because
    # it reports files that have been deleted in the working tree if they are
    # still present in the index. Using `os.walk` doesn't work because there
    # is no good way to evaluate .gitignore rules from Python. So we use a
    # combination of `git diff` and `git ls-files`.

    # `git diff` against the empty tree surfaces all tracked files that have
    # not been deleted.
    empty_tree = (
        "4b825dc642cb6eb9a060e54bf8d69288fbee4904"  # git hash-object -t tree /dev/null
    )
    diff_files = spawn.capture(
        ["git", "diff", "--name-only", "-z", "--relative", empty_tree, "--", *specs],
        cwd=root,
    )

    # `git ls-files --others --exclude-standard` surfaces any non-ignored,
    # untracked files, which are not included in the `git diff` output above.
    ls_files = spawn.capture(
        ["git", "ls-files", "--others", "--exclude-standard", "-z", "--", *specs],
        cwd=root,
    )

    # Both commands emit NUL-separated paths (`-z`); drop the empty trailing
    # entries left behind by the terminators.
    return frozenset(
        f for f in (diff_files + ls_files).split("\0") if f.strip() != ""
    )


def expand_globs(root: Path, *specs: Path | str) -> set[str]:
    """Find unignored files within the specified paths.

    Args:
        root: Directory in which the Git commands are run.
        specs: Pathspecs handed to Git after `--`.

    Returns:
        The tracked-and-not-deleted files plus the untracked, non-ignored
        files matching `specs`, as reported by Git relative to `root`. Results
        are cached per argument list; every call returns a fresh set, so
        callers may mutate it without affecting later calls.
    """
    # The cache stores an immutable frozenset; handing out a copy keeps one
    # caller's mutations from leaking into every later call with the same
    # arguments.
    return set(_expand_globs_cached(root, *specs))
Find unignored files within the specified paths.
def get_latest_version(
    version_type: type[VERSION_TYPE], excluded_versions: set[VERSION_TYPE] | None = None
) -> VERSION_TYPE:
    """Return the highest version tag of the given type.

    Tags are always fetched before they are listed.

    Args:
        version_type: The version class used to recognize and parse tags.
        excluded_versions: Versions to ignore, if any.

    Returns:
        The maximum of the remaining versions.

    Raises:
        ValueError: If no version tag remains after applying the exclusions.
    """
    all_version_tags: list[VERSION_TYPE] = get_version_tags(
        version_type=version_type, fetch=True
    )

    if excluded_versions is not None:
        all_version_tags = [v for v in all_version_tags if v not in excluded_versions]

    if not all_version_tags:
        # Same exception type `max` would raise on an empty list, but with a
        # message that says what was actually missing.
        raise ValueError(
            f"No version tags of type {version_type.__name__} found"
            " (after applying exclusions)"
        )

    return max(all_version_tags)
def is_ancestor(earlier: str, later: str) -> bool:
    """Return True if `earlier` is an ancestor of `later`.

    Any failure of the underlying `git merge-base --is-ancestor` command
    (`subprocess.CalledProcessError`) is reported as False.
    """
    command = ["git", "merge-base", "--is-ancestor", earlier, later]
    try:
        spawn.capture(command)
    except subprocess.CalledProcessError:
        return False
    else:
        return True
True if earlier is an ancestor of later
def is_dirty() -> bool:
    """Report whether tracked files have uncommitted modifications.

    Both unstaged changes (`git diff`) and staged changes (`git diff --cached`)
    count as dirty.
    """
    worktree_cmd = ["git", "diff", "--no-ext-diff", "--quiet", "--exit-code"]
    index_cmd = ["git", "diff", "--cached", "--no-ext-diff", "--quiet", "--exit-code"]

    # Run both checks unconditionally, then combine their exit codes.
    exit_codes = [subprocess.run(cmd).returncode for cmd in (worktree_cmd, index_cmd)]
    return any(code != 0 for code in exit_codes)
Check if the working directory has modifications to tracked files
def first_remote_matching(pattern: str) -> str | None:
    """Return the name of the first remote whose listing contains `pattern`.

    The pattern is a plain substring tested against each line of
    `git remote -v`. Returns None if no line matches.
    """
    listing = spawn.capture(["git", "remote", "-v"])
    matching_names = (
        line.split()[0] for line in listing.splitlines() if pattern in line
    )
    return next(matching_names, None)
Get the name of the remote that matches the pattern
def describe() -> str:
    """Return the stripped output of `git describe`.

    This describes the relationship between the current commit and the most
    recent tag.
    """
    description = spawn.capture(["git", "describe"])
    return description.strip()
Describe the relationship between the current commit and the most recent tag
def fetch(
    remote: str | None = None,
    all_remotes: bool = False,
    include_tags: YesNoOnce = YesNoOnce.NO,
    force: bool = False,
    branch: str | None = None,
    only_tags: bool = False,
    include_submodules: bool = False,
) -> str:
    """Fetch from remotes

    Args:
        remote: Name of the remote to fetch from, if any.
        all_remotes: Pass `--all` to git. Must not be combined with `remote`.
        include_tags: `YES` always passes `--tags`. `ONCE` passes `--tags` only
            if `force` is set or tags have not yet been fetched in this process
            for this remote (or for all remotes). `NO` never passes it.
        force: Pass `--force` to git.
        branch: Branch to fetch; requires `remote` and excludes `only_tags`.
        only_tags: If set and no tag fetch is due, return without running git.
            It does not otherwise change the git command.
        include_submodules: Pass `--recurse-submodules` instead of
            `--no-recurse-submodules`.

    Returns:
        The stripped output captured from `git fetch`, or an empty string if
        the fetch was skipped.

    Raises:
        RuntimeError: On an invalid combination of arguments.
    """
    if remote is not None and all_remotes:
        raise RuntimeError("all_remotes must be false when a remote is specified")

    if branch is not None and remote is None:
        raise RuntimeError("remote must be specified when a branch is specified")

    if branch is not None and only_tags:
        raise RuntimeError("branch must not be specified if only_tags is set")

    tags_already_fetched = (
        remote in fetched_tags_in_remotes or "*" in fetched_tags_in_remotes
    )
    fetch_tags = include_tags == YesNoOnce.YES or (
        include_tags == YesNoOnce.ONCE
        # fetch tags again if used with force (tags might have changed)
        and (force or not tags_already_fetched)
    )

    # Nothing to do when only tags were asked for and no tag fetch is due.
    if only_tags and not fetch_tags:
        return ""

    command = ["git", "fetch"]

    if remote:
        command.append(remote)

    if branch:
        command.append(branch)

    if all_remotes:
        command.append("--all")

    # explicitly specify both cases to be independent of the git config
    if include_submodules:
        command.append("--recurse-submodules")
    else:
        command.append("--no-recurse-submodules")

    if fetch_tags:
        command.append("--tags")

    if force:
        command.append("--force")

    output = spawn.capture(command).strip()

    if fetch_tags:
        fetched_tags_in_remotes.add(remote)

        # Only an all-remotes fetch that included `--tags` may mark every
        # remote's tags as fetched; otherwise a later `ONCE` request would
        # wrongly skip its tag fetch.
        if all_remotes:
            fetched_tags_in_remotes.add("*")

    return output
Fetch from remotes
def try_get_remote_name_by_url(url: str) -> str | None:
    """Return the name of the remote whose fetch URL is `url`, if any.

    The comparison is case-insensitive and also accepts `url` with a `.git`
    suffix.

    Args:
        url: The remote URL to look for.

    Returns:
        The name of the first matching remote listed by
        `git remote --verbose`, or None if no remote matches.
    """
    wanted = (f"{url} (fetch)".lower(), f"{url}.git (fetch)".lower())
    result = spawn.capture(["git", "remote", "--verbose"])
    for line in result.splitlines():
        remote, tab, desc = line.partition("\t")
        if not tab:
            # Not a "<name>\t<url> (<direction>)" line; skip it instead of
            # failing on the unpack.
            continue
        desc = desc.lower()
        # For promisor remotes git appends the configured filter, e.g.
        # "<url> (fetch) [blob:none]"; accept that suffix as well.
        if any(desc == w or desc.startswith(f"{w} [") for w in wanted):
            return remote
    return None
def get_remote(
    url: str = MATERIALIZE_REMOTE_URL,
    default_remote_name: str = "origin",
) -> str:
    """Return the name of the remote for `url`, or `default_remote_name`.

    The URL is looked up as given and, failing that, in the alternative
    `git@github.com:` syntax. When falling back to the default, a notice is
    printed.
    """
    ssh_style_url = url.replace("https://github.com/", "git@github.com:")
    for candidate_url in (url, ssh_style_url):
        found = try_get_remote_name_by_url(candidate_url)
        if found:
            return found

    print(f"Remote for URL {url} not found, using {default_remote_name}")
    return default_remote_name
def contains_commit(
    commit_sha: str,
    target: str = "HEAD",
    fetch: bool = False,
    remote_url: str = MATERIALIZE_REMOTE_URL,
) -> bool:
    """Return True if `commit_sha` is an ancestor of `target`.

    Args:
        commit_sha: The commit to look for.
        target: The revision whose history is searched.
        fetch: If true, fetch from the remote for `remote_url` first and check
            against `<remote>/<target>` instead of the local `target`.
        remote_url: URL identifying the remote used when `fetch` is set.
    """
    resolved_target = target
    if fetch:
        remote_name = get_remote(remote_url)
        # `fetch` is shadowed by the parameter, hence the `_fetch` alias.
        _fetch(remote=remote_name)
        resolved_target = f"{remote_name}/{target}"

    exit_code = spawn.run_and_get_return_code(
        ["git", "merge-base", "--is-ancestor", commit_sha, resolved_target]
    )
    return exit_code == 0
def get_tagged_release_version(version_type: type[VERSION_TYPE]) -> VERSION_TYPE | None:
    """Return the release version tagged on exactly the current commit.

    If the commit carries several release version tags, a warning is printed
    and the highest version is returned. None is returned if the commit has no
    tag that is a valid version string for `version_type`.
    """
    versions: list[VERSION_TYPE] = [
        version_type.parse(tag)
        for tag in get_tags_of_current_commit()
        if version_type.is_valid_version_string(tag)
    ]

    if not versions:
        return None

    if len(versions) > 1:
        print(
            "Warning! Commit is tagged with multiple release versions! Returning the highest."
        )

    return max(versions)
This returns the release version if exactly this commit is tagged. If multiple release versions are present, the highest one will be returned. None will be returned if the commit is not tagged.
def get_commit_message(commit_sha: str) -> str | None:
    """Return the subject line of a commit, or None if git cannot show it."""
    command = ["git", "log", "-1", "--pretty=format:%s", commit_sha]
    try:
        subject = spawn.capture(command, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        # Sometimes mz_version() will report a Git SHA that is not available
        # in the current repository
        return None
    return subject.strip()
def get_branch_name() -> str:
    """Return the stripped output of `git branch --show-current`.

    This may not work on Buildkite; consider using the same function from
    build_context.
    """
    current_branch = spawn.capture(["git", "branch", "--show-current"])
    return current_branch.strip()
This may not work on Buildkite; consider using the same function from build_context.
def checkout(rev: str, path: str | None = None) -> None:
    """Git checkout the rev.

    If `path` is given, only that path is checked out from `rev`.
    """
    path_args = ["--", path] if path else []
    spawn.runv(["git", "checkout", rev, *path_args])
Git checkout the rev
Git add a file
def commit_all_changed(message: str) -> None:
    """Run `git commit -a` with the given commit message."""
    command = ["git", "commit", "-a", "-m", message]
    spawn.runv(command)
Commit all changed files with the given message
def tag_annotated(tag: str) -> None:
    """Create an annotated tag on HEAD.

    The tag name doubles as the annotation message.
    """
    annotation = tag
    command = ["git", "tag", "-a", "-m", annotation, tag]
    spawn.runv(command)
Create an annotated tag on HEAD