Module materialize.cloudtest.util.kubectl
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import json
import subprocess
from pathlib import Path
from typing import Any
import yaml
from materialize.cloudtest.util.common import retry
from materialize.cloudtest.util.wait import wait
class KubectlError(AssertionError):
def __init__(self, returncode: int, cmd: list[str], stdout: bytes, stderr: bytes):
self.returncode = returncode
self.cmd = cmd
self.stdout = stdout
self.stderr = stderr
@classmethod
def from_subprocess_error(cls, e: subprocess.CalledProcessError) -> BaseException:
return cls(e.returncode, e.cmd, e.stdout, e.stderr)
def __str__(self) -> str:
return "\n".join(
[
f"error in kubectl command: {self.cmd} returned {self.returncode}",
f"stdout: {self.stdout.decode('utf-8')}",
f"stderr: {self.stderr.decode('utf-8')}",
],
)
class Kubectl:
def __init__(self, context: str):
self.context = context
def patch(
self,
resource_type: str,
name: str,
namespace: str | None,
patch: Any,
) -> None:
command = [
"kubectl",
"--context",
self.context,
"patch",
resource_type,
name,
"-p",
json.dumps(patch),
"--type",
"merge",
]
if namespace:
command.extend(["-n", namespace])
subprocess.run(
command,
check=True,
)
def wait(
self,
namespace: str | None,
resource_type: str,
resource_name: str | None,
wait_for: str,
timeout_secs: int,
label: str | None = None,
) -> None:
if resource_name is None and label is None:
raise RuntimeError("Either resource_name or label must be set")
if resource_name is None:
resource = resource_type
else:
resource = f"{resource_type}/{resource_name}"
wait(
wait_for,
resource,
timeout_secs,
self.context,
label=label,
namespace=namespace,
)
def delete(
self,
namespace: str | None,
resource_type: str,
resource_name: str,
) -> None:
command = [
"kubectl",
"--context",
self.context,
"delete",
resource_type,
resource_name,
"--wait=true",
"--cascade=foreground",
]
if namespace:
command.extend(["-n", namespace])
try:
subprocess.run(
command,
capture_output=True,
check=True,
text=True,
)
except subprocess.CalledProcessError as e:
if "NotFound" not in e.stderr:
raise KubectlError.from_subprocess_error(e) from e
def get(
self,
namespace: str | None,
resource_type: str,
resource_name: str | None = None,
) -> dict[str, Any]:
command = [
"kubectl",
"--context",
self.context,
"get",
resource_type,
]
if resource_name is not None:
command.append(resource_name)
if namespace:
command.extend(["-n", namespace])
command.extend(["-o", "yaml"])
try:
yaml_data: dict[str, Any] = yaml.safe_load(
subprocess.run(
command,
capture_output=True,
check=True,
).stdout,
)
return yaml_data
except subprocess.CalledProcessError as e:
raise KubectlError.from_subprocess_error(e) from e
def get_retry(
self,
namespace: str | None,
resource_type: str,
resource_name: str,
max_attempts: int,
) -> dict[str, Any]:
def f() -> dict[str, Any]:
return self.get(namespace, resource_type, resource_name)
yaml_data: dict[str, Any] = retry(
f,
max_attempts=max_attempts,
exception_types=[KubectlError],
)
return yaml_data
def get_or_none(
self,
namespace: str | None,
resource_type: str,
resource_name: str | None = None,
) -> dict[str, Any] | None:
try:
return self.get(namespace, resource_type, resource_name)
except KubectlError as e:
if b"NotFound" in e.stderr:
return None
raise
def load_k8s_yaml(
self,
filepath: str,
tests_dir: str,
substitutions: dict[str, str] | None = None,
) -> dict[str, Any]:
"""
Load a Kubernetes YAML specification to assert against. If `substitutions`
are given, find-and-replace in the YAML contents before parsing.
"""
contents = Path(tests_dir).joinpath(filepath).read_text()
for old, new in (substitutions or {}).items():
contents = contents.replace(old, new)
yaml_data: dict[str, Any] = yaml.safe_load(contents)
return yaml_data
Classes
class Kubectl (context: str)
-
Expand source code Browse git
class Kubectl: def __init__(self, context: str): self.context = context def patch( self, resource_type: str, name: str, namespace: str | None, patch: Any, ) -> None: command = [ "kubectl", "--context", self.context, "patch", resource_type, name, "-p", json.dumps(patch), "--type", "merge", ] if namespace: command.extend(["-n", namespace]) subprocess.run( command, check=True, ) def wait( self, namespace: str | None, resource_type: str, resource_name: str | None, wait_for: str, timeout_secs: int, label: str | None = None, ) -> None: if resource_name is None and label is None: raise RuntimeError("Either resource_name or label must be set") if resource_name is None: resource = resource_type else: resource = f"{resource_type}/{resource_name}" wait( wait_for, resource, timeout_secs, self.context, label=label, namespace=namespace, ) def delete( self, namespace: str | None, resource_type: str, resource_name: str, ) -> None: command = [ "kubectl", "--context", self.context, "delete", resource_type, resource_name, "--wait=true", "--cascade=foreground", ] if namespace: command.extend(["-n", namespace]) try: subprocess.run( command, capture_output=True, check=True, text=True, ) except subprocess.CalledProcessError as e: if "NotFound" not in e.stderr: raise KubectlError.from_subprocess_error(e) from e def get( self, namespace: str | None, resource_type: str, resource_name: str | None = None, ) -> dict[str, Any]: command = [ "kubectl", "--context", self.context, "get", resource_type, ] if resource_name is not None: command.append(resource_name) if namespace: command.extend(["-n", namespace]) command.extend(["-o", "yaml"]) try: yaml_data: dict[str, Any] = yaml.safe_load( subprocess.run( command, capture_output=True, check=True, ).stdout, ) return yaml_data except subprocess.CalledProcessError as e: raise KubectlError.from_subprocess_error(e) from e def get_retry( self, namespace: str | None, resource_type: str, resource_name: str, max_attempts: int, ) -> dict[str, Any]: def f() -> dict[str, Any]: return self.get(namespace, resource_type, resource_name) yaml_data: dict[str, Any] = retry( f, max_attempts=max_attempts, exception_types=[KubectlError], ) return yaml_data def get_or_none( self, namespace: str | None, resource_type: str, resource_name: str | None = None, ) -> dict[str, Any] | None: try: return self.get(namespace, resource_type, resource_name) except KubectlError as e: if b"NotFound" in e.stderr: return None raise def load_k8s_yaml( self, filepath: str, tests_dir: str, substitutions: dict[str, str] | None = None, ) -> dict[str, Any]: """ Load a Kubernetes YAML specification to assert against. If `substitutions` are given, find-and-replace in the YAML contents before parsing. """ contents = Path(tests_dir).joinpath(filepath).read_text() for old, new in (substitutions or {}).items(): contents = contents.replace(old, new) yaml_data: dict[str, Any] = yaml.safe_load(contents) return yaml_data
Methods
def delete(self, namespace: str | None, resource_type: str, resource_name: str) ‑> None
-
Expand source code Browse git
def delete( self, namespace: str | None, resource_type: str, resource_name: str, ) -> None: command = [ "kubectl", "--context", self.context, "delete", resource_type, resource_name, "--wait=true", "--cascade=foreground", ] if namespace: command.extend(["-n", namespace]) try: subprocess.run( command, capture_output=True, check=True, text=True, ) except subprocess.CalledProcessError as e: if "NotFound" not in e.stderr: raise KubectlError.from_subprocess_error(e) from e
def get(self, namespace: str | None, resource_type: str, resource_name: str | None = None) ‑> dict[str, typing.Any]
-
Expand source code Browse git
def get( self, namespace: str | None, resource_type: str, resource_name: str | None = None, ) -> dict[str, Any]: command = [ "kubectl", "--context", self.context, "get", resource_type, ] if resource_name is not None: command.append(resource_name) if namespace: command.extend(["-n", namespace]) command.extend(["-o", "yaml"]) try: yaml_data: dict[str, Any] = yaml.safe_load( subprocess.run( command, capture_output=True, check=True, ).stdout, ) return yaml_data except subprocess.CalledProcessError as e: raise KubectlError.from_subprocess_error(e) from e
def get_or_none(self, namespace: str | None, resource_type: str, resource_name: str | None = None) ‑> dict[str, typing.Any] | None
-
Expand source code Browse git
def get_or_none( self, namespace: str | None, resource_type: str, resource_name: str | None = None, ) -> dict[str, Any] | None: try: return self.get(namespace, resource_type, resource_name) except KubectlError as e: if b"NotFound" in e.stderr: return None raise
def get_retry(self, namespace: str | None, resource_type: str, resource_name: str, max_attempts: int) ‑> dict[str, typing.Any]
-
Expand source code Browse git
def get_retry( self, namespace: str | None, resource_type: str, resource_name: str, max_attempts: int, ) -> dict[str, Any]: def f() -> dict[str, Any]: return self.get(namespace, resource_type, resource_name) yaml_data: dict[str, Any] = retry( f, max_attempts=max_attempts, exception_types=[KubectlError], ) return yaml_data
def load_k8s_yaml(self, filepath: str, tests_dir: str, substitutions: dict[str, str] | None = None) ‑> dict[str, typing.Any]
-
Load a Kubernetes YAML specification to assert against. If
substitutions
are given, find-and-replace in the YAML contents before parsing.Expand source code Browse git
def load_k8s_yaml( self, filepath: str, tests_dir: str, substitutions: dict[str, str] | None = None, ) -> dict[str, Any]: """ Load a Kubernetes YAML specification to assert against. If `substitutions` are given, find-and-replace in the YAML contents before parsing. """ contents = Path(tests_dir).joinpath(filepath).read_text() for old, new in (substitutions or {}).items(): contents = contents.replace(old, new) yaml_data: dict[str, Any] = yaml.safe_load(contents) return yaml_data
def patch(self, resource_type: str, name: str, namespace: str | None, patch: Any) ‑> None
-
Expand source code Browse git
def patch( self, resource_type: str, name: str, namespace: str | None, patch: Any, ) -> None: command = [ "kubectl", "--context", self.context, "patch", resource_type, name, "-p", json.dumps(patch), "--type", "merge", ] if namespace: command.extend(["-n", namespace]) subprocess.run( command, check=True, )
def wait(self, namespace: str | None, resource_type: str, resource_name: str | None, wait_for: str, timeout_secs: int, label: str | None = None) ‑> None
-
Expand source code Browse git
def wait( self, namespace: str | None, resource_type: str, resource_name: str | None, wait_for: str, timeout_secs: int, label: str | None = None, ) -> None: if resource_name is None and label is None: raise RuntimeError("Either resource_name or label must be set") if resource_name is None: resource = resource_type else: resource = f"{resource_type}/{resource_name}" wait( wait_for, resource, timeout_secs, self.context, label=label, namespace=namespace, )
class KubectlError (returncode: int, cmd: list[str], stdout: bytes, stderr: bytes)
-
Assertion failed.
Expand source code Browse git
class KubectlError(AssertionError): def __init__(self, returncode: int, cmd: list[str], stdout: bytes, stderr: bytes): self.returncode = returncode self.cmd = cmd self.stdout = stdout self.stderr = stderr @classmethod def from_subprocess_error(cls, e: subprocess.CalledProcessError) -> BaseException: return cls(e.returncode, e.cmd, e.stdout, e.stderr) def __str__(self) -> str: return "\n".join( [ f"error in kubectl command: {self.cmd} returned {self.returncode}", f"stdout: {self.stdout.decode('utf-8')}", f"stderr: {self.stderr.decode('utf-8')}", ], )
Ancestors
- builtins.AssertionError
- builtins.Exception
- builtins.BaseException
Static methods
def from_subprocess_error(e: subprocess.CalledProcessError) ‑> BaseException
-
Expand source code Browse git
@classmethod def from_subprocess_error(cls, e: subprocess.CalledProcessError) -> BaseException: return cls(e.returncode, e.cmd, e.stdout, e.stderr)