misc.python.materialize.buildkite
Buildkite utilities.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Buildkite utilities."""

import hashlib
import os
from collections.abc import Callable
from enum import Enum, auto
from pathlib import Path
from typing import Any, TypeVar

import yaml

from materialize import git, spawn, ui

T = TypeVar("T")


class BuildkiteEnvVar(Enum):
    """Buildkite environment variables read by this module.

    Only the member *names* matter: ``get_var`` looks each variable up in the
    environment via ``var.name``. The ``auto()`` values are not used here.
    """

    # environment
    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto()
    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto()

    # build
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_BUILD_ID = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()
    BUILDKITE_COMMIT = auto()
    BUILDKITE_BUILD_URL = auto()

    # step
    BUILDKITE_PARALLEL_JOB = auto()
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    # will be the same for sharded and retried build steps
    BUILDKITE_STEP_ID = auto()
    # assumed to be unique
    BUILDKITE_JOB_ID = auto()
    BUILDKITE_LABEL = auto()
    BUILDKITE_RETRY_COUNT = auto()


def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
    """Return the environment variable named ``var.name``, or ``fallback_value`` if unset."""
    return os.getenv(var.name, fallback_value)


def is_in_buildkite() -> bool:
    """Return whether the ``BUILDKITE`` environment variable is truthy."""
    return ui.env_is_truthy("BUILDKITE")


def is_in_pull_request() -> bool:
    """Guess whether the current build runs for a pull request.

    Note that this is a heuristic. Outside Buildkite it is always ``False``.
    An explicit pull-request marker makes it ``True``. Otherwise, builds on the
    pipeline's default branch, on a release version, or on a commit already
    contained in ``main`` are not considered pull-request builds.
    """

    if not is_in_buildkite():
        return False

    if is_pull_request_marker_set():
        return True

    if is_on_default_branch():
        return False

    if git.is_on_release_version():
        return False

    # NOTE(review): "main" is hard-coded rather than taken from
    # get_pipeline_default_branch(); confirm this is intentional.
    if git.contains_commit("HEAD", "main", fetch=True):
        return False

    return True


def is_pull_request_marker_set() -> bool:
    """Return whether ``$BUILDKITE_PULL_REQUEST`` names a pull request."""
    # If set, this variable will contain either the ID of the pull request or the string "false".
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false"


def is_on_default_branch() -> bool:
    """Return whether the build branch equals the pipeline's default branch.

    An unset ``$BUILDKITE_BRANCH`` is treated as ``"unknown"``, so it never
    matches; an unset default branch is treated as ``"main"``.
    """
    current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
    default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
    return current_branch == default_branch


def get_pull_request_base_branch(fallback: str = "main") -> str:
    """Return ``$BUILDKITE_PULL_REQUEST_BASE_BRANCH``, or ``fallback`` if unset."""
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback)


def get_pipeline_default_branch(fallback: str = "main") -> str:
    """Return ``$BUILDKITE_PIPELINE_DEFAULT_BRANCH``, or ``fallback`` if unset."""
    return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback)


def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Return the common ancestor of ``HEAD`` and the base branch on the ``url`` remote.

    The base branch is the pull request's base branch; if that is empty, the
    pipeline's default branch is used instead.
    """
    base_branch = get_pull_request_base_branch() or get_pipeline_default_branch()
    merge_base = git.get_common_ancestor_commit(
        remote=git.get_remote(url), branch=base_branch, fetch_branch=True
    )
    return merge_base


def inline_link(url: str, label: str | None = None) -> str:
    """Render a link for the build log.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output

    Inside Buildkite this emits the OSC 1339 escape sequence that the log
    viewer renders as a link. Elsewhere, terminals do not understand that
    sequence, so ``"label,url"`` (or just ``url`` without a label) is returned.
    """
    link = f"url='{url}'"

    if label:
        link = f"{link};content='{label}'"

    if is_in_buildkite():
        return f"\033]1339;{link}\a"
    # These escape codes are not supported by terminals. Without a label,
    # do not print a literal "None," prefix.
    return f"{label},{url}" if label else url


def inline_image(url: str, alt: str) -> str:
    """Render an inline image for the build log.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images
    Outside Buildkite, ``"alt,url"`` is returned instead of the escape sequence.
    """
    content = f"url='{url}';alt='{alt}'"
    # These escape codes are not supported by terminals
    return f"\033]1338;{content}\a" if is_in_buildkite() else f"{alt},{url}"


def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Runs ``git diff -U0`` against the merge base and returns a set of
    ``(file_path, line_number)`` pairs covering the new-side range of every
    hunk.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    result = spawn.capture(["git", "diff", "-U0", merge_base])

    modified_lines: set[tuple[str, int]] = set()
    file_path = None
    for line in result.splitlines():
        # +++ b/src/adapter/src/coord/command_handler.rs
        if line.startswith("+++"):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in new code state. A missing length means a single line.
            parts = line.split(" ")[2]
            if "," in parts:
                start, length = map(int, parts.split(","))
            else:
                start = int(parts)
                length = 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines


def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False):
    """Upload ``path`` (relative to ``cwd``) via ``buildkite-agent artifact upload``.

    With ``quiet``, the agent's log level is raised to ``fatal``.
    """
    spawn.runv(
        [
            "buildkite-agent",
            "artifact",
            "upload",
            "--log-level",
            "fatal" if quiet else "notice",
            path,
        ],
        cwd=cwd,
    )


def get_parallelism_index() -> int:
    """Return this job's 0-based parallel index (0 when not parallelized)."""
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0))


def get_parallelism_count() -> int:
    """Return the number of parallel jobs for this step (1 when not parallelized)."""
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1))


def _accepted_by_shard(
    identifier: str,
    parallelism_index: int | None = None,
    parallelism_count: int | None = None,
) -> bool:
    """Return whether ``identifier`` belongs to the shard ``parallelism_index``.

    Missing index/count values are read from the environment. Assignment uses
    an MD5 digest rather than ``hash()``, because ``str`` hashes are salted
    per process and every parallel job must agree on the split.
    """
    if parallelism_index is None:
        parallelism_index = get_parallelism_index()
    if parallelism_count is None:
        parallelism_count = get_parallelism_count()

    if parallelism_count == 1:
        return True

    # Not a security use; the flag keeps md5 usable on FIPS-restricted builds.
    hash_value = int.from_bytes(
        hashlib.md5(identifier.encode(), usedforsecurity=False).digest(),
        byteorder="big",
    )
    return hash_value % parallelism_count == parallelism_index


def _upload_shard_info_metadata(items: list[str]) -> None:
    """Record the comma-separated ``items`` of this shard as build meta-data."""
    label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL) or get_var(
        BuildkiteEnvVar.BUILDKITE_STEP_KEY
    )
    spawn.runv(
        ["buildkite-agent", "meta-data", "set", f"Shard for {label}", ", ".join(items)]
    )


def notify_qa_team_about_failure(failure: str) -> None:
    """Queue a Slack message about ``failure`` to #team-testing-bots.

    Does nothing outside Buildkite. The message is prefixed with the step
    label and is uploaded as a pipeline ``notify`` entry that fires once the
    build has passed, failed or been canceled.
    """
    if not is_in_buildkite():
        return

    label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL)
    message = f"{label}: {failure}"
    print(message)
    pipeline = {
        "notify": [
            {
                "slack": {
                    "channels": ["#team-testing-bots"],
                    "message": message,
                },
                "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
            }
        ]
    }
    spawn.runv(
        ["buildkite-agent", "pipeline", "upload"], stdin=yaml.dump(pipeline).encode()
    )


def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    """Return the subset of ``items`` assigned to this parallel job.

    ``to_identifier`` maps each item to the stable string used for sharding.
    Inside Buildkite, the chosen identifiers are also recorded as meta-data.
    """
    if len(items) == 0:
        return []

    parallelism_index = get_parallelism_index()
    parallelism_count = get_parallelism_count()

    if parallelism_count == 1:
        return items

    accepted_items = [
        item
        for item in items
        if _accepted_by_shard(to_identifier(item), parallelism_index, parallelism_count)
    ]

    if is_in_buildkite() and accepted_items:
        _upload_shard_info_metadata(list(map(to_identifier, accepted_items)))
    return accepted_items


def _validate_parallelism_configuration() -> None:
    """Check that the parallel job index and count are consistent.

    Both variables must be set or both unset; when set, the count must be
    positive and the index in ``[0, count)``.

    :raises AssertionError: on an inconsistent configuration
    :raises ValueError: if either value is not an integer
    """
    job_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB)
    job_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT)

    if job_index is None and job_count is None:
        # OK
        return

    job_index_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB.name} (= '{job_index}')"
    job_count_desc = (
        f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT.name} (= '{job_count}')"
    )
    assert (
        job_index is not None and job_count is not None
    ), f"{job_index_desc} and {job_count_desc} need to be either both specified or not specified"

    job_index = int(job_index)
    job_count = int(job_count)

    assert job_count > 0, f"{job_count_desc} not valid"
    assert (
        0 <= job_index < job_count
    ), f"{job_index_desc} out of valid range with {job_count_desc}"


def truncate_annotation_str(text: str, max_length: int = 900_000) -> str:
    """Cut ``text`` to ``max_length`` characters, appending ``"..."`` when cut.

    NOTE(review): ``max_length`` counts characters, not bytes. Heavily
    non-ASCII text can exceed 1 MB once UTF-8 encoded even below the limit.
    """
    # 400 Bad Request: The annotation body must be less than 1 MB
    return text if len(text) <= max_length else text[:max_length] + "..."


def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Return the buildkite.com URL of ``artifact`` (needs ``job_id`` and ``id`` keys)."""
    org = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG)
    pipeline = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG)
    build = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER)
    return f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}"


def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Create a Buildkite annotation whose context is unique per job and style.

    Note that this does not truncate the data.
    :param markdown: must be less than 1 MB
    :raises KeyError: if ``$BUILDKITE_JOB_ID`` is not set
    """
    spawn.runv(
        [
            "buildkite-agent",
            "annotate",
            f"--style={style}",
            f"--context={os.environ[BuildkiteEnvVar.BUILDKITE_JOB_ID.name]}-{style}",
        ],
        stdin=markdown.encode(),
    )


def add_annotation(style: str, title: str, content: str) -> None:
    """Annotate the build with ``title`` and truncated ``content``.

    ``info`` annotations are wrapped in a collapsed ``<details>`` element.
    """
    if style == "info":
        markdown = f"""<details><summary>{title}</summary>

{truncate_annotation_str(content)}
</details>"""
    else:
        markdown = f"""{title}

{truncate_annotation_str(content)}"""
    add_annotation_raw(style, markdown)


def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
    """Return the URL of job ``build_job_id`` as an anchor on ``build_url``."""
    return f"{build_url}#{build_job_id}"


def get_job_url_from_pipeline_and_build(
    pipeline: str, build_number: str | int, build_job_id: str
) -> str:
    """Return the job URL for a build of ``pipeline`` in the ``materialize`` organization."""
    build_url = f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}"
    return get_job_url_from_build_url(build_url, build_job_id)
class
BuildkiteEnvVar(enum.Enum):
class BuildkiteEnvVar(Enum):
    """Buildkite environment variables known to these utilities.

    Lookups go through the member *name*; the auto-numbered values
    (1 through 19, in declaration order) carry no meaning of their own.
    """

    # --- environment -------------------------------------------------------
    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto()
    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto()

    # --- build -------------------------------------------------------------
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_BUILD_ID = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()
    BUILDKITE_COMMIT = auto()
    BUILDKITE_BUILD_URL = auto()

    # --- step --------------------------------------------------------------
    BUILDKITE_PARALLEL_JOB = auto()
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    BUILDKITE_STEP_ID = auto()  # shared by sharded and retried build steps
    BUILDKITE_JOB_ID = auto()  # assumed to be unique
    BUILDKITE_LABEL = auto()
    BUILDKITE_RETRY_COUNT = auto()
BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE =
<BuildkiteEnvVar.BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE: 1>
BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE =
<BuildkiteEnvVar.BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE: 2>
BUILDKITE_PULL_REQUEST =
<BuildkiteEnvVar.BUILDKITE_PULL_REQUEST: 3>
BUILDKITE_BUILD_NUMBER =
<BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER: 4>
BUILDKITE_BUILD_ID =
<BuildkiteEnvVar.BUILDKITE_BUILD_ID: 5>
BUILDKITE_PIPELINE_DEFAULT_BRANCH =
<BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH: 6>
BUILDKITE_PULL_REQUEST_BASE_BRANCH =
<BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH: 7>
BUILDKITE_ORGANIZATION_SLUG =
<BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG: 8>
BUILDKITE_PIPELINE_SLUG =
<BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG: 9>
BUILDKITE_BRANCH =
<BuildkiteEnvVar.BUILDKITE_BRANCH: 10>
BUILDKITE_COMMIT =
<BuildkiteEnvVar.BUILDKITE_COMMIT: 11>
BUILDKITE_BUILD_URL =
<BuildkiteEnvVar.BUILDKITE_BUILD_URL: 12>
BUILDKITE_PARALLEL_JOB =
<BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB: 13>
BUILDKITE_PARALLEL_JOB_COUNT =
<BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT: 14>
BUILDKITE_STEP_KEY =
<BuildkiteEnvVar.BUILDKITE_STEP_KEY: 15>
BUILDKITE_STEP_ID =
<BuildkiteEnvVar.BUILDKITE_STEP_ID: 16>
BUILDKITE_JOB_ID =
<BuildkiteEnvVar.BUILDKITE_JOB_ID: 17>
BUILDKITE_LABEL =
<BuildkiteEnvVar.BUILDKITE_LABEL: 18>
BUILDKITE_RETRY_COUNT =
<BuildkiteEnvVar.BUILDKITE_RETRY_COUNT: 19>
def
is_in_buildkite() -> bool:
def
is_in_pull_request() -> bool:
def is_in_pull_request() -> bool:
    """Heuristically decide whether this build was triggered by a pull request.

    Note that this is a heuristic: outside Buildkite the answer is always
    ``False`` and an explicit pull-request marker always yields ``True``.
    Otherwise the build counts as a pull request unless it runs on the
    default branch, on a release version, or on a commit already in ``main``.
    """
    if not is_in_buildkite():
        return False
    if is_pull_request_marker_set():
        return True

    # Short-circuiting keeps the original check order, so the (fetching)
    # contains_commit call only runs when the cheaper checks were inconclusive.
    return not (
        is_on_default_branch()
        or git.is_on_release_version()
        or git.contains_commit("HEAD", "main", fetch=True)
    )
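Note that this is a heuristic: outside Buildkite it always returns False; an explicit pull-request marker makes it True; otherwise builds on the default branch, on a release version, or on a commit already contained in main are not considered pull requests.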
def
is_pull_request_marker_set() -> bool:
def
is_on_default_branch() -> bool:
def
get_pull_request_base_branch(fallback: str = 'main'):
def
get_pipeline_default_branch(fallback: str = 'main'):
def
get_merge_base(url: str = 'https://github.com/MaterializeInc/materialize') -> str:
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Find the commit at which ``HEAD`` diverged from its base branch.

    The base is the pull request's base branch, or the pipeline's default
    branch when that value is empty; it is fetched from the remote for ``url``.
    """
    target_branch = get_pull_request_base_branch() or get_pipeline_default_branch()
    return git.get_common_ancestor_commit(
        remote=git.get_remote(url),
        branch=target_branch,
        fetch_branch=True,
    )
def
inline_link(url: str, label: str | None = None) -> str:
def inline_link(url: str, label: str | None = None) -> str:
    """Render a link for the build log.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output

    Inside Buildkite this emits the OSC 1339 escape sequence that the log
    viewer renders as a link. Elsewhere, terminals do not understand that
    sequence, so ``"label,url"`` (or just ``url`` without a label) is returned.
    """
    link = f"url='{url}'"

    if label:
        link = f"{link};content='{label}'"

    if is_in_buildkite():
        return f"\033]1339;{link}\a"
    # These escape codes are not supported by terminals. Without a label,
    # do not print a literal "None," prefix.
    return f"{label},{url}" if label else url
def
inline_image(url: str, alt: str) -> str:
def inline_image(url: str, alt: str) -> str:
    """Render an inline image for the build log.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images
    Outside Buildkite a plain ``"alt,url"`` string is produced instead.
    """
    if not is_in_buildkite():
        # These escape codes are not supported by terminals
        return f"{alt},{url}"
    image_spec = f"url='{url}';alt='{alt}'"
    return f"\033]1338;{image_spec}\a"
def
find_modified_lines() -> set[tuple[str, int]]:
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Runs ``git diff -U0`` against the merge base and collects a
    ``(file_path, line_number)`` pair for every line in the new-side range
    of each hunk.
    """
    base_commit = get_merge_base()
    print(f"Merge base: {base_commit}")
    diff_text = spawn.capture(["git", "diff", "-U0", base_commit])

    touched: set[tuple[str, int]] = set()
    current_file = None
    for diff_line in diff_text.splitlines():
        if diff_line.startswith("+++"):
            # e.g. "+++ b/src/adapter/src/coord/command_handler.rs"
            current_file = diff_line.removeprefix("+++ b/")
            continue
        if not diff_line.startswith("@@ "):
            continue

        # e.g. "@@ -641,7 +640,6 @@ impl Coordinator {": only the third
        # token ("+640,6") matters -- start and length in the new code state.
        new_side = diff_line.split(" ")[2]
        if "," in new_side:
            first, count = map(int, new_side.split(","))
        else:
            first, count = int(new_side), 1

        for line_nr in range(first, first + count):
            assert current_file
            touched.add((current_file, line_nr))
    return touched
Find each line that has been added or modified in the current pull request.
def
upload_artifact( path: pathlib.Path | str, cwd: pathlib.Path | None = None, quiet: bool = False):
def
get_parallelism_index() -> int:
def
get_parallelism_count() -> int:
def
notify_qa_team_about_failure(failure: str) -> None:
def notify_qa_team_about_failure(failure: str) -> None:
    """Send a Slack note about ``failure`` to #team-testing-bots.

    A no-op outside Buildkite. The note is prefixed with the step label and
    is uploaded as a pipeline ``notify`` entry that fires once the build has
    passed, failed or been canceled.
    """
    if not is_in_buildkite():
        return

    step_label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL)
    message = f"{step_label}: {failure}"
    print(message)

    slack_entry = {
        "slack": {
            "channels": ["#team-testing-bots"],
            "message": message,
        },
        "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
    }
    pipeline_yaml = yaml.dump({"notify": [slack_entry]})
    spawn.runv(["buildkite-agent", "pipeline", "upload"], stdin=pipeline_yaml.encode())
def
truncate_annotation_str(text: str, max_length: int = 900000) -> str:
def
get_artifact_url(artifact: dict[str, typing.Any]) -> str:
def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Build the buildkite.com URL of an artifact of the current build.

    ``artifact`` must provide ``job_id`` and ``id`` entries; organization,
    pipeline and build number come from the environment.
    """
    org, pipeline, build = (
        get_var(env_var)
        for env_var in (
            BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG,
            BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG,
            BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER,
        )
    )
    build_prefix = f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}"
    return f"{build_prefix}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}"
def
add_annotation_raw(style: str, markdown: str) -> None:
def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Post ``markdown`` verbatim as a Buildkite annotation.

    The annotation context combines ``$BUILDKITE_JOB_ID`` with ``style``.
    Note that this does not truncate the data.
    :param markdown: must be less than 1 MB
    :raises KeyError: if ``$BUILDKITE_JOB_ID`` is not set
    """
    job_id = os.environ["BUILDKITE_JOB_ID"]
    annotate_cmd = [
        "buildkite-agent",
        "annotate",
        f"--style={style}",
        f"--context={job_id}-{style}",
    ]
    spawn.runv(annotate_cmd, stdin=markdown.encode())
Note that this does not truncate the data.
Parameters
- markdown: must be less than 1 MB; Buildkite rejects larger annotation bodies with 400 Bad Request.
def
add_annotation(style: str, title: str, content: str) -> None:
def add_annotation(style: str, title: str, content: str) -> None:
    """Annotate the build with ``title`` followed by truncated ``content``.

    ``info`` annotations are wrapped in a collapsed ``<details>`` element;
    other styles show the title and body directly.
    """
    body = truncate_annotation_str(content)
    if style == "info":
        markdown = f"<details><summary>{title}</summary>\n\n{body}\n</details>"
    else:
        markdown = f"{title}\n\n{body}"
    add_annotation_raw(style, markdown)
def
get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
def
get_job_url_from_pipeline_and_build(pipeline: str, build_number: str | int, build_job_id: str) -> str: