Module materialize.cloudtest.k8s

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import subprocess
from typing import Any, Optional

import pg8000
import sqlparse
from kubernetes.client import (
    AppsV1Api,
    CoreV1Api,
    RbacAuthorizationV1Api,
    V1ClusterRole,
    V1ConfigMap,
    V1Deployment,
    V1Pod,
    V1RoleBinding,
    V1Secret,
    V1Service,
    V1StatefulSet,
)
from kubernetes.client.exceptions import ApiException
from kubernetes.config import new_client_from_config  # type: ignore
from pg8000 import Connection, Cursor

from materialize import ROOT, mzbuild


class K8sResource:
    def kubectl(self, *args: str) -> str:
        return subprocess.check_output(
            ["kubectl", "--context", self.context(), *args]
        ).decode("ascii")

    def api(self) -> CoreV1Api:
        api_client = new_client_from_config(context=self.context())
        return CoreV1Api(api_client)

    def apps_api(self) -> AppsV1Api:
        api_client = new_client_from_config(context=self.context())
        return AppsV1Api(api_client)

    def rbac_api(self) -> RbacAuthorizationV1Api:
        api_client = new_client_from_config(context=self.context())
        return RbacAuthorizationV1Api(api_client)

    def context(self) -> str:
        return "kind-cloudtest"

    def namespace(self) -> str:
        return "default"

    def kind(self) -> str:
        assert False

    def create(self) -> None:
        assert False

    def image(
        self, service: str, tag: Optional[str] = None, release_mode: bool = True
    ) -> str:
        if tag is not None:
            return f"materialize/{service}:{tag}"
        else:
            repo = mzbuild.Repository(ROOT, release_mode=release_mode)
            deps = repo.resolve_dependencies([repo.images[service]])
            rimage = deps[service]
            return rimage.spec()


class K8sPod(K8sResource):
    pod: V1Pod

    def kind(self) -> str:
        return "pod"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_pod(body=self.pod, namespace=self.namespace())

    def name(self) -> str:
        assert self.pod.metadata is not None
        assert self.pod.metadata.name is not None
        return self.pod.metadata.name

    def copy(self, source: str, destination: str) -> None:
        self.kubectl("cp", source, f"{self.name()}:{destination}")


class K8sService(K8sResource):
    service: V1Service

    def kind(self) -> str:
        return "service"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_service(
            body=self.service, namespace=self.namespace()
        )

    def node_port(self, name: Optional[str] = None) -> int:
        assert self.service and self.service.metadata and self.service.metadata.name
        service = self.api().read_namespaced_service(
            self.service.metadata.name, self.namespace()
        )
        assert service is not None

        spec = service.spec
        assert spec is not None

        ports = spec.ports
        assert ports is not None and len(ports) > 0

        port = next(p for p in ports if name is None or p.name == name)

        node_port = port.node_port
        assert node_port is not None

        return node_port

    def sql_conn(
        self,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Connection:
        """Get a connection to run SQL queries against the service"""
        return pg8000.connect(
            host="localhost",
            port=self.node_port(name=port),
            user=user,
        )

    def sql_cursor(
        self,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Cursor:
        """Get a cursor to run SQL queries against the service"""
        conn = self.sql_conn(port=port, user=user)
        conn.autocommit = True
        return conn.cursor()

    def sql(
        self,
        sql: str,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> None:
        """Run a batch of SQL statements against the service."""
        with self.sql_cursor(port=port, user=user) as cursor:
            for statement in sqlparse.split(sql):
                print(f"> {statement}")
                cursor.execute(statement)

    def sql_query(
        self,
        sql: str,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Any:
        """Execute a SQL query against the service and return results."""
        with self.sql_cursor(port=port, user=user) as cursor:
            print(f"> {sql}")
            cursor.execute(sql)
            return cursor.fetchall()


class K8sDeployment(K8sResource):
    deployment: V1Deployment

    def kind(self) -> str:
        return "deployment"

    def create(self) -> None:
        apps_v1_api = self.apps_api()
        apps_v1_api.create_namespaced_deployment(
            body=self.deployment, namespace=self.namespace()
        )


class K8sStatefulSet(K8sResource):
    stateful_set: V1StatefulSet

    def __init__(self) -> None:
        self.stateful_set = self.generate_stateful_set()

    def generate_stateful_set(self) -> V1StatefulSet:
        assert False

    def kind(self) -> str:
        return "statefulset"

    def name(self) -> str:
        assert self.stateful_set.metadata is not None
        assert self.stateful_set.metadata.name is not None
        return self.stateful_set.metadata.name

    def create(self) -> None:
        apps_v1_api = self.apps_api()
        apps_v1_api.create_namespaced_stateful_set(
            body=self.stateful_set, namespace=self.namespace()
        )

    def replace(self) -> None:
        apps_v1_api = self.apps_api()
        name = self.name()
        print(f"Replacing stateful set {name}...")
        self.stateful_set = self.generate_stateful_set()
        apps_v1_api.replace_namespaced_stateful_set(
            name=name, body=self.stateful_set, namespace=self.namespace()
        )
        # Despite the name "status" this kubectl command will actually wait
        # until the rollout is complete.
        # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
        self.kubectl("rollout", "status", f"statefulset/{name}")


class K8sConfigMap(K8sResource):
    config_map: V1ConfigMap

    def kind(self) -> str:
        return "configmap"

    def create(self) -> None:
        core_v1_api = self.api()

        # kubectl delete all -all does not clean up configmaps
        try:
            assert self.config_map.metadata is not None
            assert self.config_map.metadata.name is not None
            core_v1_api.delete_namespaced_config_map(
                name=self.config_map.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass

        core_v1_api.create_namespaced_config_map(
            body=self.config_map, namespace=self.namespace()
        )


class K8sClusterRole(K8sResource):
    role: V1ClusterRole

    def kind(self) -> str:
        return "clusterrole"

    def create(self) -> None:
        rbac_api = self.rbac_api()

        # kubectl delete all -all does not clean up role bindings
        try:
            assert self.role.metadata is not None
            assert self.role.metadata.name is not None
            rbac_api.delete_cluster_role(name=self.role.metadata.name)
        except ApiException:
            pass

        rbac_api.create_cluster_role(
            body=self.role,
        )


class K8sRoleBinding(K8sResource):
    role_binding: V1RoleBinding

    def kind(self) -> str:
        return "rolebinding"

    def create(self) -> None:
        rbac_api = self.rbac_api()

        # kubectl delete all -all does not clean up role bindings
        try:
            assert self.role_binding.metadata is not None
            assert self.role_binding.metadata.name is not None
            rbac_api.delete_namespaced_role_binding(
                name=self.role_binding.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass

        rbac_api.create_namespaced_role_binding(
            body=self.role_binding,
            namespace=self.namespace(),
        )


class K8sSecret(K8sResource):
    secret = V1Secret

    def kind(self) -> str:
        return "secret"

    # kubectl delete all -all does not clean up secrets
    def create(self) -> None:
        core_v1_api = self.api()

        try:
            assert self.secret.metadata is not None
            assert self.secret.metadata.name is not None
            core_v1_api.delete_namespaced_secret(
                name=self.secret.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass
        core_v1_api.create_namespaced_secret(
            body=self.secret, namespace=self.namespace()  # type: ignore
        )

Sub-modules

materialize.cloudtest.k8s.cockroach
materialize.cloudtest.k8s.debezium
materialize.cloudtest.k8s.environmentd
materialize.cloudtest.k8s.minio
materialize.cloudtest.k8s.postgres
materialize.cloudtest.k8s.redpanda
materialize.cloudtest.k8s.role_binding
materialize.cloudtest.k8s.ssh
materialize.cloudtest.k8s.testdrive
materialize.cloudtest.k8s.vpc_endpoints_cluster_role

Classes

class K8sClusterRole
Expand source code Browse git
class K8sClusterRole(K8sResource):
    role: V1ClusterRole

    def kind(self) -> str:
        return "clusterrole"

    def create(self) -> None:
        rbac_api = self.rbac_api()

        # kubectl delete all -all does not clean up role bindings
        try:
            assert self.role.metadata is not None
            assert self.role.metadata.name is not None
            rbac_api.delete_cluster_role(name=self.role.metadata.name)
        except ApiException:
            pass

        rbac_api.create_cluster_role(
            body=self.role,
        )

Ancestors

Subclasses

Class variables

var role : kubernetes.client.models.v1_cluster_role.V1ClusterRole

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    rbac_api = self.rbac_api()

    # kubectl delete all -all does not clean up role bindings
    try:
        assert self.role.metadata is not None
        assert self.role.metadata.name is not None
        rbac_api.delete_cluster_role(name=self.role.metadata.name)
    except ApiException:
        pass

    rbac_api.create_cluster_role(
        body=self.role,
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "clusterrole"
class K8sConfigMap
Expand source code Browse git
class K8sConfigMap(K8sResource):
    config_map: V1ConfigMap

    def kind(self) -> str:
        return "configmap"

    def create(self) -> None:
        core_v1_api = self.api()

        # kubectl delete all -all does not clean up configmaps
        try:
            assert self.config_map.metadata is not None
            assert self.config_map.metadata.name is not None
            core_v1_api.delete_namespaced_config_map(
                name=self.config_map.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass

        core_v1_api.create_namespaced_config_map(
            body=self.config_map, namespace=self.namespace()
        )

Ancestors

Subclasses

Class variables

var config_map : kubernetes.client.models.v1_config_map.V1ConfigMap

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    core_v1_api = self.api()

    # kubectl delete all -all does not clean up configmaps
    try:
        assert self.config_map.metadata is not None
        assert self.config_map.metadata.name is not None
        core_v1_api.delete_namespaced_config_map(
            name=self.config_map.metadata.name, namespace=self.namespace()
        )
    except ApiException:
        pass

    core_v1_api.create_namespaced_config_map(
        body=self.config_map, namespace=self.namespace()
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "configmap"
class K8sDeployment
Expand source code Browse git
class K8sDeployment(K8sResource):
    deployment: V1Deployment

    def kind(self) -> str:
        return "deployment"

    def create(self) -> None:
        apps_v1_api = self.apps_api()
        apps_v1_api.create_namespaced_deployment(
            body=self.deployment, namespace=self.namespace()
        )

Ancestors

Subclasses

Class variables

var deployment : kubernetes.client.models.v1_deployment.V1Deployment

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    apps_v1_api = self.apps_api()
    apps_v1_api.create_namespaced_deployment(
        body=self.deployment, namespace=self.namespace()
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "deployment"
class K8sPod
Expand source code Browse git
class K8sPod(K8sResource):
    pod: V1Pod

    def kind(self) -> str:
        return "pod"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_pod(body=self.pod, namespace=self.namespace())

    def name(self) -> str:
        assert self.pod.metadata is not None
        assert self.pod.metadata.name is not None
        return self.pod.metadata.name

    def copy(self, source: str, destination: str) -> None:
        self.kubectl("cp", source, f"{self.name()}:{destination}")

Ancestors

Subclasses

Class variables

var pod : kubernetes.client.models.v1_pod.V1Pod

Methods

def copy(self, source: str, destination: str) ‑> None
Expand source code Browse git
def copy(self, source: str, destination: str) -> None:
    self.kubectl("cp", source, f"{self.name()}:{destination}")
def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    core_v1_api = self.api()
    core_v1_api.create_namespaced_pod(body=self.pod, namespace=self.namespace())
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "pod"
def name(self) ‑> str
Expand source code Browse git
def name(self) -> str:
    assert self.pod.metadata is not None
    assert self.pod.metadata.name is not None
    return self.pod.metadata.name
class K8sResource
Expand source code Browse git
class K8sResource:
    def kubectl(self, *args: str) -> str:
        return subprocess.check_output(
            ["kubectl", "--context", self.context(), *args]
        ).decode("ascii")

    def api(self) -> CoreV1Api:
        api_client = new_client_from_config(context=self.context())
        return CoreV1Api(api_client)

    def apps_api(self) -> AppsV1Api:
        api_client = new_client_from_config(context=self.context())
        return AppsV1Api(api_client)

    def rbac_api(self) -> RbacAuthorizationV1Api:
        api_client = new_client_from_config(context=self.context())
        return RbacAuthorizationV1Api(api_client)

    def context(self) -> str:
        return "kind-cloudtest"

    def namespace(self) -> str:
        return "default"

    def kind(self) -> str:
        assert False

    def create(self) -> None:
        assert False

    def image(
        self, service: str, tag: Optional[str] = None, release_mode: bool = True
    ) -> str:
        if tag is not None:
            return f"materialize/{service}:{tag}"
        else:
            repo = mzbuild.Repository(ROOT, release_mode=release_mode)
            deps = repo.resolve_dependencies([repo.images[service]])
            rimage = deps[service]
            return rimage.spec()

Subclasses

Methods

def api(self) ‑> kubernetes.client.api.core_v1_api.CoreV1Api
Expand source code Browse git
def api(self) -> CoreV1Api:
    api_client = new_client_from_config(context=self.context())
    return CoreV1Api(api_client)
def apps_api(self) ‑> kubernetes.client.api.apps_v1_api.AppsV1Api
Expand source code Browse git
def apps_api(self) -> AppsV1Api:
    api_client = new_client_from_config(context=self.context())
    return AppsV1Api(api_client)
def context(self) ‑> str
Expand source code Browse git
def context(self) -> str:
    return "kind-cloudtest"
def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    assert False
def image(self, service: str, tag: Optional[str] = None, release_mode: bool = True) ‑> str
Expand source code Browse git
def image(
    self, service: str, tag: Optional[str] = None, release_mode: bool = True
) -> str:
    if tag is not None:
        return f"materialize/{service}:{tag}"
    else:
        repo = mzbuild.Repository(ROOT, release_mode=release_mode)
        deps = repo.resolve_dependencies([repo.images[service]])
        rimage = deps[service]
        return rimage.spec()
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    assert False
def kubectl(self, *args: str) ‑> str
Expand source code Browse git
def kubectl(self, *args: str) -> str:
    return subprocess.check_output(
        ["kubectl", "--context", self.context(), *args]
    ).decode("ascii")
def namespace(self) ‑> str
Expand source code Browse git
def namespace(self) -> str:
    return "default"
def rbac_api(self) ‑> kubernetes.client.api.rbac_authorization_v1_api.RbacAuthorizationV1Api
Expand source code Browse git
def rbac_api(self) -> RbacAuthorizationV1Api:
    api_client = new_client_from_config(context=self.context())
    return RbacAuthorizationV1Api(api_client)
class K8sRoleBinding
Expand source code Browse git
class K8sRoleBinding(K8sResource):
    role_binding: V1RoleBinding

    def kind(self) -> str:
        return "rolebinding"

    def create(self) -> None:
        rbac_api = self.rbac_api()

        # kubectl delete all -all does not clean up role bindings
        try:
            assert self.role_binding.metadata is not None
            assert self.role_binding.metadata.name is not None
            rbac_api.delete_namespaced_role_binding(
                name=self.role_binding.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass

        rbac_api.create_namespaced_role_binding(
            body=self.role_binding,
            namespace=self.namespace(),
        )

Ancestors

Subclasses

Class variables

var role_binding : kubernetes.client.models.v1_role_binding.V1RoleBinding

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    rbac_api = self.rbac_api()

    # kubectl delete all -all does not clean up role bindings
    try:
        assert self.role_binding.metadata is not None
        assert self.role_binding.metadata.name is not None
        rbac_api.delete_namespaced_role_binding(
            name=self.role_binding.metadata.name, namespace=self.namespace()
        )
    except ApiException:
        pass

    rbac_api.create_namespaced_role_binding(
        body=self.role_binding,
        namespace=self.namespace(),
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "rolebinding"
class K8sSecret
Expand source code Browse git
class K8sSecret(K8sResource):
    secret = V1Secret

    def kind(self) -> str:
        return "secret"

    # kubectl delete all -all does not clean up secrets
    def create(self) -> None:
        core_v1_api = self.api()

        try:
            assert self.secret.metadata is not None
            assert self.secret.metadata.name is not None
            core_v1_api.delete_namespaced_secret(
                name=self.secret.metadata.name, namespace=self.namespace()
            )
        except ApiException:
            pass
        core_v1_api.create_namespaced_secret(
            body=self.secret, namespace=self.namespace()  # type: ignore
        )

Ancestors

Class variables

var secret

NOTE: This class is auto generated by OpenAPI Generator. Ref: https://openapi-generator.tech

Do not edit the class manually.

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    core_v1_api = self.api()

    try:
        assert self.secret.metadata is not None
        assert self.secret.metadata.name is not None
        core_v1_api.delete_namespaced_secret(
            name=self.secret.metadata.name, namespace=self.namespace()
        )
    except ApiException:
        pass
    core_v1_api.create_namespaced_secret(
        body=self.secret, namespace=self.namespace()  # type: ignore
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "secret"
class K8sService
Expand source code Browse git
class K8sService(K8sResource):
    service: V1Service

    def kind(self) -> str:
        return "service"

    def create(self) -> None:
        core_v1_api = self.api()
        core_v1_api.create_namespaced_service(
            body=self.service, namespace=self.namespace()
        )

    def node_port(self, name: Optional[str] = None) -> int:
        assert self.service and self.service.metadata and self.service.metadata.name
        service = self.api().read_namespaced_service(
            self.service.metadata.name, self.namespace()
        )
        assert service is not None

        spec = service.spec
        assert spec is not None

        ports = spec.ports
        assert ports is not None and len(ports) > 0

        port = next(p for p in ports if name is None or p.name == name)

        node_port = port.node_port
        assert node_port is not None

        return node_port

    def sql_conn(
        self,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Connection:
        """Get a connection to run SQL queries against the service"""
        return pg8000.connect(
            host="localhost",
            port=self.node_port(name=port),
            user=user,
        )

    def sql_cursor(
        self,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Cursor:
        """Get a cursor to run SQL queries against the service"""
        conn = self.sql_conn(port=port, user=user)
        conn.autocommit = True
        return conn.cursor()

    def sql(
        self,
        sql: str,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> None:
        """Run a batch of SQL statements against the service."""
        with self.sql_cursor(port=port, user=user) as cursor:
            for statement in sqlparse.split(sql):
                print(f"> {statement}")
                cursor.execute(statement)

    def sql_query(
        self,
        sql: str,
        port: Optional[str] = None,
        user: str = "materialize",
    ) -> Any:
        """Execute a SQL query against the service and return results."""
        with self.sql_cursor(port=port, user=user) as cursor:
            print(f"> {sql}")
            cursor.execute(sql)
            return cursor.fetchall()

Ancestors

Subclasses

Class variables

var service : kubernetes.client.models.v1_service.V1Service

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    core_v1_api = self.api()
    core_v1_api.create_namespaced_service(
        body=self.service, namespace=self.namespace()
    )
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "service"
def node_port(self, name: Optional[str] = None) ‑> int
Expand source code Browse git
def node_port(self, name: Optional[str] = None) -> int:
    assert self.service and self.service.metadata and self.service.metadata.name
    service = self.api().read_namespaced_service(
        self.service.metadata.name, self.namespace()
    )
    assert service is not None

    spec = service.spec
    assert spec is not None

    ports = spec.ports
    assert ports is not None and len(ports) > 0

    port = next(p for p in ports if name is None or p.name == name)

    node_port = port.node_port
    assert node_port is not None

    return node_port
def sql(self, sql: str, port: Optional[str] = None, user: str = 'materialize') ‑> None

Run a batch of SQL statements against the service.

Expand source code Browse git
def sql(
    self,
    sql: str,
    port: Optional[str] = None,
    user: str = "materialize",
) -> None:
    """Run a batch of SQL statements against the service."""
    with self.sql_cursor(port=port, user=user) as cursor:
        for statement in sqlparse.split(sql):
            print(f"> {statement}")
            cursor.execute(statement)
def sql_conn(self, port: Optional[str] = None, user: str = 'materialize') ‑> pg8000.legacy.Connection

Get a connection to run SQL queries against the service

Expand source code Browse git
def sql_conn(
    self,
    port: Optional[str] = None,
    user: str = "materialize",
) -> Connection:
    """Get a connection to run SQL queries against the service"""
    return pg8000.connect(
        host="localhost",
        port=self.node_port(name=port),
        user=user,
    )
def sql_cursor(self, port: Optional[str] = None, user: str = 'materialize') ‑> pg8000.legacy.Cursor

Get a cursor to run SQL queries against the service

Expand source code Browse git
def sql_cursor(
    self,
    port: Optional[str] = None,
    user: str = "materialize",
) -> Cursor:
    """Get a cursor to run SQL queries against the service"""
    conn = self.sql_conn(port=port, user=user)
    conn.autocommit = True
    return conn.cursor()
def sql_query(self, sql: str, port: Optional[str] = None, user: str = 'materialize') ‑> Any

Execute a SQL query against the service and return results.

Expand source code Browse git
def sql_query(
    self,
    sql: str,
    port: Optional[str] = None,
    user: str = "materialize",
) -> Any:
    """Execute a SQL query against the service and return results."""
    with self.sql_cursor(port=port, user=user) as cursor:
        print(f"> {sql}")
        cursor.execute(sql)
        return cursor.fetchall()
class K8sStatefulSet
Expand source code Browse git
class K8sStatefulSet(K8sResource):
    stateful_set: V1StatefulSet

    def __init__(self) -> None:
        self.stateful_set = self.generate_stateful_set()

    def generate_stateful_set(self) -> V1StatefulSet:
        assert False

    def kind(self) -> str:
        return "statefulset"

    def name(self) -> str:
        assert self.stateful_set.metadata is not None
        assert self.stateful_set.metadata.name is not None
        return self.stateful_set.metadata.name

    def create(self) -> None:
        apps_v1_api = self.apps_api()
        apps_v1_api.create_namespaced_stateful_set(
            body=self.stateful_set, namespace=self.namespace()
        )

    def replace(self) -> None:
        apps_v1_api = self.apps_api()
        name = self.name()
        print(f"Replacing stateful set {name}...")
        self.stateful_set = self.generate_stateful_set()
        apps_v1_api.replace_namespaced_stateful_set(
            name=name, body=self.stateful_set, namespace=self.namespace()
        )
        # Despite the name "status" this kubectl command will actually wait
        # until the rollout is complete.
        # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
        self.kubectl("rollout", "status", f"statefulset/{name}")

Ancestors

Subclasses

Class variables

var stateful_set : kubernetes.client.models.v1_stateful_set.V1StatefulSet

Methods

def create(self) ‑> None
Expand source code Browse git
def create(self) -> None:
    apps_v1_api = self.apps_api()
    apps_v1_api.create_namespaced_stateful_set(
        body=self.stateful_set, namespace=self.namespace()
    )
def generate_stateful_set(self) ‑> kubernetes.client.models.v1_stateful_set.V1StatefulSet
Expand source code Browse git
def generate_stateful_set(self) -> V1StatefulSet:
    assert False
def kind(self) ‑> str
Expand source code Browse git
def kind(self) -> str:
    return "statefulset"
def name(self) ‑> str
Expand source code Browse git
def name(self) -> str:
    assert self.stateful_set.metadata is not None
    assert self.stateful_set.metadata.name is not None
    return self.stateful_set.metadata.name
def replace(self) ‑> None
Expand source code Browse git
def replace(self) -> None:
    apps_v1_api = self.apps_api()
    name = self.name()
    print(f"Replacing stateful set {name}...")
    self.stateful_set = self.generate_stateful_set()
    apps_v1_api.replace_namespaced_stateful_set(
        name=name, body=self.stateful_set, namespace=self.namespace()
    )
    # Despite the name "status" this kubectl command will actually wait
    # until the rollout is complete.
    # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
    self.kubectl("rollout", "status", f"statefulset/{name}")