Module materialize.cloudtest.k8s.api.k8s_stateful_set
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import logging
from kubernetes.client import V1StatefulSet
from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
LOGGER = logging.getLogger(__name__)
class K8sStatefulSet(K8sResource):
stateful_set: V1StatefulSet
def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE):
super().__init__(namespace)
self.stateful_set = self.generate_stateful_set()
def generate_stateful_set(self) -> V1StatefulSet:
assert False
def kind(self) -> str:
return "statefulset"
def name(self) -> str:
assert self.stateful_set.metadata is not None
assert self.stateful_set.metadata.name is not None
return self.stateful_set.metadata.name
def create(self) -> None:
apps_v1_api = self.apps_api()
apps_v1_api.create_namespaced_stateful_set(
body=self.stateful_set, namespace=self.namespace()
)
def replace(self) -> None:
apps_v1_api = self.apps_api()
name = self.name()
LOGGER.info(f"Replacing stateful set {name}...")
self.stateful_set = self.generate_stateful_set()
apps_v1_api.replace_namespaced_stateful_set(
name=name, body=self.stateful_set, namespace=self.namespace()
)
# Despite the name "status" this kubectl command will actually wait
# until the rollout is complete.
# See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
self.kubectl("rollout", "status", f"statefulset/{name}")
Classes
class K8sStatefulSet (namespace: str = 'default')
-
Expand source code Browse git
class K8sStatefulSet(K8sResource): stateful_set: V1StatefulSet def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE): super().__init__(namespace) self.stateful_set = self.generate_stateful_set() def generate_stateful_set(self) -> V1StatefulSet: assert False def kind(self) -> str: return "statefulset" def name(self) -> str: assert self.stateful_set.metadata is not None assert self.stateful_set.metadata.name is not None return self.stateful_set.metadata.name def create(self) -> None: apps_v1_api = self.apps_api() apps_v1_api.create_namespaced_stateful_set( body=self.stateful_set, namespace=self.namespace() ) def replace(self) -> None: apps_v1_api = self.apps_api() name = self.name() LOGGER.info(f"Replacing stateful set {name}...") self.stateful_set = self.generate_stateful_set() apps_v1_api.replace_namespaced_stateful_set( name=name, body=self.stateful_set, namespace=self.namespace() ) # Despite the name "status" this kubectl command will actually wait # until the rollout is complete. # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928 self.kubectl("rollout", "status", f"statefulset/{name}")
Ancestors
Subclasses
Class variables
var stateful_set : kubernetes.client.models.v1_stateful_set.V1StatefulSet
Methods
def create(self) ‑> None
-
Expand source code Browse git
def create(self) -> None: apps_v1_api = self.apps_api() apps_v1_api.create_namespaced_stateful_set( body=self.stateful_set, namespace=self.namespace() )
def generate_stateful_set(self) ‑> kubernetes.client.models.v1_stateful_set.V1StatefulSet
-
Expand source code Browse git
def generate_stateful_set(self) -> V1StatefulSet: assert False
def kind(self) ‑> str
-
Expand source code Browse git
def kind(self) -> str: return "statefulset"
def name(self) ‑> str
-
Expand source code Browse git
def name(self) -> str: assert self.stateful_set.metadata is not None assert self.stateful_set.metadata.name is not None return self.stateful_set.metadata.name
def replace(self) ‑> None
-
Expand source code Browse git
def replace(self) -> None: apps_v1_api = self.apps_api() name = self.name() LOGGER.info(f"Replacing stateful set {name}...") self.stateful_set = self.generate_stateful_set() apps_v1_api.replace_namespaced_stateful_set( name=name, body=self.stateful_set, namespace=self.namespace() ) # Despite the name "status" this kubectl command will actually wait # until the rollout is complete. # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928 self.kubectl("rollout", "status", f"statefulset/{name}")