misc.python.materialize.buildkite
Buildkite utilities.
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10"""Buildkite utilities.""" 11 12import os 13import subprocess 14from collections.abc import Callable 15from enum import Enum, auto 16from pathlib import Path 17from typing import Any, TypeVar 18 19import yaml 20 21from materialize import git, spawn, ui 22 23T = TypeVar("T") 24 25 26class BuildkiteEnvVar(Enum): 27 # environment 28 BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto() 29 BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto() 30 31 # build 32 BUILDKITE_PULL_REQUEST = auto() 33 BUILDKITE_BUILD_NUMBER = auto() 34 BUILDKITE_BUILD_ID = auto() 35 BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto() 36 BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto() 37 BUILDKITE_ORGANIZATION_SLUG = auto() 38 BUILDKITE_PIPELINE_SLUG = auto() 39 BUILDKITE_BRANCH = auto() 40 BUILDKITE_COMMIT = auto() 41 BUILDKITE_BUILD_URL = auto() 42 43 # step 44 BUILDKITE_PARALLEL_JOB = auto() 45 BUILDKITE_PARALLEL_JOB_COUNT = auto() 46 BUILDKITE_STEP_KEY = auto() 47 # will be the same for sharded and retried build steps 48 BUILDKITE_STEP_ID = auto() 49 # assumed to be unique 50 BUILDKITE_JOB_ID = auto() 51 BUILDKITE_LABEL = auto() 52 BUILDKITE_RETRY_COUNT = auto() 53 54 55def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any: 56 return os.getenv(var.name, fallback_value) 57 58 59def is_in_buildkite() -> bool: 60 return ui.env_is_truthy("BUILDKITE") 61 62 63def is_in_pull_request() -> bool: 64 """Note that this is a heuristic.""" 65 66 if not is_in_buildkite(): 67 return False 68 69 if is_pull_request_marker_set(): 70 return True 71 72 if is_on_default_branch(): 73 return False 74 75 if git.is_on_release_version(): 76 return False 77 78 if git.contains_commit("HEAD", "main"): 79 return False 80 81 return True 82 83 84def is_pull_request_marker_set() -> bool: 85 # If set, this variable will contain either the ID of the pull request or the string "false". 86 return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false" 87 88 89def is_on_default_branch() -> bool: 90 current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown") 91 default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main") 92 return current_branch == default_branch 93 94 95def get_pull_request_base_branch(fallback: str = "main"): 96 return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback) 97 98 99def get_pipeline_default_branch(fallback: str = "main"): 100 return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback) 101 102 103def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str: 104 base_branch = get_pull_request_base_branch() or get_pipeline_default_branch() 105 merge_base = git.get_common_ancestor_commit( 106 remote=git.get_remote(url), branch=base_branch 107 ) 108 return merge_base 109 110 111def inline_link(url: str, label: str | None = None) -> str: 112 """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output""" 113 link = f"url='{url}'" 114 115 if label: 116 link = f"{link};content='{label}'" 117 118 # These escape codes are not supported by terminals 119 return f"\033]1339;{link}\a" if is_in_buildkite() else f"{label},{url}" 120 121 122def inline_image(url: str, alt: str) -> str: 123 """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images""" 124 content = f"url='{url}';alt='{alt}'" 125 # These escape codes are not supported by terminals 126 return f"\033]1338;{content}\a" if is_in_buildkite() else f"{alt},{url}" 127 128 129def find_modified_lines() -> set[tuple[str, int]]: 130 """ 131 Find each line that has been added or modified in the current pull request. 132 """ 133 merge_base = get_merge_base() 134 print(f"Merge base: {merge_base}") 135 result = spawn.capture(["git", "diff", "-U0", merge_base]) 136 137 modified_lines: set[tuple[str, int]] = set() 138 file_path = None 139 for line in result.splitlines(): 140 # +++ b/src/adapter/src/coord/command_handler.rs 141 if line.startswith("+++"): 142 file_path = line.removeprefix("+++ b/") 143 # @@ -641,7 +640,6 @@ impl Coordinator { 144 elif line.startswith("@@ "): 145 # We only care about the second value ("+640,6" in the example), 146 # which contains the line number and length of the modified block 147 # in new code state. 148 parts = line.split(" ")[2] 149 if "," in parts: 150 start, length = map(int, parts.split(",")) 151 else: 152 start = int(parts) 153 length = 1 154 for line_nr in range(start, start + length): 155 assert file_path 156 modified_lines.add((file_path, line_nr)) 157 return modified_lines 158 159 160def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False): 161 spawn.run_with_retries( 162 lambda: spawn.runv( 163 [ 164 "buildkite-agent", 165 "artifact", 166 "upload", 167 "--log-level", 168 "fatal" if quiet else "notice", 169 path, 170 ], 171 cwd=cwd, 172 ) 173 ) 174 175 176def get_parallelism_index() -> int: 177 _validate_parallelism_configuration() 178 return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0)) 179 180 181def get_parallelism_count() -> int: 182 _validate_parallelism_configuration() 183 return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1)) 184 185 186def _upload_shard_info_metadata(items: list[str]) -> None: 187 label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL) or get_var( 188 BuildkiteEnvVar.BUILDKITE_STEP_KEY 189 ) 190 spawn.runv( 191 ["buildkite-agent", "meta-data", "set", f"Shard for {label}", ", ".join(items)] 192 ) 193 194 195def notify_qa_team_about_failure(failure: str) -> None: 196 if not is_in_buildkite(): 197 return 198 199 label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL) 200 message = f"{label}: {failure}" 201 print(message) 202 pipeline = { 203 "notify": [ 204 { 205 "slack": { 206 "channels": ["#team-testing-bots"], 207 "message": message, 208 }, 209 "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"', 210 } 211 ] 212 } 213 spawn.runv( 214 ["buildkite-agent", "pipeline", "upload"], stdin=yaml.dump(pipeline).encode() 215 ) 216 217 218def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]: 219 if len(items) == 0: 220 return [] 221 222 parallelism_index = get_parallelism_index() 223 parallelism_count = get_parallelism_count() 224 225 if parallelism_count == 1: 226 return items 227 228 accepted_items = [ 229 item 230 for i, item in enumerate(items) 231 if i % parallelism_count == parallelism_index 232 ] 233 234 if is_in_buildkite() and accepted_items: 235 _upload_shard_info_metadata(list(map(to_identifier, accepted_items))) 236 return accepted_items 237 238 239def _validate_parallelism_configuration() -> None: 240 job_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB) 241 job_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT) 242 243 if job_index is None and job_count is None: 244 # OK 245 return 246 247 job_index_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB.name} (= '{job_index}')" 248 job_count_desc = ( 249 f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT.name} (= '{job_count}')" 250 ) 251 assert ( 252 job_index is not None and job_count is not None 253 ), f"{job_index_desc} and {job_count_desc} need to be either both specified or not specified" 254 255 job_index = int(job_index) 256 job_count = int(job_count) 257 258 assert job_count > 0, f"{job_count_desc} not valid" 259 assert ( 260 0 <= job_index < job_count 261 ), f"{job_index_desc} out of valid range with {job_count_desc}" 262 263 264def truncate_annotation_str(text: str, max_length: int = 900_000) -> str: 265 # 400 Bad Request: The annotation body must be less than 1 MB 266 return text if len(text) <= max_length else text[:max_length] + "..." 267 268 269def get_artifact_url(artifact: dict[str, Any]) -> str: 270 org = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG) 271 pipeline = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG) 272 build = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER) 273 return f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}" 274 275 276def add_annotation_raw(style: str, markdown: str) -> None: 277 """ 278 Note that this does not trim the data. 279 :param markdown: must not exceed 1 MB 280 """ 281 spawn.runv( 282 [ 283 "buildkite-agent", 284 "annotate", 285 f"--style={style}", 286 f"--context={os.environ['BUILDKITE_JOB_ID']}-{style}", 287 ], 288 stdin=markdown.encode(), 289 ) 290 291 292def add_annotation(style: str, title: str, content: str) -> None: 293 if style == "info": 294 markdown = f"""<details><summary>{title}</summary> 295 296{truncate_annotation_str(content)} 297</details>""" 298 else: 299 markdown = f"""{title} 300 301{truncate_annotation_str(content)}""" 302 add_annotation_raw(style, markdown) 303 304 305def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str: 306 return f"{build_url}#{build_job_id}" 307 308 309def get_job_url_from_pipeline_and_build( 310 pipeline: str, build_number: str | int, build_job_id: str 311) -> str: 312 build_url = f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}" 313 return get_job_url_from_build_url(build_url, build_job_id) 314 315 316def get_build_status(build: str) -> str: 317 return spawn.capture( 318 ["buildkite-agent", "meta-data", "get", build], 319 stderr=subprocess.DEVNULL, 320 )
class
BuildkiteEnvVar(enum.Enum):
27class BuildkiteEnvVar(Enum): 28 # environment 29 BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto() 30 BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto() 31 32 # build 33 BUILDKITE_PULL_REQUEST = auto() 34 BUILDKITE_BUILD_NUMBER = auto() 35 BUILDKITE_BUILD_ID = auto() 36 BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto() 37 BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto() 38 BUILDKITE_ORGANIZATION_SLUG = auto() 39 BUILDKITE_PIPELINE_SLUG = auto() 40 BUILDKITE_BRANCH = auto() 41 BUILDKITE_COMMIT = auto() 42 BUILDKITE_BUILD_URL = auto() 43 44 # step 45 BUILDKITE_PARALLEL_JOB = auto() 46 BUILDKITE_PARALLEL_JOB_COUNT = auto() 47 BUILDKITE_STEP_KEY = auto() 48 # will be the same for sharded and retried build steps 49 BUILDKITE_STEP_ID = auto() 50 # assumed to be unique 51 BUILDKITE_JOB_ID = auto() 52 BUILDKITE_LABEL = auto() 53 BUILDKITE_RETRY_COUNT = auto()
BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE =
<BuildkiteEnvVar.BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE: 1>
BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE =
<BuildkiteEnvVar.BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE: 2>
BUILDKITE_PULL_REQUEST =
<BuildkiteEnvVar.BUILDKITE_PULL_REQUEST: 3>
BUILDKITE_BUILD_NUMBER =
<BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER: 4>
BUILDKITE_BUILD_ID =
<BuildkiteEnvVar.BUILDKITE_BUILD_ID: 5>
BUILDKITE_PIPELINE_DEFAULT_BRANCH =
<BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH: 6>
BUILDKITE_PULL_REQUEST_BASE_BRANCH =
<BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH: 7>
BUILDKITE_ORGANIZATION_SLUG =
<BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG: 8>
BUILDKITE_PIPELINE_SLUG =
<BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG: 9>
BUILDKITE_BRANCH =
<BuildkiteEnvVar.BUILDKITE_BRANCH: 10>
BUILDKITE_COMMIT =
<BuildkiteEnvVar.BUILDKITE_COMMIT: 11>
BUILDKITE_BUILD_URL =
<BuildkiteEnvVar.BUILDKITE_BUILD_URL: 12>
BUILDKITE_PARALLEL_JOB =
<BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB: 13>
BUILDKITE_PARALLEL_JOB_COUNT =
<BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT: 14>
BUILDKITE_STEP_KEY =
<BuildkiteEnvVar.BUILDKITE_STEP_KEY: 15>
BUILDKITE_STEP_ID =
<BuildkiteEnvVar.BUILDKITE_STEP_ID: 16>
BUILDKITE_JOB_ID =
<BuildkiteEnvVar.BUILDKITE_JOB_ID: 17>
BUILDKITE_LABEL =
<BuildkiteEnvVar.BUILDKITE_LABEL: 18>
BUILDKITE_RETRY_COUNT =
<BuildkiteEnvVar.BUILDKITE_RETRY_COUNT: 19>
def
is_in_buildkite() -> bool:
def
is_in_pull_request() -> bool:
64def is_in_pull_request() -> bool: 65 """Note that this is a heuristic.""" 66 67 if not is_in_buildkite(): 68 return False 69 70 if is_pull_request_marker_set(): 71 return True 72 73 if is_on_default_branch(): 74 return False 75 76 if git.is_on_release_version(): 77 return False 78 79 if git.contains_commit("HEAD", "main"): 80 return False 81 82 return True
Note that this is a heuristic.
def
is_pull_request_marker_set() -> bool:
def
is_on_default_branch() -> bool:
def
get_pull_request_base_branch(fallback: str = 'main'):
def
get_pipeline_default_branch(fallback: str = 'main'):
def
get_merge_base(url: str = 'https://github.com/MaterializeInc/materialize') -> str:
def
inline_link(url: str, label: str | None = None) -> str:
112def inline_link(url: str, label: str | None = None) -> str: 113 """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output""" 114 link = f"url='{url}'" 115 116 if label: 117 link = f"{link};content='{label}'" 118 119 # These escape codes are not supported by terminals 120 return f"\033]1339;{link}\a" if is_in_buildkite() else f"{label},{url}"
def
inline_image(url: str, alt: str) -> str:
123def inline_image(url: str, alt: str) -> str: 124 """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images""" 125 content = f"url='{url}';alt='{alt}'" 126 # These escape codes are not supported by terminals 127 return f"\033]1338;{content}\a" if is_in_buildkite() else f"{alt},{url}"
def
find_modified_lines() -> set[tuple[str, int]]:
130def find_modified_lines() -> set[tuple[str, int]]: 131 """ 132 Find each line that has been added or modified in the current pull request. 133 """ 134 merge_base = get_merge_base() 135 print(f"Merge base: {merge_base}") 136 result = spawn.capture(["git", "diff", "-U0", merge_base]) 137 138 modified_lines: set[tuple[str, int]] = set() 139 file_path = None 140 for line in result.splitlines(): 141 # +++ b/src/adapter/src/coord/command_handler.rs 142 if line.startswith("+++"): 143 file_path = line.removeprefix("+++ b/") 144 # @@ -641,7 +640,6 @@ impl Coordinator { 145 elif line.startswith("@@ "): 146 # We only care about the second value ("+640,6" in the example), 147 # which contains the line number and length of the modified block 148 # in new code state. 149 parts = line.split(" ")[2] 150 if "," in parts: 151 start, length = map(int, parts.split(",")) 152 else: 153 start = int(parts) 154 length = 1 155 for line_nr in range(start, start + length): 156 assert file_path 157 modified_lines.add((file_path, line_nr)) 158 return modified_lines
Find each line that has been added or modified in the current pull request.
def
upload_artifact( path: pathlib.Path | str, cwd: pathlib.Path | None = None, quiet: bool = False):
def
get_parallelism_index() -> int:
def
get_parallelism_count() -> int:
def
notify_qa_team_about_failure(failure: str) -> None:
196def notify_qa_team_about_failure(failure: str) -> None: 197 if not is_in_buildkite(): 198 return 199 200 label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL) 201 message = f"{label}: {failure}" 202 print(message) 203 pipeline = { 204 "notify": [ 205 { 206 "slack": { 207 "channels": ["#team-testing-bots"], 208 "message": message, 209 }, 210 "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"', 211 } 212 ] 213 } 214 spawn.runv( 215 ["buildkite-agent", "pipeline", "upload"], stdin=yaml.dump(pipeline).encode() 216 )
def
truncate_annotation_str(text: str, max_length: int = 900000) -> str:
def
get_artifact_url(artifact: dict[str, typing.Any]) -> str:
270def get_artifact_url(artifact: dict[str, Any]) -> str: 271 org = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG) 272 pipeline = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG) 273 build = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER) 274 return f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}"
def
add_annotation_raw(style: str, markdown: str) -> None:
277def add_annotation_raw(style: str, markdown: str) -> None: 278 """ 279 Note that this does not trim the data. 280 :param markdown: must not exceed 1 MB 281 """ 282 spawn.runv( 283 [ 284 "buildkite-agent", 285 "annotate", 286 f"--style={style}", 287 f"--context={os.environ['BUILDKITE_JOB_ID']}-{style}", 288 ], 289 stdin=markdown.encode(), 290 )
Note that this does not trim the data.
Parameters
- markdown: must not exceed 1 MB
def
add_annotation(style: str, title: str, content: str) -> None:
293def add_annotation(style: str, title: str, content: str) -> None: 294 if style == "info": 295 markdown = f"""<details><summary>{title}</summary> 296 297{truncate_annotation_str(content)} 298</details>""" 299 else: 300 markdown = f"""{title} 301 302{truncate_annotation_str(content)}""" 303 add_annotation_raw(style, markdown)
def
get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
def
get_job_url_from_pipeline_and_build(pipeline: str, build_number: str | int, build_job_id: str) -> str:
def
get_build_status(build: str) -> str: