misc.python.materialize.buildkite

Buildkite utilities.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Buildkite utilities."""
 11
 12import hashlib
 13import os
 14from collections.abc import Callable
 15from enum import Enum, auto
 16from pathlib import Path
 17from typing import Any, TypeVar
 18
 19import yaml
 20
 21from materialize import git, spawn, ui
 22
# Generic element type of the lists partitioned by shard_list().
T = TypeVar("T")
 24
 25
 26class BuildkiteEnvVar(Enum):
 27    # environment
 28    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto()
 29    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto()
 30
 31    # build
 32    BUILDKITE_PULL_REQUEST = auto()
 33    BUILDKITE_BUILD_NUMBER = auto()
 34    BUILDKITE_BUILD_ID = auto()
 35    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
 36    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
 37    BUILDKITE_ORGANIZATION_SLUG = auto()
 38    BUILDKITE_PIPELINE_SLUG = auto()
 39    BUILDKITE_BRANCH = auto()
 40    BUILDKITE_COMMIT = auto()
 41    BUILDKITE_BUILD_URL = auto()
 42
 43    # step
 44    BUILDKITE_PARALLEL_JOB = auto()
 45    BUILDKITE_PARALLEL_JOB_COUNT = auto()
 46    BUILDKITE_STEP_KEY = auto()
 47    # will be the same for sharded and retried build steps
 48    BUILDKITE_STEP_ID = auto()
 49    # assumed to be unique
 50    BUILDKITE_JOB_ID = auto()
 51    BUILDKITE_LABEL = auto()
 52    BUILDKITE_RETRY_COUNT = auto()
 53
 54
 55def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
 56    return os.getenv(var.name, fallback_value)
 57
 58
 59def is_in_buildkite() -> bool:
 60    return ui.env_is_truthy("BUILDKITE")
 61
 62
 63def is_in_pull_request() -> bool:
 64    """Note that this is a heuristic."""
 65
 66    if not is_in_buildkite():
 67        return False
 68
 69    if is_pull_request_marker_set():
 70        return True
 71
 72    if is_on_default_branch():
 73        return False
 74
 75    if git.is_on_release_version():
 76        return False
 77
 78    if git.contains_commit("HEAD", "main", fetch=True):
 79        return False
 80
 81    return True
 82
 83
 84def is_pull_request_marker_set() -> bool:
 85    # If set, this variable will contain either the ID of the pull request or the string "false".
 86    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false"
 87
 88
 89def is_on_default_branch() -> bool:
 90    current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
 91    default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
 92    return current_branch == default_branch
 93
 94
 95def get_pull_request_base_branch(fallback: str = "main"):
 96    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback)
 97
 98
 99def get_pipeline_default_branch(fallback: str = "main"):
100    return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback)
101
102
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Return the merge-base commit for the branch this build targets.

    The target branch is the pull request's base branch when Buildkite
    provides a non-empty one. Otherwise it is the pipeline's default branch,
    which itself defaults to "main". The branch is fetched
    (``fetch_branch=True``) from the remote resolved for *url*. The result
    is presumably the common ancestor with HEAD; see
    ``git.get_common_ancestor_commit``.
    """
    # An empty fallback lets an unset *or* empty PR base branch fall through
    # to the pipeline default branch. With the "main" fallback the `or` was
    # dead code whenever the variable was unset.
    base_branch = (
        get_pull_request_base_branch(fallback="") or get_pipeline_default_branch()
    )
    return git.get_common_ancestor_commit(
        remote=git.get_remote(url), branch=base_branch, fetch_branch=True
    )
109
110
def inline_link(url: str, label: str | None = None) -> str:
    """Render *url* (optionally labelled) as a Buildkite inline link.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output

    Inside Buildkite this emits the ``ESC ] 1339`` escape sequence. Terminals
    do not support that sequence, so elsewhere the plain-text form
    "label,url" is returned. When no label is given, just the URL is
    returned; previously this case produced "None,url".
    """
    link = f"url='{url}'"
    if label:
        link = f"{link};content='{label}'"

    if is_in_buildkite():
        return f"\033]1339;{link}\a"
    # These escape codes are not supported by terminals.
    return f"{label},{url}" if label else url
120
121
def inline_image(url: str, alt: str) -> str:
    """Render an image for Buildkite's log viewer, or "alt,url" elsewhere.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images
    """
    if not is_in_buildkite():
        # These escape codes are not supported by terminals
        return f"{alt},{url}"
    content = f"url='{url}';alt='{alt}'"
    return f"\033]1338;{content}\a"
127
128
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Diffs the working tree against the merge base (see get_merge_base())
    with zero context lines. Returns ``(file_path, line_number)`` pairs
    whose line numbers refer to the new version of each file.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    # --no-color / --no-ext-diff keep the output in plain unified-diff form
    # regardless of the user's git configuration.
    diff = spawn.capture(
        ["git", "diff", "--no-color", "--no-ext-diff", "-U0", merge_base]
    )
    return _parse_modified_lines(diff)


def _parse_modified_lines(diff: str) -> set[tuple[str, int]]:
    """Collect (file_path, new_line_number) pairs from ``git diff -U0`` output."""
    modified_lines: set[tuple[str, int]] = set()
    file_path: str | None = None
    # True between a "diff ..." header line and the file's first hunk. Only
    # there can "+++" name a file. Inside a hunk, an added line whose content
    # starts with "++" also begins with "+++" and must not be taken as a
    # file name.
    in_file_header = False
    for line in diff.splitlines():
        if line.startswith("diff "):
            in_file_header = True
        # +++ b/src/adapter/src/coord/command_handler.rs
        elif in_file_header and line.startswith("+++ "):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            in_file_header = False
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in the new code state. A missing length means a single line.
            new_range = line.split(" ")[2]
            start_str, _, length_str = new_range.partition(",")
            start = int(start_str)
            length = int(length_str) if length_str else 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines
158
159
def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False):
    """Upload *path* as a build artifact using ``buildkite-agent``.

    :param path: passed unchanged to ``buildkite-agent artifact upload``
    :param cwd: working directory for the agent invocation
    :param quiet: log only fatal messages instead of notices
    """
    log_level = "fatal" if quiet else "notice"
    command = ["buildkite-agent", "artifact", "upload", "--log-level", log_level, path]
    spawn.runv(command, cwd=cwd)
172
173
def get_parallelism_index() -> int:
    """Return this job's shard index (``$BUILDKITE_PARALLEL_JOB``), 0 if unset.

    The parallelism environment is validated first.
    """
    _validate_parallelism_configuration()
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0)
    return int(raw_index)
177
178
def get_parallelism_count() -> int:
    """Return the number of shards (``$BUILDKITE_PARALLEL_JOB_COUNT``), 1 if unset.

    The parallelism environment is validated first.
    """
    _validate_parallelism_configuration()
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1)
    return int(raw_count)
182
183
184def _accepted_by_shard(
185    identifier: str,
186    parallelism_index: int | None = None,
187    parallelism_count: int | None = None,
188) -> bool:
189    if parallelism_index is None:
190        parallelism_index = get_parallelism_index()
191    if parallelism_count is None:
192        parallelism_count = get_parallelism_count()
193
194    if parallelism_count == 1:
195        return True
196
197    hash_value = int.from_bytes(
198        hashlib.md5(identifier.encode()).digest(), byteorder="big"
199    )
200    return hash_value % parallelism_count == parallelism_index
201
202
def _upload_shard_info_metadata(items: list[str]) -> None:
    """Store the identifiers this shard picked as Buildkite build meta-data.

    The key is "Shard for <label>". The label is the step label or, when
    that is unset or empty, the step key.
    """
    label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL)
    if not label:
        label = get_var(BuildkiteEnvVar.BUILDKITE_STEP_KEY)
    meta_key = f"Shard for {label}"
    spawn.runv(["buildkite-agent", "meta-data", "set", meta_key, ", ".join(items)])
210
211
def notify_qa_team_about_failure(failure: str) -> None:
    """Post *failure* to the QA team's Slack channel via Buildkite.

    This does nothing outside Buildkite. Otherwise it prints the message and
    uploads a pipeline fragment containing a Slack ``notify`` entry whose
    condition fires when the build has passed, failed or been canceled.
    """
    if is_in_buildkite():
        message = f"{get_var(BuildkiteEnvVar.BUILDKITE_LABEL)}: {failure}"
        print(message)
        slack_notification = {
            "slack": {"channels": ["#team-testing-bots"], "message": message},
            "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
        }
        spawn.runv(
            ["buildkite-agent", "pipeline", "upload"],
            stdin=yaml.dump({"notify": [slack_notification]}).encode(),
        )
233
234
def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    """Return the subset of *items* assigned to this Buildkite parallel job.

    Items are assigned by hashing ``to_identifier(item)`` (see
    _accepted_by_shard). Without parallelism the input list itself is
    returned. Inside Buildkite, the identifiers of a non-empty selection are
    recorded as build meta-data.
    """
    if not items:
        return []

    shard_index = get_parallelism_index()
    shard_count = get_parallelism_count()
    if shard_count == 1:
        return items

    selected: list[T] = []
    for item in items:
        if _accepted_by_shard(to_identifier(item), shard_index, shard_count):
            selected.append(item)

    if selected and is_in_buildkite():
        _upload_shard_info_metadata([to_identifier(item) for item in selected])
    return selected
254
255
def _validate_parallelism_configuration() -> None:
    """Assert that the Buildkite parallelism variables are consistent.

    ``$BUILDKITE_PARALLEL_JOB`` and ``$BUILDKITE_PARALLEL_JOB_COUNT`` must be
    both set or both unset. When set, the count must be positive and the
    index must lie in ``[0, count)``. Violations raise AssertionError, and
    non-integer values raise ValueError.
    """
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB)
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT)

    if raw_index is None and raw_count is None:
        return  # neither set: nothing to validate

    index_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB.name} (= '{raw_index}')"
    count_desc = (
        f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT.name} (= '{raw_count}')"
    )
    assert raw_index is not None and raw_count is not None, (
        f"{index_desc} and {count_desc} need to be either both specified or not specified"
    )

    index, count = int(raw_index), int(raw_count)
    assert count > 0, f"{count_desc} not valid"
    assert 0 <= index < count, f"{index_desc} out of valid range with {count_desc}"
279
280
def truncate_annotation_str(text: str, max_length: int = 900_000) -> str:
    """Shorten *text* so that it fits into a Buildkite annotation.

    Buildkite rejects large bodies ("400 Bad Request: The annotation body
    must be less than 1 MB"). That limit is in bytes, so *max_length* is
    applied to the UTF-8 encoded size. Counting characters let non-ASCII
    text (up to 4 bytes per character) exceed the limit. For ASCII input the
    result is unchanged.

    When truncation happens, "..." is appended to the kept prefix, and a
    multi-byte character cut at the boundary is dropped rather than split.
    """
    # errors="replace" keeps encoding from raising on lone surrogates.
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_length:
        return text
    return encoded[:max_length].decode("utf-8", errors="ignore") + "..."
284
285
def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Build the buildkite.com URL of an uploaded artifact.

    Organization, pipeline and build number come from the Buildkite
    environment. *artifact* must provide ``job_id`` and ``id`` entries.
    """
    org, pipeline, build = (
        get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG),
        get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG),
        get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER),
    )
    job_id, artifact_id = artifact["job_id"], artifact["id"]
    return (
        f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}"
        f"/builds/{build}/jobs/{job_id}/artifacts/{artifact_id}"
    )
291
292
def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Publish *markdown* as a Buildkite annotation. The data is not trimmed.

    The annotation context is "<$BUILDKITE_JOB_ID>-<style>".

    :param style: value for ``buildkite-agent annotate --style``
    :param markdown: annotation body; must not exceed 1 MB
    :raises KeyError: if ``$BUILDKITE_JOB_ID`` is not set
    """
    context = f"{os.environ['BUILDKITE_JOB_ID']}-{style}"
    spawn.runv(
        ["buildkite-agent", "annotate", f"--style={style}", f"--context={context}"],
        stdin=markdown.encode(),
    )
307
308
def add_annotation(style: str, title: str, content: str) -> None:
    """Post an annotation made of *title* and the truncated *content*.

    "info" annotations wrap the content in a collapsible ``<details>``
    element. Other styles show the title, a blank line, then the content.
    """
    body = truncate_annotation_str(content)
    if style != "info":
        add_annotation_raw(style, f"{title}\n\n{body}")
        return
    add_annotation_raw(
        style, f"<details><summary>{title}</summary>\n\n{body}\n</details>"
    )
320
321
def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
    """Address job *build_job_id* as a ``#`` fragment of *build_url*."""
    return "{}#{}".format(build_url, build_job_id)
324
325
326def get_job_url_from_pipeline_and_build(
327    pipeline: str, build_number: str | int, build_job_id: str
328) -> str:
329    build_url = f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}"
330    return get_job_url_from_build_url(build_url, build_job_id)
class BuildkiteEnvVar(Enum):
    """Buildkite environment variables consulted by this module.

    Within this module only the member *name* is used (get_var() looks up
    ``os.getenv(var.name)``); the values are plain sequential integers.
    """

    # --- agent / environment ---
    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = 1
    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = 2

    # --- build ---
    BUILDKITE_PULL_REQUEST = 3
    BUILDKITE_BUILD_NUMBER = 4
    BUILDKITE_BUILD_ID = 5
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = 6
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = 7
    BUILDKITE_ORGANIZATION_SLUG = 8
    BUILDKITE_PIPELINE_SLUG = 9
    BUILDKITE_BRANCH = 10
    BUILDKITE_COMMIT = 11
    BUILDKITE_BUILD_URL = 12

    # --- step ---
    BUILDKITE_PARALLEL_JOB = 13
    BUILDKITE_PARALLEL_JOB_COUNT = 14
    BUILDKITE_STEP_KEY = 15
    # Identical for sharded and retried build steps.
    BUILDKITE_STEP_ID = 16
    # Assumed to be unique.
    BUILDKITE_JOB_ID = 17
    BUILDKITE_LABEL = 18
    BUILDKITE_RETRY_COUNT = 19
def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
    """Read the environment variable whose name equals ``var.name``.

    Returns *fallback_value* (``None`` by default) when it is not set.
    """
    env_name = var.name
    return os.environ.get(env_name, fallback_value)
def is_in_buildkite() -> bool:
    """Tell whether we run inside a Buildkite job (``$BUILDKITE`` is truthy)."""
    flag_name = "BUILDKITE"
    return ui.env_is_truthy(flag_name)
def is_in_pull_request() -> bool:
    """Heuristically decide whether this build belongs to a pull request.

    Always False outside Buildkite. An explicit pull-request marker wins.
    Otherwise the build is not treated as a PR build if it runs on the
    pipeline's default branch, on a release version, or if
    ``git.contains_commit("HEAD", "main", fetch=True)`` holds (presumably:
    HEAD is already part of main).
    """
    if not is_in_buildkite():
        return False
    if is_pull_request_marker_set():
        return True
    # Evaluated lazily and in the same order as the individual checks.
    not_a_pr = (
        is_on_default_branch()
        or git.is_on_release_version()
        or git.contains_commit("HEAD", "main", fetch=True)
    )
    return not not_a_pr

Note that this is a heuristic.

def is_pull_request_marker_set() -> bool:
    """Return True when ``$BUILDKITE_PULL_REQUEST`` carries a PR id.

    When set, the variable holds either the pull request ID or the string
    "false"; an unset variable is treated like "false".
    """
    marker = get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false")
    return marker != "false"
def is_on_default_branch() -> bool:
    """Compare the build's branch with the pipeline's default branch.

    An unset ``$BUILDKITE_BRANCH`` reads as "unknown" and an unset default
    branch as "main", so with neither variable set the result is False.
    """
    build_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
    pipeline_branch = get_var(
        BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main"
    )
    return build_branch == pipeline_branch
def get_pull_request_base_branch(fallback: str = "main"):
    """Return ``$BUILDKITE_PULL_REQUEST_BASE_BRANCH``, or *fallback* if unset."""
    env_key = BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH
    return get_var(env_key, fallback)
def get_pipeline_default_branch(fallback: str = "main"):
    """Return ``$BUILDKITE_PIPELINE_DEFAULT_BRANCH``, or *fallback* if unset."""
    env_key = BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH
    return get_var(env_key, fallback)
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Return the merge-base commit for the branch this build targets.

    The target branch is the pull request's base branch when Buildkite
    provides a non-empty one. Otherwise it is the pipeline's default branch,
    which itself defaults to "main". The branch is fetched
    (``fetch_branch=True``) from the remote resolved for *url*. The result
    is presumably the common ancestor with HEAD; see
    ``git.get_common_ancestor_commit``.
    """
    # An empty fallback lets an unset *or* empty PR base branch fall through
    # to the pipeline default branch. With the "main" fallback the `or` was
    # dead code whenever the variable was unset.
    base_branch = (
        get_pull_request_base_branch(fallback="") or get_pipeline_default_branch()
    )
    return git.get_common_ancestor_commit(
        remote=git.get_remote(url), branch=base_branch, fetch_branch=True
    )
def inline_image(url: str, alt: str) -> str:
    """Render an image for Buildkite's log viewer, or "alt,url" elsewhere.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images
    """
    if not is_in_buildkite():
        # These escape codes are not supported by terminals
        return f"{alt},{url}"
    content = f"url='{url}';alt='{alt}'"
    return f"\033]1338;{content}\a"
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Diffs the working tree against the merge base (see get_merge_base())
    with zero context lines. Returns ``(file_path, line_number)`` pairs
    whose line numbers refer to the new version of each file.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    # --no-color / --no-ext-diff keep the output in plain unified-diff form
    # regardless of the user's git configuration.
    diff = spawn.capture(
        ["git", "diff", "--no-color", "--no-ext-diff", "-U0", merge_base]
    )
    return _parse_modified_lines(diff)


def _parse_modified_lines(diff: str) -> set[tuple[str, int]]:
    """Collect (file_path, new_line_number) pairs from ``git diff -U0`` output."""
    modified_lines: set[tuple[str, int]] = set()
    file_path: str | None = None
    # True between a "diff ..." header line and the file's first hunk. Only
    # there can "+++" name a file. Inside a hunk, an added line whose content
    # starts with "++" also begins with "+++" and must not be taken as a
    # file name.
    in_file_header = False
    for line in diff.splitlines():
        if line.startswith("diff "):
            in_file_header = True
        # +++ b/src/adapter/src/coord/command_handler.rs
        elif in_file_header and line.startswith("+++ "):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            in_file_header = False
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in the new code state. A missing length means a single line.
            new_range = line.split(" ")[2]
            start_str, _, length_str = new_range.partition(",")
            start = int(start_str)
            length = int(length_str) if length_str else 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines

Find each line that has been added or modified in the current pull request.

def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False):
    """Upload *path* as a build artifact using ``buildkite-agent``.

    :param path: passed unchanged to ``buildkite-agent artifact upload``
    :param cwd: working directory for the agent invocation
    :param quiet: log only fatal messages instead of notices
    """
    log_level = "fatal" if quiet else "notice"
    command = ["buildkite-agent", "artifact", "upload", "--log-level", log_level, path]
    spawn.runv(command, cwd=cwd)
def get_parallelism_index() -> int:
    """Return this job's shard index (``$BUILDKITE_PARALLEL_JOB``), 0 if unset.

    The parallelism environment is validated first.
    """
    _validate_parallelism_configuration()
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0)
    return int(raw_index)
def get_parallelism_count() -> int:
    """Return the number of shards (``$BUILDKITE_PARALLEL_JOB_COUNT``), 1 if unset.

    The parallelism environment is validated first.
    """
    _validate_parallelism_configuration()
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1)
    return int(raw_count)
def notify_qa_team_about_failure(failure: str) -> None:
    """Post *failure* to the QA team's Slack channel via Buildkite.

    This does nothing outside Buildkite. Otherwise it prints the message and
    uploads a pipeline fragment containing a Slack ``notify`` entry whose
    condition fires when the build has passed, failed or been canceled.
    """
    if is_in_buildkite():
        message = f"{get_var(BuildkiteEnvVar.BUILDKITE_LABEL)}: {failure}"
        print(message)
        slack_notification = {
            "slack": {"channels": ["#team-testing-bots"], "message": message},
            "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
        }
        spawn.runv(
            ["buildkite-agent", "pipeline", "upload"],
            stdin=yaml.dump({"notify": [slack_notification]}).encode(),
        )
def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    """Return the subset of *items* assigned to this Buildkite parallel job.

    Items are assigned by hashing ``to_identifier(item)`` (see
    _accepted_by_shard). Without parallelism the input list itself is
    returned. Inside Buildkite, the identifiers of a non-empty selection are
    recorded as build meta-data.
    """
    if not items:
        return []

    shard_index = get_parallelism_index()
    shard_count = get_parallelism_count()
    if shard_count == 1:
        return items

    selected: list[T] = []
    for item in items:
        if _accepted_by_shard(to_identifier(item), shard_index, shard_count):
            selected.append(item)

    if selected and is_in_buildkite():
        _upload_shard_info_metadata([to_identifier(item) for item in selected])
    return selected
def truncate_annotation_str(text: str, max_length: int = 900_000) -> str:
    """Shorten *text* so that it fits into a Buildkite annotation.

    Buildkite rejects large bodies ("400 Bad Request: The annotation body
    must be less than 1 MB"). That limit is in bytes, so *max_length* is
    applied to the UTF-8 encoded size. Counting characters let non-ASCII
    text (up to 4 bytes per character) exceed the limit. For ASCII input the
    result is unchanged.

    When truncation happens, "..." is appended to the kept prefix, and a
    multi-byte character cut at the boundary is dropped rather than split.
    """
    # errors="replace" keeps encoding from raising on lone surrogates.
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_length:
        return text
    return encoded[:max_length].decode("utf-8", errors="ignore") + "..."
def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Build the buildkite.com URL of an uploaded artifact.

    Organization, pipeline and build number come from the Buildkite
    environment. *artifact* must provide ``job_id`` and ``id`` entries.
    """
    org, pipeline, build = (
        get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG),
        get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG),
        get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER),
    )
    job_id, artifact_id = artifact["job_id"], artifact["id"]
    return (
        f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}"
        f"/builds/{build}/jobs/{job_id}/artifacts/{artifact_id}"
    )
def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Publish *markdown* as a Buildkite annotation. The data is not trimmed.

    The annotation context is "<$BUILDKITE_JOB_ID>-<style>".

    :param style: value for ``buildkite-agent annotate --style``
    :param markdown: annotation body; must not exceed 1 MB
    :raises KeyError: if ``$BUILDKITE_JOB_ID`` is not set
    """
    context = f"{os.environ['BUILDKITE_JOB_ID']}-{style}"
    spawn.runv(
        ["buildkite-agent", "annotate", f"--style={style}", f"--context={context}"],
        stdin=markdown.encode(),
    )

Note that this does not trim the data.

Parameters
  • markdown: must not exceed 1 MB
def add_annotation(style: str, title: str, content: str) -> None:
    """Post an annotation made of *title* and the truncated *content*.

    "info" annotations wrap the content in a collapsible ``<details>``
    element. Other styles show the title, a blank line, then the content.
    """
    body = truncate_annotation_str(content)
    if style != "info":
        add_annotation_raw(style, f"{title}\n\n{body}")
        return
    add_annotation_raw(
        style, f"<details><summary>{title}</summary>\n\n{body}\n</details>"
    )
def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
    """Address job *build_job_id* as a ``#`` fragment of *build_url*."""
    return "{}#{}".format(build_url, build_job_id)
def get_job_url_from_pipeline_and_build(pipeline: str, build_number: str | int, build_job_id: str) -> str:
327def get_job_url_from_pipeline_and_build(
328    pipeline: str, build_number: str | int, build_job_id: str
329) -> str:
330    build_url = f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}"
331    return get_job_url_from_build_url(build_url, build_job_id)