misc.python.materialize.buildkite

Buildkite utilities.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Buildkite utilities."""
 11
 12import os
 13import subprocess
 14from collections.abc import Callable
 15from enum import Enum, auto
 16from pathlib import Path
 17from typing import Any, TypeVar
 18
 19import yaml
 20
 21from materialize import git, spawn, ui
 22
# Generic element type for the lists accepted and returned by shard_list().
T = TypeVar("T")
class BuildkiteEnvVar(Enum):
    """Buildkite environment variables read by this module.

    Within this module members are only used through their ``name`` (see
    ``get_var``); the ``auto()`` values carry no meaning of their own.
    """

    # environment
    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto()
    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto()

    # build
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_BUILD_ID = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()
    BUILDKITE_COMMIT = auto()
    BUILDKITE_BUILD_URL = auto()

    # step
    # Zero-based index of this parallel job; must be < BUILDKITE_PARALLEL_JOB_COUNT.
    BUILDKITE_PARALLEL_JOB = auto()
    # Number of parallel jobs the step is split into.
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    # will be the same for sharded and retried build steps
    BUILDKITE_STEP_ID = auto()
    # assumed to be unique
    BUILDKITE_JOB_ID = auto()
    BUILDKITE_LABEL = auto()
    BUILDKITE_RETRY_COUNT = auto()
 55def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
 56    return os.getenv(var.name, fallback_value)
 57
 58
 59def is_in_buildkite() -> bool:
 60    return ui.env_is_truthy("BUILDKITE")
 61
 62
 63def is_in_pull_request() -> bool:
 64    """Note that this is a heuristic."""
 65
 66    if not is_in_buildkite():
 67        return False
 68
 69    if is_pull_request_marker_set():
 70        return True
 71
 72    if is_on_default_branch():
 73        return False
 74
 75    if git.is_on_release_version():
 76        return False
 77
 78    if git.contains_commit("HEAD", "main"):
 79        return False
 80
 81    return True
 82
 83
 84def is_pull_request_marker_set() -> bool:
 85    # If set, this variable will contain either the ID of the pull request or the string "false".
 86    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false"
 87
 88
 89def is_on_default_branch() -> bool:
 90    current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
 91    default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
 92    return current_branch == default_branch
 93
 94
 95def get_pull_request_base_branch(fallback: str = "main"):
 96    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback)
 97
 98
 99def get_pipeline_default_branch(fallback: str = "main"):
100    return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback)
101
102
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Return the commit computed by ``git.get_common_ancestor_commit``.

    The branch compared against is the pull request's base branch. If that
    value is empty, the pipeline's default branch is used instead. The remote
    is resolved from *url* via ``git.get_remote``. Presumably the result is the
    merge base of HEAD and that branch; confirm against ``git``.
    """
    target_branch = get_pull_request_base_branch()
    if not target_branch:
        # The variable can be set but empty; fall back to the default branch.
        target_branch = get_pipeline_default_branch()
    remote = git.get_remote(url)
    return git.get_common_ancestor_commit(remote=remote, branch=target_branch)
def inline_link(url: str, label: str | None = None) -> str:
    """Render a link for log output.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output

    Inside Buildkite this emits the escape sequence that Buildkite renders as a
    link, with *label* as its text when one is given. Terminals do not support
    these escape codes, so elsewhere a plain-text form is returned instead:
    "label,url", or just the URL when no label was given.
    """
    if not is_in_buildkite():
        # Without a label, return only the URL instead of "None,<url>".
        return f"{label},{url}" if label else url

    link = f"url='{url}'"
    if label:
        link = f"{link};content='{label}'"
    return f"\033]1339;{link}\a"
def inline_image(url: str, alt: str) -> str:
    """Render an inline image for log output.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images

    Outside Buildkite, returns the plain text "alt,url" instead of the escape
    sequence.
    """
    if not is_in_buildkite():
        # These escape codes are not supported by terminals
        return f"{alt},{url}"
    return f"\033]1338;url='{url}';alt='{alt}'\a"
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Runs ``git diff -U0`` against the merge base (see ``get_merge_base``) and
    returns ``(file_path, line_number)`` pairs. Line numbers refer to the new
    version of each file; paths come from the ``+++ b/...`` file headers.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    result = spawn.capture(["git", "diff", "-U0", merge_base])

    modified_lines: set[tuple[str, int]] = set()
    file_path = None
    # A "+++" line is only a file header between "diff --git" and that file's
    # first hunk. Inside a hunk, an added line whose content starts with "++"
    # is also printed with a "+++" prefix and must not be taken for a header.
    in_file_header = False
    for line in result.splitlines():
        if line.startswith("diff --git "):
            in_file_header = True
            file_path = None
        # +++ b/src/adapter/src/coord/command_handler.rs
        elif in_file_header and line.startswith("+++"):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            in_file_header = False
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in new code state. A missing length means one line; a length of
            # 0 (pure deletion) contributes no lines.
            new_range = line.split(" ")[2]
            if "," in new_range:
                start, length = map(int, new_range.split(","))
            else:
                start = int(new_range)
                length = 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines
def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False):
    """Upload *path* as a Buildkite artifact, retrying via spawn.run_with_retries.

    :param path: artifact path (or pattern) handed to ``buildkite-agent artifact upload``
    :param cwd: working directory for the agent invocation
    :param quiet: lower the agent's log level from "notice" to "fatal"
    """
    log_level = "fatal" if quiet else "notice"

    def upload_once():
        # The argument list is rebuilt on every attempt, as before.
        return spawn.runv(
            ["buildkite-agent", "artifact", "upload", "--log-level", log_level, path],
            cwd=cwd,
        )

    spawn.run_with_retries(upload_once)
def get_parallelism_index() -> int:
    """Return this job's zero-based parallel index (0 if the step is not parallel).

    Raises AssertionError if the parallelism variables are inconsistent (see
    ``_validate_parallelism_configuration``).
    """
    _validate_parallelism_configuration()
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0)
    return int(raw_index)
def get_parallelism_count() -> int:
    """Return the number of parallel jobs for this step (1 if it is not parallel).

    Raises AssertionError if the parallelism variables are inconsistent (see
    ``_validate_parallelism_configuration``).
    """
    _validate_parallelism_configuration()
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1)
    return int(raw_count)
def _upload_shard_info_metadata(items: list[str]) -> None:
    """Store the comma-separated *items* as Buildkite build meta-data.

    The meta-data key is "Shard for <name>". The name is the step label, or
    the step key when the label is unset or empty.
    """
    step_name = get_var(BuildkiteEnvVar.BUILDKITE_LABEL)
    if not step_name:
        step_name = get_var(BuildkiteEnvVar.BUILDKITE_STEP_KEY)
    metadata_key = f"Shard for {step_name}"
    spawn.runv(["buildkite-agent", "meta-data", "set", metadata_key, ", ".join(items)])
def notify_qa_team_about_failure(failure: str) -> None:
    """Print *failure* and queue a Slack notification to #team-testing-bots.

    Does nothing outside Buildkite. The notification is added by uploading a
    pipeline fragment with a ``notify`` entry. Its ``if`` condition restricts
    it to build states "passed", "failed" and "canceled". The message is
    prefixed with the step label.
    """
    if not is_in_buildkite():
        return

    message = f"{get_var(BuildkiteEnvVar.BUILDKITE_LABEL)}: {failure}"
    print(message)

    slack_notification = {
        "slack": {"channels": ["#team-testing-bots"], "message": message},
        "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
    }
    pipeline_yaml = yaml.dump({"notify": [slack_notification]})
    spawn.runv(["buildkite-agent", "pipeline", "upload"], stdin=pipeline_yaml.encode())
def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    """Return the subset of *items* handled by this parallel Buildkite job.

    Items are assigned round-robin: the item at position ``i`` goes to the job
    whose index equals ``i % job_count``. Without parallelism (a single job),
    *items* itself is returned. Inside Buildkite, a non-empty selection is also
    recorded as build meta-data, using *to_identifier* to name each item.
    """
    if not items:
        return []

    shard_index = get_parallelism_index()
    shard_count = get_parallelism_count()
    if shard_count == 1:
        return items

    selected = [items[pos] for pos in range(shard_index, len(items), shard_count)]

    if is_in_buildkite() and selected:
        _upload_shard_info_metadata([to_identifier(item) for item in selected])
    return selected
def _validate_parallelism_configuration() -> None:
    """Check the BUILDKITE_PARALLEL_JOB / BUILDKITE_PARALLEL_JOB_COUNT variables.

    Both must be unset, or both set with ``count > 0`` and
    ``0 <= index < count``. Violations fail an ``assert`` (AssertionError).
    Non-integer values raise ValueError from ``int()``.
    """
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB)
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT)

    if raw_index is None and raw_count is None:
        # Neither variable is set: nothing to validate.
        return

    # Built from the raw strings so messages show the values as configured.
    index_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB.name} (= '{raw_index}')"
    count_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT.name} (= '{raw_count}')"

    assert raw_index is not None and raw_count is not None, (
        f"{index_desc} and {count_desc} need to be either both specified or not specified"
    )

    index = int(raw_index)
    count = int(raw_count)

    assert count > 0, f"{count_desc} not valid"
    assert 0 <= index < count, f"{index_desc} out of valid range with {count_desc}"
def truncate_annotation_str(text: str, max_length: int = 900_000) -> str:
    """Shorten *text* so that it fits into a Buildkite annotation body.

    Buildkite rejects large bodies with "400 Bad Request: The annotation body
    must be less than 1 MB". That limit applies to the encoded bytes, so
    *max_length* is measured in UTF-8 bytes, not characters; for ASCII text the
    two are identical. Text that fits is returned unchanged. Otherwise it is cut
    to at most *max_length* bytes without splitting a multi-byte character, and
    "..." is appended.
    """
    # errors="replace" guarantees measuring never raises (e.g. lone surrogates).
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_length:
        return text
    # errors="ignore" drops a trailing multi-byte character cut in half.
    return encoded[:max_length].decode("utf-8", errors="ignore") + "..."
def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Build the buildkite.com URL of *artifact* within the current build.

    *artifact* must provide "job_id" and "id" keys. The organization, pipeline
    and build number are read from the Buildkite environment.
    """
    organization = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG)
    pipeline_slug = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG)
    build_number = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER)
    job_id = artifact["job_id"]
    artifact_id = artifact["id"]
    return f"https://buildkite.com/organizations/{organization}/pipelines/{pipeline_slug}/builds/{build_number}/jobs/{job_id}/artifacts/{artifact_id}"
def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Attach *markdown* verbatim as a Buildkite annotation.

    Note that this does not trim the data. The annotation context is
    "<BUILDKITE_JOB_ID>-<style>"; a missing BUILDKITE_JOB_ID raises KeyError.
    :param style: annotation style passed to ``buildkite-agent annotate``
    :param markdown: must not exceed 1 MB
    """
    context = f"{os.environ['BUILDKITE_JOB_ID']}-{style}"
    spawn.runv(
        ["buildkite-agent", "annotate", f"--style={style}", f"--context={context}"],
        stdin=markdown.encode(),
    )
def add_annotation(style: str, title: str, content: str) -> None:
    """Add a Buildkite annotation made of *title* followed by *content*.

    *content* is passed through ``truncate_annotation_str`` first. For the
    "info" style, the annotation is wrapped in a ``<details>`` element with
    *title* as its summary.
    """
    body = truncate_annotation_str(content)
    if style != "info":
        add_annotation_raw(style, f"{title}\n\n{body}")
        return
    add_annotation_raw(
        style, f"<details><summary>{title}</summary>\n\n{body}\n</details>"
    )
def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
    """Return *build_url* with *build_job_id* appended as the URL fragment."""
    fragment = f"#{build_job_id}"
    return f"{build_url}{fragment}"
def get_job_url_from_pipeline_and_build(
    pipeline: str, build_number: str | int, build_job_id: str
) -> str:
    """Return the URL of job *build_job_id* in build *build_number* of *pipeline*.

    The URL is built under the hard-coded "materialize" Buildkite organization.
    """
    pipeline_build_url = (
        f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}"
    )
    return get_job_url_from_build_url(
        build_url=pipeline_build_url, build_job_id=build_job_id
    )
def get_build_status(build: str) -> str:
    """Return the build meta-data value stored under the key *build*.

    Reads it with ``buildkite-agent meta-data get``; the agent's stderr is
    discarded.
    """
    command = ["buildkite-agent", "meta-data", "get", build]
    return spawn.capture(command, stderr=subprocess.DEVNULL)
class BuildkiteEnvVar(Enum):
    """Buildkite environment variables read by this module.

    Within this module members are only used through their ``name`` (see
    ``get_var``); the ``auto()`` values carry no meaning of their own.
    """

    # environment
    BUILDKITE_AGENT_META_DATA_AWS_INSTANCE_TYPE = auto()
    BUILDKITE_AGENT_META_DATA_INSTANCE_TYPE = auto()

    # build
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_BUILD_ID = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()
    BUILDKITE_COMMIT = auto()
    BUILDKITE_BUILD_URL = auto()

    # step
    # Zero-based index of this parallel job; must be < BUILDKITE_PARALLEL_JOB_COUNT.
    BUILDKITE_PARALLEL_JOB = auto()
    # Number of parallel jobs the step is split into.
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    # will be the same for sharded and retried build steps
    BUILDKITE_STEP_ID = auto()
    # assumed to be unique
    BUILDKITE_JOB_ID = auto()
    BUILDKITE_LABEL = auto()
    BUILDKITE_RETRY_COUNT = auto()
def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
    """Look up the environment variable whose name is ``var.name``.

    Returns *fallback_value* when the variable is absent from the environment;
    a variable that is set to an empty string is returned as-is.
    """
    return os.environ.get(var.name, fallback_value)
def is_in_buildkite() -> bool:
    """Return whether the ``BUILDKITE`` environment variable is truthy.

    Truthiness is decided by ``ui.env_is_truthy``; the rest of this module uses
    the result as its signal for running under Buildkite.
    """
    running_in_buildkite = ui.env_is_truthy("BUILDKITE")
    return running_in_buildkite
def is_in_pull_request() -> bool:
    """Guess whether the current build is a pull-request build.

    Note that this is a heuristic. Outside Buildkite the answer is always
    False. An explicit pull-request marker wins. Otherwise a build counts as a
    PR build unless it runs on the default branch, on a release version, or on
    a commit already contained in ``main``.
    """
    if not is_in_buildkite():
        return False

    if is_pull_request_marker_set():
        return True

    # Short-circuit evaluation keeps the original check order and laziness.
    looks_like_mainline_build = (
        is_on_default_branch()
        or git.is_on_release_version()
        or git.contains_commit("HEAD", "main")
    )
    return not looks_like_mainline_build
def is_pull_request_marker_set() -> bool:
    """Return whether BUILDKITE_PULL_REQUEST identifies a pull request.

    If set, the variable contains either the pull request's ID or the string
    "false"; an unset variable is treated the same as "false".
    """
    pull_request_marker = get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false")
    return pull_request_marker != "false"
def is_on_default_branch() -> bool:
    """Return whether the build's branch equals the pipeline's default branch.

    An unset BUILDKITE_BRANCH is compared as "unknown", and an unset
    BUILDKITE_PIPELINE_DEFAULT_BRANCH as "main".
    """
    pipeline_default = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
    build_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
    return build_branch == pipeline_default
def get_pull_request_base_branch(fallback: str = "main"):
    """Return the value of BUILDKITE_PULL_REQUEST_BASE_BRANCH.

    *fallback* is returned only when the variable is unset; a set-but-empty
    value is returned unchanged.
    """
    key = BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH
    return get_var(key, fallback)
def get_pipeline_default_branch(fallback: str = "main"):
    """Return the value of BUILDKITE_PIPELINE_DEFAULT_BRANCH.

    *fallback* is returned only when the variable is unset.
    """
    key = BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH
    return get_var(key, fallback)
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    """Return the commit computed by ``git.get_common_ancestor_commit``.

    The branch compared against is the pull request's base branch. If that
    value is empty, the pipeline's default branch is used instead. The remote
    is resolved from *url* via ``git.get_remote``. Presumably the result is the
    merge base of HEAD and that branch; confirm against ``git``.
    """
    target_branch = get_pull_request_base_branch()
    if not target_branch:
        # The variable can be set but empty; fall back to the default branch.
        target_branch = get_pipeline_default_branch()
    remote = git.get_remote(url)
    return git.get_common_ancestor_commit(remote=remote, branch=target_branch)
def inline_image(url: str, alt: str) -> str:
    """Render an inline image for log output.

    See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images

    Outside Buildkite, returns the plain text "alt,url" instead of the escape
    sequence.
    """
    if not is_in_buildkite():
        # These escape codes are not supported by terminals
        return f"{alt},{url}"
    return f"\033]1338;url='{url}';alt='{alt}'\a"
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.

    Runs ``git diff -U0`` against the merge base (see ``get_merge_base``) and
    returns ``(file_path, line_number)`` pairs. Line numbers refer to the new
    version of each file; paths come from the ``+++ b/...`` file headers.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    result = spawn.capture(["git", "diff", "-U0", merge_base])

    modified_lines: set[tuple[str, int]] = set()
    file_path = None
    # A "+++" line is only a file header between "diff --git" and that file's
    # first hunk. Inside a hunk, an added line whose content starts with "++"
    # is also printed with a "+++" prefix and must not be taken for a header.
    in_file_header = False
    for line in result.splitlines():
        if line.startswith("diff --git "):
            in_file_header = True
            file_path = None
        # +++ b/src/adapter/src/coord/command_handler.rs
        elif in_file_header and line.startswith("+++"):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            in_file_header = False
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in new code state. A missing length means one line; a length of
            # 0 (pure deletion) contributes no lines.
            new_range = line.split(" ")[2]
            if "," in new_range:
                start, length = map(int, new_range.split(","))
            else:
                start = int(new_range)
                length = 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines
def upload_artifact(path: Path | str, cwd: Path | None = None, quiet: bool = False):
    """Upload *path* as a Buildkite artifact, retrying via spawn.run_with_retries.

    :param path: artifact path (or pattern) handed to ``buildkite-agent artifact upload``
    :param cwd: working directory for the agent invocation
    :param quiet: lower the agent's log level from "notice" to "fatal"
    """
    log_level = "fatal" if quiet else "notice"

    def upload_once():
        # The argument list is rebuilt on every attempt, as before.
        return spawn.runv(
            ["buildkite-agent", "artifact", "upload", "--log-level", log_level, path],
            cwd=cwd,
        )

    spawn.run_with_retries(upload_once)
def get_parallelism_index() -> int:
    """Return this job's zero-based parallel index (0 if the step is not parallel).

    Raises AssertionError if the parallelism variables are inconsistent (see
    ``_validate_parallelism_configuration``).
    """
    _validate_parallelism_configuration()
    raw_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0)
    return int(raw_index)
def get_parallelism_count() -> int:
    """Return the number of parallel jobs for this step (1 if it is not parallel).

    Raises AssertionError if the parallelism variables are inconsistent (see
    ``_validate_parallelism_configuration``).
    """
    _validate_parallelism_configuration()
    raw_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1)
    return int(raw_count)
def notify_qa_team_about_failure(failure: str) -> None:
    """Print *failure* and queue a Slack notification to #team-testing-bots.

    Does nothing outside Buildkite. The notification is added by uploading a
    pipeline fragment with a ``notify`` entry. Its ``if`` condition restricts
    it to build states "passed", "failed" and "canceled". The message is
    prefixed with the step label.
    """
    if not is_in_buildkite():
        return

    message = f"{get_var(BuildkiteEnvVar.BUILDKITE_LABEL)}: {failure}"
    print(message)

    slack_notification = {
        "slack": {"channels": ["#team-testing-bots"], "message": message},
        "if": 'build.state == "passed" || build.state == "failed" || build.state == "canceled"',
    }
    pipeline_yaml = yaml.dump({"notify": [slack_notification]})
    spawn.runv(["buildkite-agent", "pipeline", "upload"], stdin=pipeline_yaml.encode())
def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    """Return the subset of *items* handled by this parallel Buildkite job.

    Items are assigned round-robin: the item at position ``i`` goes to the job
    whose index equals ``i % job_count``. Without parallelism (a single job),
    *items* itself is returned. Inside Buildkite, a non-empty selection is also
    recorded as build meta-data, using *to_identifier* to name each item.
    """
    if not items:
        return []

    shard_index = get_parallelism_index()
    shard_count = get_parallelism_count()
    if shard_count == 1:
        return items

    selected = [items[pos] for pos in range(shard_index, len(items), shard_count)]

    if is_in_buildkite() and selected:
        _upload_shard_info_metadata([to_identifier(item) for item in selected])
    return selected
def truncate_annotation_str(text: str, max_length: int = 900_000) -> str:
    """Shorten *text* so that it fits into a Buildkite annotation body.

    Buildkite rejects large bodies with "400 Bad Request: The annotation body
    must be less than 1 MB". That limit applies to the encoded bytes, so
    *max_length* is measured in UTF-8 bytes, not characters; for ASCII text the
    two are identical. Text that fits is returned unchanged. Otherwise it is cut
    to at most *max_length* bytes without splitting a multi-byte character, and
    "..." is appended.
    """
    # errors="replace" guarantees measuring never raises (e.g. lone surrogates).
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_length:
        return text
    # errors="ignore" drops a trailing multi-byte character cut in half.
    return encoded[:max_length].decode("utf-8", errors="ignore") + "..."
def get_artifact_url(artifact: dict[str, Any]) -> str:
    """Build the buildkite.com URL of *artifact* within the current build.

    *artifact* must provide "job_id" and "id" keys. The organization, pipeline
    and build number are read from the Buildkite environment.
    """
    organization = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG)
    pipeline_slug = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG)
    build_number = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER)
    job_id = artifact["job_id"]
    artifact_id = artifact["id"]
    return f"https://buildkite.com/organizations/{organization}/pipelines/{pipeline_slug}/builds/{build_number}/jobs/{job_id}/artifacts/{artifact_id}"
def add_annotation_raw(style: str, markdown: str) -> None:
    """
    Attach *markdown* verbatim as a Buildkite annotation.

    Note that this does not trim the data. The annotation context is
    "<BUILDKITE_JOB_ID>-<style>"; a missing BUILDKITE_JOB_ID raises KeyError.
    :param style: annotation style passed to ``buildkite-agent annotate``
    :param markdown: must not exceed 1 MB
    """
    context = f"{os.environ['BUILDKITE_JOB_ID']}-{style}"
    spawn.runv(
        ["buildkite-agent", "annotate", f"--style={style}", f"--context={context}"],
        stdin=markdown.encode(),
    )
def add_annotation(style: str, title: str, content: str) -> None:
    """Add a Buildkite annotation made of *title* followed by *content*.

    *content* is passed through ``truncate_annotation_str`` first. For the
    "info" style, the annotation is wrapped in a ``<details>`` element with
    *title* as its summary.
    """
    body = truncate_annotation_str(content)
    if style != "info":
        add_annotation_raw(style, f"{title}\n\n{body}")
        return
    add_annotation_raw(
        style, f"<details><summary>{title}</summary>\n\n{body}\n</details>"
    )
def get_job_url_from_build_url(build_url: str, build_job_id: str) -> str:
    """Return *build_url* with *build_job_id* appended as the URL fragment."""
    fragment = f"#{build_job_id}"
    return f"{build_url}{fragment}"
def get_job_url_from_pipeline_and_build(
    pipeline: str, build_number: str | int, build_job_id: str
) -> str:
    """Return the URL of job *build_job_id* in build *build_number* of *pipeline*.

    The URL is built under the hard-coded "materialize" Buildkite organization.
    """
    pipeline_build_url = (
        f"https://buildkite.com/materialize/{pipeline}/builds/{build_number}"
    )
    return get_job_url_from_build_url(
        build_url=pipeline_build_url, build_job_id=build_job_id
    )
def get_build_status(build: str) -> str:
    """Return the build meta-data value stored under the key *build*.

    Reads it with ``buildkite-agent meta-data get``; the agent's stderr is
    discarded.
    """
    command = ["buildkite-agent", "meta-data", "get", build]
    return spawn.capture(command, stderr=subprocess.DEVNULL)