Module materialize.buildkite

Buildkite utilities.

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Buildkite utilities."""

import hashlib
import os
from collections.abc import Callable
from enum import Enum, auto
from pathlib import Path
from typing import Any, TypeVar

from materialize import git, spawn, ui

T = TypeVar("T")


class BuildkiteEnvVar(Enum):
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()

    BUILDKITE_PARALLEL_JOB = auto()
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    BUILDKITE_LABEL = auto()


def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
    return os.getenv(var.name, fallback_value)


def is_in_buildkite() -> bool:
    return ui.env_is_truthy("BUILDKITE")


def is_in_pull_request() -> bool:
    """Note that this is a heuristic."""

    if not is_in_buildkite():
        return False

    if is_pull_request_marker_set():
        return True

    if is_on_default_branch():
        return False

    if git.is_on_release_version():
        return False

    return True


def is_pull_request_marker_set() -> bool:
    # If set, this variable will contain either the ID of the pull request or the string "false".
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false"


def is_on_default_branch() -> bool:
    current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
    default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
    return current_branch == default_branch


def get_pull_request_base_branch(fallback: str = "main"):
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback)


def get_pipeline_default_branch(fallback: str = "main"):
    return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback)


def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    base_branch = get_pull_request_base_branch() or get_pipeline_default_branch()
    merge_base = git.get_common_ancestor_commit(
        remote=git.get_remote(url), branch=base_branch, fetch_branch=True
    )
    return merge_base


def inline_link(url: str, label: str | None = None) -> str:
    """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output"""
    link = f"url='{url}'"

    if label:
        link = f"{link};content='{label}'"

    # These escape codes are not supported by terminals
    return f"\033]1339;{link}\a" if is_in_buildkite() else f"{label},{url}"


def inline_image(url: str, alt: str) -> str:
    """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images"""
    content = f"url='\"{url}\"';alt='\"{alt}\"'"
    # These escape codes are not supported by terminals
    return f"\033]1338;{content}\a" if is_in_buildkite() else f"{alt},{url}"


def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    result = spawn.capture(["git", "diff", "-U0", merge_base])

    modified_lines: set[tuple[str, int]] = set()
    file_path = None
    for line in result.splitlines():
        # +++ b/src/adapter/src/coord/command_handler.rs
        if line.startswith("+++"):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in new code state.
            parts = line.split(" ")[2]
            if "," in parts:
                start, length = map(int, parts.split(","))
            else:
                start = int(parts)
                length = 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines


def upload_artifact(path: Path | str, cwd: Path | None = None):
    spawn.runv(
        [
            "buildkite-agent",
            "artifact",
            "upload",
            path,
        ],
        cwd=cwd,
    )


def get_parallelism_index() -> int:
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0))


def get_parallelism_count() -> int:
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1))


def _accepted_by_shard(
    identifier: str,
    parallelism_index: int | None = None,
    parallelism_count: int | None = None,
) -> bool:
    if parallelism_index is None:
        parallelism_index = get_parallelism_index()
    if parallelism_count is None:
        parallelism_count = get_parallelism_count()

    if parallelism_count == 1:
        return True

    hash_value = int.from_bytes(
        hashlib.md5(identifier.encode("utf-8")).digest(), byteorder="big"
    )
    return hash_value % parallelism_count == parallelism_index


def _upload_shard_info_metadata(items: list[str]) -> None:
    label = get_var(BuildkiteEnvVar.BUILDKITE_LABEL) or get_var(
        BuildkiteEnvVar.BUILDKITE_STEP_KEY
    )
    spawn.runv(
        ["buildkite-agent", "meta-data", "set", f"Shard for {label}", ", ".join(items)]
    )


def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    parallelism_index = get_parallelism_index()
    parallelism_count = get_parallelism_count()

    if parallelism_count == 1:
        return items

    if is_in_buildkite():
        _upload_shard_info_metadata(
            [
                to_identifier(item)
                for item in items
                if _accepted_by_shard(
                    to_identifier(item), parallelism_index, parallelism_count
                )
            ]
        )
    return [
        item
        for item in items
        if _accepted_by_shard(to_identifier(item), parallelism_index, parallelism_count)
    ]


def _validate_parallelism_configuration() -> None:
    job_index = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB)
    job_count = get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT)

    if job_index is None and job_count is None:
        # OK
        return

    job_index_desc = f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB.name} (= '{job_index}')"
    job_count_desc = (
        f"${BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT.name} (= '{job_count}')"
    )
    assert (
        job_index is not None and job_count is not None
    ), f"{job_index_desc} and {job_count_desc} need to be either both specified or not specified"

    job_index = int(job_index)
    job_count = int(job_count)

    assert job_count > 0, f"{job_count_desc} not valid"
    assert (
        0 <= job_index < job_count
    ), f"{job_index_desc} out of valid range with {job_count_desc}"


def truncate_str(text: str, length: int = 900_000) -> str:
    # 400 Bad Request: The annotation body must be less than 1 MB
    return text if len(text) <= length else text[:length] + "..."


def get_artifact_url(artifact: dict[str, Any]) -> str:
    org = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG)
    pipeline = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG)
    build = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER)
    return f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}"


def add_annotation_raw(style: str, markdown: str) -> None:
    spawn.runv(
        [
            "buildkite-agent",
            "annotate",
            f"--style={style}",
            f"--context={os.environ['BUILDKITE_JOB_ID']}-{style}",
        ],
        stdin=markdown.encode(),
    )


def add_annotation(style: str, title: str, content: str) -> None:
    if style == "info":
        markdown = f"""<details><summary>{title}</summary>

{truncate_str(content)}
</details>"""
    else:
        markdown = f"""{title}

{truncate_str(content)}"""
    add_annotation_raw(style, markdown)

Functions

def add_annotation(style: str, title: str, content: str) ‑> None
Expand source code Browse git
def add_annotation(style: str, title: str, content: str) -> None:
    if style == "info":
        markdown = f"""<details><summary>{title}</summary>

{truncate_str(content)}
</details>"""
    else:
        markdown = f"""{title}

{truncate_str(content)}"""
    add_annotation_raw(style, markdown)
def add_annotation_raw(style: str, markdown: str) ‑> None
Expand source code Browse git
def add_annotation_raw(style: str, markdown: str) -> None:
    spawn.runv(
        [
            "buildkite-agent",
            "annotate",
            f"--style={style}",
            f"--context={os.environ['BUILDKITE_JOB_ID']}-{style}",
        ],
        stdin=markdown.encode(),
    )
def find_modified_lines() ‑> set[tuple[str, int]]

Find each line that has been added or modified in the current pull request.

Expand source code Browse git
def find_modified_lines() -> set[tuple[str, int]]:
    """
    Find each line that has been added or modified in the current pull request.
    """
    merge_base = get_merge_base()
    print(f"Merge base: {merge_base}")
    result = spawn.capture(["git", "diff", "-U0", merge_base])

    modified_lines: set[tuple[str, int]] = set()
    file_path = None
    for line in result.splitlines():
        # +++ b/src/adapter/src/coord/command_handler.rs
        if line.startswith("+++"):
            file_path = line.removeprefix("+++ b/")
        # @@ -641,7 +640,6 @@ impl Coordinator {
        elif line.startswith("@@ "):
            # We only care about the second value ("+640,6" in the example),
            # which contains the line number and length of the modified block
            # in new code state.
            parts = line.split(" ")[2]
            if "," in parts:
                start, length = map(int, parts.split(","))
            else:
                start = int(parts)
                length = 1
            for line_nr in range(start, start + length):
                assert file_path
                modified_lines.add((file_path, line_nr))
    return modified_lines
def get_artifact_url(artifact: dict[str, typing.Any]) ‑> str
Expand source code Browse git
def get_artifact_url(artifact: dict[str, Any]) -> str:
    org = get_var(BuildkiteEnvVar.BUILDKITE_ORGANIZATION_SLUG)
    pipeline = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_SLUG)
    build = get_var(BuildkiteEnvVar.BUILDKITE_BUILD_NUMBER)
    return f"https://buildkite.com/organizations/{org}/pipelines/{pipeline}/builds/{build}/jobs/{artifact['job_id']}/artifacts/{artifact['id']}"
def get_merge_base(url: str = 'https://github.com/MaterializeInc/materialize') ‑> str
Expand source code Browse git
def get_merge_base(url: str = "https://github.com/MaterializeInc/materialize") -> str:
    base_branch = get_pull_request_base_branch() or get_pipeline_default_branch()
    merge_base = git.get_common_ancestor_commit(
        remote=git.get_remote(url), branch=base_branch, fetch_branch=True
    )
    return merge_base
def get_parallelism_count() ‑> int
Expand source code Browse git
def get_parallelism_count() -> int:
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB_COUNT, 1))
def get_parallelism_index() ‑> int
Expand source code Browse git
def get_parallelism_index() -> int:
    _validate_parallelism_configuration()
    return int(get_var(BuildkiteEnvVar.BUILDKITE_PARALLEL_JOB, 0))
def get_pipeline_default_branch(fallback: str = 'main')
Expand source code Browse git
def get_pipeline_default_branch(fallback: str = "main"):
    return get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, fallback)
def get_pull_request_base_branch(fallback: str = 'main')
Expand source code Browse git
def get_pull_request_base_branch(fallback: str = "main"):
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST_BASE_BRANCH, fallback)
def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) ‑> Any
Expand source code Browse git
def get_var(var: BuildkiteEnvVar, fallback_value: Any = None) -> Any:
    return os.getenv(var.name, fallback_value)
def inline_image(url: str, alt: str) ‑> str
Expand source code Browse git
def inline_image(url: str, alt: str) -> str:
    """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output#images-syntax-for-inlining-images"""
    content = f"url='\"{url}\"';alt='\"{alt}\"'"
    # These escape codes are not supported by terminals
    return f"\033]1338;{content}\a" if is_in_buildkite() else f"{alt},{url}"
Expand source code Browse git
def inline_link(url: str, label: str | None = None) -> str:
    """See https://buildkite.com/docs/pipelines/links-and-images-in-log-output"""
    link = f"url='{url}'"

    if label:
        link = f"{link};content='{label}'"

    # These escape codes are not supported by terminals
    return f"\033]1339;{link}\a" if is_in_buildkite() else f"{label},{url}"
def is_in_buildkite() ‑> bool
Expand source code Browse git
def is_in_buildkite() -> bool:
    return ui.env_is_truthy("BUILDKITE")
def is_in_pull_request() ‑> bool

Note that this is a heuristic.

Expand source code Browse git
def is_in_pull_request() -> bool:
    """Note that this is a heuristic."""

    if not is_in_buildkite():
        return False

    if is_pull_request_marker_set():
        return True

    if is_on_default_branch():
        return False

    if git.is_on_release_version():
        return False

    return True
def is_on_default_branch() ‑> bool
Expand source code Browse git
def is_on_default_branch() -> bool:
    current_branch = get_var(BuildkiteEnvVar.BUILDKITE_BRANCH, "unknown")
    default_branch = get_var(BuildkiteEnvVar.BUILDKITE_PIPELINE_DEFAULT_BRANCH, "main")
    return current_branch == default_branch
def is_pull_request_marker_set() ‑> bool
Expand source code Browse git
def is_pull_request_marker_set() -> bool:
    # If set, this variable will contain either the ID of the pull request or the string "false".
    return get_var(BuildkiteEnvVar.BUILDKITE_PULL_REQUEST, "false") != "false"
def shard_list(items: list[~T], to_identifier: collections.abc.Callable[[~T], str]) ‑> list[~T]
Expand source code Browse git
def shard_list(items: list[T], to_identifier: Callable[[T], str]) -> list[T]:
    parallelism_index = get_parallelism_index()
    parallelism_count = get_parallelism_count()

    if parallelism_count == 1:
        return items

    if is_in_buildkite():
        _upload_shard_info_metadata(
            [
                to_identifier(item)
                for item in items
                if _accepted_by_shard(
                    to_identifier(item), parallelism_index, parallelism_count
                )
            ]
        )
    return [
        item
        for item in items
        if _accepted_by_shard(to_identifier(item), parallelism_index, parallelism_count)
    ]
def truncate_str(text: str, length: int = 900000) ‑> str
Expand source code Browse git
def truncate_str(text: str, length: int = 900_000) -> str:
    # 400 Bad Request: The annotation body must be less than 1 MB
    return text if len(text) <= length else text[:length] + "..."
def upload_artifact(path: pathlib.Path | str, cwd: pathlib.Path | None = None)
Expand source code Browse git
def upload_artifact(path: Path | str, cwd: Path | None = None):
    spawn.runv(
        [
            "buildkite-agent",
            "artifact",
            "upload",
            path,
        ],
        cwd=cwd,
    )

Classes

class BuildkiteEnvVar (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class BuildkiteEnvVar(Enum):
    BUILDKITE_PULL_REQUEST = auto()
    BUILDKITE_BUILD_NUMBER = auto()
    BUILDKITE_PIPELINE_DEFAULT_BRANCH = auto()
    BUILDKITE_PULL_REQUEST_BASE_BRANCH = auto()
    BUILDKITE_ORGANIZATION_SLUG = auto()
    BUILDKITE_PIPELINE_SLUG = auto()
    BUILDKITE_BRANCH = auto()

    BUILDKITE_PARALLEL_JOB = auto()
    BUILDKITE_PARALLEL_JOB_COUNT = auto()
    BUILDKITE_STEP_KEY = auto()
    BUILDKITE_LABEL = auto()

Ancestors

  • enum.Enum

Class variables

var BUILDKITE_BRANCH
var BUILDKITE_BUILD_NUMBER
var BUILDKITE_LABEL
var BUILDKITE_ORGANIZATION_SLUG
var BUILDKITE_PARALLEL_JOB
var BUILDKITE_PARALLEL_JOB_COUNT
var BUILDKITE_PIPELINE_DEFAULT_BRANCH
var BUILDKITE_PIPELINE_SLUG
var BUILDKITE_PULL_REQUEST
var BUILDKITE_PULL_REQUEST_BASE_BRANCH
var BUILDKITE_STEP_KEY