Module materialize.cloudtest.k8s.api.k8s_resource

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import os
import subprocess

from kubernetes.client import AppsV1Api, CoreV1Api, RbacAuthorizationV1Api
from kubernetes.config import new_client_from_config  # type: ignore

from materialize import MZ_ROOT, mzbuild, ui
from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
from materialize.cloudtest.util.common import run_process_with_error_information
from materialize.cloudtest.util.wait import wait
from materialize.rustc_flags import Sanitizer


class K8sResource:
    def __init__(self, namespace: str):
        self.selected_namespace = namespace

    def kubectl(
        self,
        *args: str,
        input: str | None = None,
        capture_output: bool = False,
        suppress_command_error_output: bool = False,
    ) -> None:
        cmd = [
            "kubectl",
            "--context",
            self.context(),
            "--namespace",
            self.namespace(),
            *args,
        ]

        if suppress_command_error_output:
            subprocess.run(
                cmd, text=True, input=input, check=True, capture_output=capture_output
            )
        else:
            run_process_with_error_information(
                cmd, input, capture_output=capture_output
            )

    def api(self) -> CoreV1Api:
        api_client = new_client_from_config(context=self.context())
        return CoreV1Api(api_client)

    def apps_api(self) -> AppsV1Api:
        api_client = new_client_from_config(context=self.context())
        return AppsV1Api(api_client)

    def rbac_api(self) -> RbacAuthorizationV1Api:
        api_client = new_client_from_config(context=self.context())
        return RbacAuthorizationV1Api(api_client)

    def context(self) -> str:
        return DEFAULT_K8S_CONTEXT_NAME

    def namespace(self) -> str:
        return self.selected_namespace

    def kind(self) -> str:
        assert False

    def create(self) -> None:
        assert False

    def image(
        self,
        service: str,
        tag: str | None = None,
        release_mode: bool = True,
        org: str | None = "materialize",
    ) -> str:
        if tag is not None:
            image_name = f"{service}:{tag}"
            if org is not None:
                image_name = f"{org}/{image_name}"

            return image_name
        else:
            coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
            sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
            repo = mzbuild.Repository(
                MZ_ROOT,
                release_mode=release_mode,
                coverage=coverage,
                sanitizer=sanitizer,
            )
            deps = repo.resolve_dependencies([repo.images[service]])
            rimage = deps[service]
            return rimage.spec()

    def wait(
        self,
        condition: str,
        resource: str,
    ) -> None:
        wait(condition=condition, resource=resource, namespace=self.selected_namespace)

Classes

class K8sResource (namespace: str)
Expand source code Browse git
class K8sResource:
    def __init__(self, namespace: str):
        self.selected_namespace = namespace

    def kubectl(
        self,
        *args: str,
        input: str | None = None,
        capture_output: bool = False,
        suppress_command_error_output: bool = False,
    ) -> None:
        cmd = [
            "kubectl",
            "--context",
            self.context(),
            "--namespace",
            self.namespace(),
            *args,
        ]

        if suppress_command_error_output:
            subprocess.run(
                cmd, text=True, input=input, check=True, capture_output=capture_output
            )
        else:
            run_process_with_error_information(
                cmd, input, capture_output=capture_output
            )

    def api(self) -> CoreV1Api:
        api_client = new_client_from_config(context=self.context())
        return CoreV1Api(api_client)

    def apps_api(self) -> AppsV1Api:
        api_client = new_client_from_config(context=self.context())
        return AppsV1Api(api_client)

    def rbac_api(self) -> RbacAuthorizationV1Api:
        api_client = new_client_from_config(context=self.context())
        return RbacAuthorizationV1Api(api_client)

    def context(self) -> str:
        return DEFAULT_K8S_CONTEXT_NAME

    def namespace(self) -> str:
        return self.selected_namespace

    def kind(self) -> str:
        assert False

    def create(self) -> None:
        assert False

    def image(
        self,
        service: str,
        tag: str | None = None,
        release_mode: bool = True,
        org: str | None = "materialize",
    ) -> str:
        if tag is not None:
            image_name = f"{service}:{tag}"
            if org is not None:
                image_name = f"{org}/{image_name}"

            return image_name
        else:
            coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
            sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
            repo = mzbuild.Repository(
                MZ_ROOT,
                release_mode=release_mode,
                coverage=coverage,
                sanitizer=sanitizer,
            )
            deps = repo.resolve_dependencies([repo.images[service]])
            rimage = deps[service]
            return rimage.spec()

    def wait(
        self,
        condition: str,
        resource: str,
    ) -> None:
        wait(condition=condition, resource=resource, namespace=self.selected_namespace)

Subclasses

Methods

def api(self) ‑> kubernetes.client.api.core_v1_api.CoreV1Api
Expand source code Browse git
def api(self) -> CoreV1Api:
    api_client = new_client_from_config(context=self.context())
    return CoreV1Api(api_client)
def apps_api(self) ‑> kubernetes.client.api.apps_v1_api.AppsV1Api
Expand source code Browse git
def apps_api(self) -> AppsV1Api:
    api_client = new_client_from_config(context=self.context())
    return AppsV1Api(api_client)
def context(self) ‑> str
Expand source code Browse git
def context(self) -> str:
    return DEFAULT_K8S_CONTEXT_NAME
def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    assert False
def image(self, service: str, tag: str | None = None, release_mode: bool = True, org: str | None = 'materialize') ‑> str
Expand source code Browse git
def image(
    self,
    service: str,
    tag: str | None = None,
    release_mode: bool = True,
    org: str | None = "materialize",
) -> str:
    if tag is not None:
        image_name = f"{service}:{tag}"
        if org is not None:
            image_name = f"{org}/{image_name}"

        return image_name
    else:
        coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
        sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
        repo = mzbuild.Repository(
            MZ_ROOT,
            release_mode=release_mode,
            coverage=coverage,
            sanitizer=sanitizer,
        )
        deps = repo.resolve_dependencies([repo.images[service]])
        rimage = deps[service]
        return rimage.spec()
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    assert False
def kubectl(self, *args: str, input: str | None = None, capture_output: bool = False, suppress_command_error_output: bool = False) ‑> None
Expand source code Browse git
def kubectl(
    self,
    *args: str,
    input: str | None = None,
    capture_output: bool = False,
    suppress_command_error_output: bool = False,
) -> None:
    cmd = [
        "kubectl",
        "--context",
        self.context(),
        "--namespace",
        self.namespace(),
        *args,
    ]

    if suppress_command_error_output:
        subprocess.run(
            cmd, text=True, input=input, check=True, capture_output=capture_output
        )
    else:
        run_process_with_error_information(
            cmd, input, capture_output=capture_output
        )
def namespace(self) ‑> str
Expand source code Browse git
def namespace(self) -> str:
    return self.selected_namespace
def rbac_api(self) ‑> kubernetes.client.api.rbac_authorization_v1_api.RbacAuthorizationV1Api
Expand source code Browse git
def rbac_api(self) -> RbacAuthorizationV1Api:
    api_client = new_client_from_config(context=self.context())
    return RbacAuthorizationV1Api(api_client)
def wait(self, condition: str, resource: str) ‑> None
Expand source code Browse git
def wait(
    self,
    condition: str,
    resource: str,
) -> None:
    wait(condition=condition, resource=resource, namespace=self.selected_namespace)