Module materialize.cloudtest.k8s.environmentd
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import operator
import urllib.parse
from collections.abc import Callable
from kubernetes.client import (
V1Container,
V1ContainerPort,
V1EnvVar,
V1EnvVarSource,
V1LabelSelector,
V1ObjectFieldSelector,
V1ObjectMeta,
V1PersistentVolumeClaim,
V1PersistentVolumeClaimSpec,
V1PodSpec,
V1PodTemplateSpec,
V1ResourceRequirements,
V1Service,
V1ServicePort,
V1ServiceSpec,
V1StatefulSet,
V1StatefulSetSpec,
V1Toleration,
V1VolumeMount,
)
from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.k8s.api.k8s_service import K8sService
from materialize.cloudtest.k8s.api.k8s_stateful_set import K8sStatefulSet
from materialize.mz_version import MzVersion
from materialize.mzcompose import DEFAULT_SYSTEM_PARAMETERS
class EnvironmentdService(K8sService):
def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
super().__init__(namespace)
service_port = V1ServicePort(name="sql", port=6875)
http_port = V1ServicePort(name="http", port=6876)
internal_port = V1ServicePort(name="internal", port=6877)
internal_http_port = V1ServicePort(name="internalhttp", port=6878)
self.service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}),
spec=V1ServiceSpec(
type="NodePort",
ports=[service_port, internal_port, http_port, internal_http_port],
selector={"app": "environmentd"},
),
)
class MaterializedAliasService(K8sService):
"""Some testdrive tests expect that Mz is accessible as 'materialized'"""
def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
super().__init__(namespace)
self.service = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(name="materialized"),
spec=V1ServiceSpec(
type="ExternalName",
external_name=f"environmentd.{namespace}.svc.cluster.local",
),
)
class EnvironmentdStatefulSet(K8sStatefulSet):
def __init__(
self,
tag: str | None = None,
release_mode: bool = True,
coverage_mode: bool = False,
sanitizer_mode: str = "none",
log_filter: str | None = None,
namespace: str = DEFAULT_K8S_NAMESPACE,
minio_namespace: str = DEFAULT_K8S_NAMESPACE,
cockroach_namespace: str = DEFAULT_K8S_NAMESPACE,
apply_node_selectors: bool = False,
) -> None:
self.tag = tag
self.release_mode = release_mode
self.coverage_mode = coverage_mode
self.sanitizer_mode = sanitizer_mode
self.log_filter = log_filter
self.env: dict[str, str] = {}
self.extra_args: list[str] = []
self.minio_namespace = minio_namespace
self.cockroach_namespace = cockroach_namespace
self.apply_node_selectors = apply_node_selectors
super().__init__(namespace)
def generate_stateful_set(self) -> V1StatefulSet:
metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"})
label_selector = V1LabelSelector(match_labels={"app": "environmentd"})
ports = [V1ContainerPort(container_port=5432, name="sql")]
volume_mounts = []
if self.coverage_mode:
volume_mounts.append(V1VolumeMount(name="coverage", mount_path="/coverage"))
container = V1Container(
name="environmentd",
image=self.image(
"environmentd",
tag=self.tag,
release_mode=self.release_mode,
),
args=self.args(),
env=self.env_vars(),
ports=ports,
volume_mounts=volume_mounts,
)
node_selector = None
if self.apply_node_selectors:
node_selector = {"environmentd": "true"}
taint_toleration = V1Toleration(
key="environmentd",
operator="Equal",
value="true",
effect="NoSchedule",
)
pod_spec = V1PodSpec(
containers=[container],
tolerations=[taint_toleration],
node_selector=node_selector,
termination_grace_period_seconds=0,
)
template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec)
return V1StatefulSet(
api_version="apps/v1",
kind="StatefulSet",
metadata=metadata,
spec=V1StatefulSetSpec(
service_name="environmentd",
replicas=1,
pod_management_policy="Parallel",
selector=label_selector,
template=template_spec,
volume_claim_templates=self.claim_templates(),
),
)
def claim_templates(self) -> list[V1PersistentVolumeClaim]:
claim_templates = []
if self.coverage_mode:
claim_templates.append(
V1PersistentVolumeClaim(
metadata=V1ObjectMeta(name="coverage"),
spec=V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=V1ResourceRequirements(requests={"storage": "10Gi"}),
),
)
)
return claim_templates
def args(self) -> list[str]:
s3_endpoint = urllib.parse.quote(
f"http://minio-service.{self.minio_namespace}:9000"
)
args = [
"--availability-zone=1",
"--availability-zone=2",
"--availability-zone=3",
"--aws-account-id=123456789000",
"--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
"--announce-egress-ip=1.2.3.4",
"--announce-egress-ip=88.77.66.55",
"--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0",
f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}®ion=minio",
"--orchestrator=kubernetes",
"--orchestrator-kubernetes-image-pull-policy=if-not-present",
f"--persist-consensus-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=consensus",
f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage",
"--internal-sql-listen-addr=0.0.0.0:6877",
"--internal-http-listen-addr=0.0.0.0:6878",
"--unsafe-mode",
# cloudtest may be called upon to spin up older versions of
# Materialize too! If you are adding a command-line option that is
# only supported on newer releases, do not add it here. Add it as a
# version-gated argument below, using `self._meets_minimum_version`.
]
if self._meets_minimum_version("0.38.0"):
args += [
"--clusterd-image",
self.image(
"clusterd",
tag=self.tag,
release_mode=self.release_mode,
),
]
else:
args += [
"--storaged-image",
self.image(
"storaged",
tag=self.tag,
release_mode=self.release_mode,
),
"--computed-image",
self.image(
"computed",
tag=self.tag,
release_mode=self.release_mode,
),
]
if self._meets_minimum_version("0.53.0"):
args += [
"--bootstrap-role",
"materialize",
]
if self._meets_minimum_version("0.54.0"):
args += [
"--internal-persist-pubsub-listen-addr=0.0.0.0:6879",
"--persist-pubsub-url=http://persist-pubsub",
]
if self._meets_minimum_version("0.60.0-dev"):
args += [
# Kind sets up a basic local-file storage class based on Rancher, named `standard`
"--orchestrator-kubernetes-ephemeral-volume-class=standard"
]
if self._meets_minimum_version("0.63.0-dev"):
args += ["--secrets-controller=kubernetes"]
if self._meets_minimum_version("0.79.0-dev"):
args += [
f"--timestamp-oracle-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=tsoracle"
]
return args + self.extra_args
def env_vars(self) -> list[V1EnvVar]:
system_parameter_defaults = DEFAULT_SYSTEM_PARAMETERS
if self.log_filter:
system_parameter_defaults["log_filter"] = self.log_filter
if self._meets_maximum_version("0.63.99"):
system_parameter_defaults["enable_managed_clusters"] = "true"
value_from = V1EnvVarSource(
field_ref=V1ObjectFieldSelector(field_path="metadata.name")
)
env = [
V1EnvVar(name="MZ_SOFT_ASSERTIONS", value="1"),
V1EnvVar(name="MZ_POD_NAME", value_from=value_from),
V1EnvVar(name="AWS_REGION", value="minio"),
V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"),
V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"),
V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"),
V1EnvVar(
name="MZ_AWS_EXTERNAL_ID_PREFIX",
value="eb5cb59b-e2fe-41f3-87ca-d2176a495345",
),
V1EnvVar(
name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2"
),
V1EnvVar(
name="MZ_AWS_CONNECTION_ROLE_ARN",
value="arn:aws:iam::123456789000:role/MaterializeConnection",
),
V1EnvVar(
name="MZ_SYSTEM_PARAMETER_DEFAULT",
value=";".join(
[
f"{key}={value}"
for key, value in system_parameter_defaults.items()
]
),
),
# Set the adapter stash URL for older environments that need it (versions before
# v0.92.0).
V1EnvVar(
name="MZ_ADAPTER_STASH_URL",
value=f"postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
),
]
if self.coverage_mode:
env.extend(
[
V1EnvVar(
name="LLVM_PROFILE_FILE",
value="/coverage/environmentd-%p-%9m%c.profraw",
),
V1EnvVar(
name="CI_COVERAGE_ENABLED",
value="1",
),
V1EnvVar(name="MZ_ORCHESTRATOR_KUBERNETES_COVERAGE", value="1"),
]
)
if self.sanitizer_mode != "none":
env.extend(
[
V1EnvVar(
name="CI_SANITIZER_MODE",
value=self.sanitizer_mode,
),
]
)
for k, v in self.env.items():
env.append(V1EnvVar(name=k, value=v))
return env
def _meets_version(self, version: str, operator: Callable, default: bool) -> bool:
"""Determine whether environmentd matches a given version based on a comparison operator"""
if self.tag is None:
return default
try:
tag_version = MzVersion.parse_mz(self.tag)
except ValueError:
return default
cmp_version = MzVersion.parse_without_prefix(version)
return bool(operator(tag_version, cmp_version))
def _meets_minimum_version(self, version: str) -> bool:
return self._meets_version(version=version, operator=operator.ge, default=True)
def _meets_maximum_version(self, version: str) -> bool:
return self._meets_version(version=version, operator=operator.le, default=False)
Classes
class EnvironmentdService (namespace: str = 'default')
-
Expand source code Browse git
class EnvironmentdService(K8sService): def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None: super().__init__(namespace) service_port = V1ServicePort(name="sql", port=6875) http_port = V1ServicePort(name="http", port=6876) internal_port = V1ServicePort(name="internal", port=6877) internal_http_port = V1ServicePort(name="internalhttp", port=6878) self.service = V1Service( api_version="v1", kind="Service", metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}), spec=V1ServiceSpec( type="NodePort", ports=[service_port, internal_port, http_port, internal_http_port], selector={"app": "environmentd"}, ), )
Ancestors
Inherited members
class EnvironmentdStatefulSet (tag: str | None = None, release_mode: bool = True, coverage_mode: bool = False, sanitizer_mode: str = 'none', log_filter: str | None = None, namespace: str = 'default', minio_namespace: str = 'default', cockroach_namespace: str = 'default', apply_node_selectors: bool = False)
-
Expand source code Browse git
class EnvironmentdStatefulSet(K8sStatefulSet): def __init__( self, tag: str | None = None, release_mode: bool = True, coverage_mode: bool = False, sanitizer_mode: str = "none", log_filter: str | None = None, namespace: str = DEFAULT_K8S_NAMESPACE, minio_namespace: str = DEFAULT_K8S_NAMESPACE, cockroach_namespace: str = DEFAULT_K8S_NAMESPACE, apply_node_selectors: bool = False, ) -> None: self.tag = tag self.release_mode = release_mode self.coverage_mode = coverage_mode self.sanitizer_mode = sanitizer_mode self.log_filter = log_filter self.env: dict[str, str] = {} self.extra_args: list[str] = [] self.minio_namespace = minio_namespace self.cockroach_namespace = cockroach_namespace self.apply_node_selectors = apply_node_selectors super().__init__(namespace) def generate_stateful_set(self) -> V1StatefulSet: metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}) label_selector = V1LabelSelector(match_labels={"app": "environmentd"}) ports = [V1ContainerPort(container_port=5432, name="sql")] volume_mounts = [] if self.coverage_mode: volume_mounts.append(V1VolumeMount(name="coverage", mount_path="/coverage")) container = V1Container( name="environmentd", image=self.image( "environmentd", tag=self.tag, release_mode=self.release_mode, ), args=self.args(), env=self.env_vars(), ports=ports, volume_mounts=volume_mounts, ) node_selector = None if self.apply_node_selectors: node_selector = {"environmentd": "true"} taint_toleration = V1Toleration( key="environmentd", operator="Equal", value="true", effect="NoSchedule", ) pod_spec = V1PodSpec( containers=[container], tolerations=[taint_toleration], node_selector=node_selector, termination_grace_period_seconds=0, ) template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec) return V1StatefulSet( api_version="apps/v1", kind="StatefulSet", metadata=metadata, spec=V1StatefulSetSpec( service_name="environmentd", replicas=1, pod_management_policy="Parallel", selector=label_selector, template=template_spec, volume_claim_templates=self.claim_templates(), ), ) def claim_templates(self) -> list[V1PersistentVolumeClaim]: claim_templates = [] if self.coverage_mode: claim_templates.append( V1PersistentVolumeClaim( metadata=V1ObjectMeta(name="coverage"), spec=V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce"], resources=V1ResourceRequirements(requests={"storage": "10Gi"}), ), ) ) return claim_templates def args(self) -> list[str]: s3_endpoint = urllib.parse.quote( f"http://minio-service.{self.minio_namespace}:9000" ) args = [ "--availability-zone=1", "--availability-zone=2", "--availability-zone=3", "--aws-account-id=123456789000", "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345", "--announce-egress-ip=1.2.3.4", "--announce-egress-ip=88.77.66.55", "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0", f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}®ion=minio", "--orchestrator=kubernetes", "--orchestrator-kubernetes-image-pull-policy=if-not-present", f"--persist-consensus-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=consensus", f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage", "--internal-sql-listen-addr=0.0.0.0:6877", "--internal-http-listen-addr=0.0.0.0:6878", "--unsafe-mode", # cloudtest may be called upon to spin up older versions of # Materialize too! If you are adding a command-line option that is # only supported on newer releases, do not add it here. Add it as a # version-gated argument below, using `self._meets_minimum_version`. ] if self._meets_minimum_version("0.38.0"): args += [ "--clusterd-image", self.image( "clusterd", tag=self.tag, release_mode=self.release_mode, ), ] else: args += [ "--storaged-image", self.image( "storaged", tag=self.tag, release_mode=self.release_mode, ), "--computed-image", self.image( "computed", tag=self.tag, release_mode=self.release_mode, ), ] if self._meets_minimum_version("0.53.0"): args += [ "--bootstrap-role", "materialize", ] if self._meets_minimum_version("0.54.0"): args += [ "--internal-persist-pubsub-listen-addr=0.0.0.0:6879", "--persist-pubsub-url=http://persist-pubsub", ] if self._meets_minimum_version("0.60.0-dev"): args += [ # Kind sets up a basic local-file storage class based on Rancher, named `standard` "--orchestrator-kubernetes-ephemeral-volume-class=standard" ] if self._meets_minimum_version("0.63.0-dev"): args += ["--secrets-controller=kubernetes"] if self._meets_minimum_version("0.79.0-dev"): args += [ f"--timestamp-oracle-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=tsoracle" ] return args + self.extra_args def env_vars(self) -> list[V1EnvVar]: system_parameter_defaults = DEFAULT_SYSTEM_PARAMETERS if self.log_filter: system_parameter_defaults["log_filter"] = self.log_filter if self._meets_maximum_version("0.63.99"): system_parameter_defaults["enable_managed_clusters"] = "true" value_from = V1EnvVarSource( field_ref=V1ObjectFieldSelector(field_path="metadata.name") ) env = [ V1EnvVar(name="MZ_SOFT_ASSERTIONS", value="1"), V1EnvVar(name="MZ_POD_NAME", value_from=value_from), V1EnvVar(name="AWS_REGION", value="minio"), V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"), V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"), V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"), V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"), V1EnvVar( name="MZ_AWS_EXTERNAL_ID_PREFIX", value="eb5cb59b-e2fe-41f3-87ca-d2176a495345", ), V1EnvVar( name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2" ), V1EnvVar( name="MZ_AWS_CONNECTION_ROLE_ARN", value="arn:aws:iam::123456789000:role/MaterializeConnection", ), V1EnvVar( name="MZ_SYSTEM_PARAMETER_DEFAULT", value=";".join( [ f"{key}={value}" for key, value in system_parameter_defaults.items() ] ), ), # Set the adapter stash URL for older environments that need it (versions before # v0.92.0). V1EnvVar( name="MZ_ADAPTER_STASH_URL", value=f"postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter", ), ] if self.coverage_mode: env.extend( [ V1EnvVar( name="LLVM_PROFILE_FILE", value="/coverage/environmentd-%p-%9m%c.profraw", ), V1EnvVar( name="CI_COVERAGE_ENABLED", value="1", ), V1EnvVar(name="MZ_ORCHESTRATOR_KUBERNETES_COVERAGE", value="1"), ] ) if self.sanitizer_mode != "none": env.extend( [ V1EnvVar( name="CI_SANITIZER_MODE", value=self.sanitizer_mode, ), ] ) for k, v in self.env.items(): env.append(V1EnvVar(name=k, value=v)) return env def _meets_version(self, version: str, operator: Callable, default: bool) -> bool: """Determine whether environmentd matches a given version based on a comparison operator""" if self.tag is None: return default try: tag_version = MzVersion.parse_mz(self.tag) except ValueError: return default cmp_version = MzVersion.parse_without_prefix(version) return bool(operator(tag_version, cmp_version)) def _meets_minimum_version(self, version: str) -> bool: return self._meets_version(version=version, operator=operator.ge, default=True) def _meets_maximum_version(self, version: str) -> bool: return self._meets_version(version=version, operator=operator.le, default=False)
Ancestors
Methods
def args(self) ‑> list[str]
-
Expand source code Browse git
def args(self) -> list[str]: s3_endpoint = urllib.parse.quote( f"http://minio-service.{self.minio_namespace}:9000" ) args = [ "--availability-zone=1", "--availability-zone=2", "--availability-zone=3", "--aws-account-id=123456789000", "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345", "--announce-egress-ip=1.2.3.4", "--announce-egress-ip=88.77.66.55", "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0", f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}®ion=minio", "--orchestrator=kubernetes", "--orchestrator-kubernetes-image-pull-policy=if-not-present", f"--persist-consensus-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=consensus", f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage", "--internal-sql-listen-addr=0.0.0.0:6877", "--internal-http-listen-addr=0.0.0.0:6878", "--unsafe-mode", # cloudtest may be called upon to spin up older versions of # Materialize too! If you are adding a command-line option that is # only supported on newer releases, do not add it here. Add it as a # version-gated argument below, using `self._meets_minimum_version`. ] if self._meets_minimum_version("0.38.0"): args += [ "--clusterd-image", self.image( "clusterd", tag=self.tag, release_mode=self.release_mode, ), ] else: args += [ "--storaged-image", self.image( "storaged", tag=self.tag, release_mode=self.release_mode, ), "--computed-image", self.image( "computed", tag=self.tag, release_mode=self.release_mode, ), ] if self._meets_minimum_version("0.53.0"): args += [ "--bootstrap-role", "materialize", ] if self._meets_minimum_version("0.54.0"): args += [ "--internal-persist-pubsub-listen-addr=0.0.0.0:6879", "--persist-pubsub-url=http://persist-pubsub", ] if self._meets_minimum_version("0.60.0-dev"): args += [ # Kind sets up a basic local-file storage class based on Rancher, named `standard` "--orchestrator-kubernetes-ephemeral-volume-class=standard" ] if self._meets_minimum_version("0.63.0-dev"): args += ["--secrets-controller=kubernetes"] if self._meets_minimum_version("0.79.0-dev"): args += [ f"--timestamp-oracle-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=tsoracle" ] return args + self.extra_args
def claim_templates(self) ‑> list[kubernetes.client.models.v1_persistent_volume_claim.V1PersistentVolumeClaim]
-
Expand source code Browse git
def claim_templates(self) -> list[V1PersistentVolumeClaim]: claim_templates = [] if self.coverage_mode: claim_templates.append( V1PersistentVolumeClaim( metadata=V1ObjectMeta(name="coverage"), spec=V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce"], resources=V1ResourceRequirements(requests={"storage": "10Gi"}), ), ) ) return claim_templates
def env_vars(self) ‑> list[kubernetes.client.models.v1_env_var.V1EnvVar]
-
Expand source code Browse git
def env_vars(self) -> list[V1EnvVar]: system_parameter_defaults = DEFAULT_SYSTEM_PARAMETERS if self.log_filter: system_parameter_defaults["log_filter"] = self.log_filter if self._meets_maximum_version("0.63.99"): system_parameter_defaults["enable_managed_clusters"] = "true" value_from = V1EnvVarSource( field_ref=V1ObjectFieldSelector(field_path="metadata.name") ) env = [ V1EnvVar(name="MZ_SOFT_ASSERTIONS", value="1"), V1EnvVar(name="MZ_POD_NAME", value_from=value_from), V1EnvVar(name="AWS_REGION", value="minio"), V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"), V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"), V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"), V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"), V1EnvVar( name="MZ_AWS_EXTERNAL_ID_PREFIX", value="eb5cb59b-e2fe-41f3-87ca-d2176a495345", ), V1EnvVar( name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2" ), V1EnvVar( name="MZ_AWS_CONNECTION_ROLE_ARN", value="arn:aws:iam::123456789000:role/MaterializeConnection", ), V1EnvVar( name="MZ_SYSTEM_PARAMETER_DEFAULT", value=";".join( [ f"{key}={value}" for key, value in system_parameter_defaults.items() ] ), ), # Set the adapter stash URL for older environments that need it (versions before # v0.92.0). V1EnvVar( name="MZ_ADAPTER_STASH_URL", value=f"postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter", ), ] if self.coverage_mode: env.extend( [ V1EnvVar( name="LLVM_PROFILE_FILE", value="/coverage/environmentd-%p-%9m%c.profraw", ), V1EnvVar( name="CI_COVERAGE_ENABLED", value="1", ), V1EnvVar(name="MZ_ORCHESTRATOR_KUBERNETES_COVERAGE", value="1"), ] ) if self.sanitizer_mode != "none": env.extend( [ V1EnvVar( name="CI_SANITIZER_MODE", value=self.sanitizer_mode, ), ] ) for k, v in self.env.items(): env.append(V1EnvVar(name=k, value=v)) return env
def generate_stateful_set(self) ‑> kubernetes.client.models.v1_stateful_set.V1StatefulSet
-
Expand source code Browse git
def generate_stateful_set(self) -> V1StatefulSet: metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}) label_selector = V1LabelSelector(match_labels={"app": "environmentd"}) ports = [V1ContainerPort(container_port=5432, name="sql")] volume_mounts = [] if self.coverage_mode: volume_mounts.append(V1VolumeMount(name="coverage", mount_path="/coverage")) container = V1Container( name="environmentd", image=self.image( "environmentd", tag=self.tag, release_mode=self.release_mode, ), args=self.args(), env=self.env_vars(), ports=ports, volume_mounts=volume_mounts, ) node_selector = None if self.apply_node_selectors: node_selector = {"environmentd": "true"} taint_toleration = V1Toleration( key="environmentd", operator="Equal", value="true", effect="NoSchedule", ) pod_spec = V1PodSpec( containers=[container], tolerations=[taint_toleration], node_selector=node_selector, termination_grace_period_seconds=0, ) template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec) return V1StatefulSet( api_version="apps/v1", kind="StatefulSet", metadata=metadata, spec=V1StatefulSetSpec( service_name="environmentd", replicas=1, pod_management_policy="Parallel", selector=label_selector, template=template_spec, volume_claim_templates=self.claim_templates(), ), )
class MaterializedAliasService (namespace: str = 'default')
-
Some testdrive tests expect that Mz is accessible as 'materialized'
Expand source code Browse git
class MaterializedAliasService(K8sService): """Some testdrive tests expect that Mz is accessible as 'materialized'""" def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None: super().__init__(namespace) self.service = V1Service( api_version="v1", kind="Service", metadata=V1ObjectMeta(name="materialized"), spec=V1ServiceSpec( type="ExternalName", external_name=f"environmentd.{namespace}.svc.cluster.local", ), )
Ancestors
Inherited members