Module materialize.cloudtest.k8s.api.k8s_resource
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import os
import subprocess
from kubernetes.client import AppsV1Api, CoreV1Api, RbacAuthorizationV1Api
from kubernetes.config import new_client_from_config # type: ignore
from materialize import MZ_ROOT, mzbuild, ui
from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
from materialize.cloudtest.util.common import run_process_with_error_information
from materialize.cloudtest.util.wait import wait
from materialize.rustc_flags import Sanitizer
class K8sResource:
def __init__(self, namespace: str):
self.selected_namespace = namespace
def kubectl(
self,
*args: str,
input: str | None = None,
capture_output: bool = False,
suppress_command_error_output: bool = False,
) -> None:
cmd = [
"kubectl",
"--context",
self.context(),
"--namespace",
self.namespace(),
*args,
]
if suppress_command_error_output:
subprocess.run(
cmd, text=True, input=input, check=True, capture_output=capture_output
)
else:
run_process_with_error_information(
cmd, input, capture_output=capture_output
)
def api(self) -> CoreV1Api:
api_client = new_client_from_config(context=self.context())
return CoreV1Api(api_client)
def apps_api(self) -> AppsV1Api:
api_client = new_client_from_config(context=self.context())
return AppsV1Api(api_client)
def rbac_api(self) -> RbacAuthorizationV1Api:
api_client = new_client_from_config(context=self.context())
return RbacAuthorizationV1Api(api_client)
def context(self) -> str:
return DEFAULT_K8S_CONTEXT_NAME
def namespace(self) -> str:
return self.selected_namespace
def kind(self) -> str:
assert False
def create(self) -> None:
assert False
def image(
self,
service: str,
tag: str | None = None,
release_mode: bool = True,
org: str | None = "materialize",
) -> str:
if tag is not None:
image_name = f"{service}:{tag}"
if org is not None:
image_name = f"{org}/{image_name}"
return image_name
else:
coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
repo = mzbuild.Repository(
MZ_ROOT,
release_mode=release_mode,
coverage=coverage,
sanitizer=sanitizer,
)
deps = repo.resolve_dependencies([repo.images[service]])
rimage = deps[service]
return rimage.spec()
def wait(
self,
condition: str,
resource: str,
) -> None:
wait(condition=condition, resource=resource, namespace=self.selected_namespace)
Classes
class K8sResource (namespace: str)
-
Expand source code Browse git
class K8sResource: def __init__(self, namespace: str): self.selected_namespace = namespace def kubectl( self, *args: str, input: str | None = None, capture_output: bool = False, suppress_command_error_output: bool = False, ) -> None: cmd = [ "kubectl", "--context", self.context(), "--namespace", self.namespace(), *args, ] if suppress_command_error_output: subprocess.run( cmd, text=True, input=input, check=True, capture_output=capture_output ) else: run_process_with_error_information( cmd, input, capture_output=capture_output ) def api(self) -> CoreV1Api: api_client = new_client_from_config(context=self.context()) return CoreV1Api(api_client) def apps_api(self) -> AppsV1Api: api_client = new_client_from_config(context=self.context()) return AppsV1Api(api_client) def rbac_api(self) -> RbacAuthorizationV1Api: api_client = new_client_from_config(context=self.context()) return RbacAuthorizationV1Api(api_client) def context(self) -> str: return DEFAULT_K8S_CONTEXT_NAME def namespace(self) -> str: return self.selected_namespace def kind(self) -> str: assert False def create(self) -> None: assert False def image( self, service: str, tag: str | None = None, release_mode: bool = True, org: str | None = "materialize", ) -> str: if tag is not None: image_name = f"{service}:{tag}" if org is not None: image_name = f"{org}/{image_name}" return image_name else: coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED") sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")] repo = mzbuild.Repository( MZ_ROOT, release_mode=release_mode, coverage=coverage, sanitizer=sanitizer, ) deps = repo.resolve_dependencies([repo.images[service]]) rimage = deps[service] return rimage.spec() def wait( self, condition: str, resource: str, ) -> None: wait(condition=condition, resource=resource, namespace=self.selected_namespace)
Subclasses
- K8sClusterRole
- K8sConfigMap
- K8sDeployment
- K8sPod
- K8sRoleBinding
- K8sSecret
- K8sService
- K8sStatefulSet
- Minio
Methods
def api(self) ‑> kubernetes.client.api.core_v1_api.CoreV1Api
-
Expand source code Browse git
def api(self) -> CoreV1Api: api_client = new_client_from_config(context=self.context()) return CoreV1Api(api_client)
def apps_api(self) ‑> kubernetes.client.api.apps_v1_api.AppsV1Api
-
Expand source code Browse git
def apps_api(self) -> AppsV1Api: api_client = new_client_from_config(context=self.context()) return AppsV1Api(api_client)
def context(self) ‑> str
-
Expand source code Browse git
def context(self) -> str: return DEFAULT_K8S_CONTEXT_NAME
def create(self) ‑> None
-
Expand source code Browse git
def create(self) -> None: assert False
def image(self, service: str, tag: str | None = None, release_mode: bool = True, org: str | None = 'materialize') ‑> str
-
Expand source code Browse git
def image( self, service: str, tag: str | None = None, release_mode: bool = True, org: str | None = "materialize", ) -> str: if tag is not None: image_name = f"{service}:{tag}" if org is not None: image_name = f"{org}/{image_name}" return image_name else: coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED") sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")] repo = mzbuild.Repository( MZ_ROOT, release_mode=release_mode, coverage=coverage, sanitizer=sanitizer, ) deps = repo.resolve_dependencies([repo.images[service]]) rimage = deps[service] return rimage.spec()
def kind(self) ‑> str
-
Expand source code Browse git
def kind(self) -> str: assert False
def kubectl(self, *args: str, input: str | None = None, capture_output: bool = False, suppress_command_error_output: bool = False) ‑> None
-
Expand source code Browse git
def kubectl( self, *args: str, input: str | None = None, capture_output: bool = False, suppress_command_error_output: bool = False, ) -> None: cmd = [ "kubectl", "--context", self.context(), "--namespace", self.namespace(), *args, ] if suppress_command_error_output: subprocess.run( cmd, text=True, input=input, check=True, capture_output=capture_output ) else: run_process_with_error_information( cmd, input, capture_output=capture_output )
def namespace(self) ‑> str
-
Expand source code Browse git
def namespace(self) -> str: return self.selected_namespace
def rbac_api(self) ‑> kubernetes.client.api.rbac_authorization_v1_api.RbacAuthorizationV1Api
-
Expand source code Browse git
def rbac_api(self) -> RbacAuthorizationV1Api: api_client = new_client_from_config(context=self.context()) return RbacAuthorizationV1Api(api_client)
def wait(self, condition: str, resource: str) ‑> None
-
Expand source code Browse git
def wait( self, condition: str, resource: str, ) -> None: wait(condition=condition, resource=resource, namespace=self.selected_namespace)