Module materialize.cloudtest.k8s.environmentd
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import urllib.parse
from typing import Dict, Optional
from kubernetes.client import (
V1Container,
V1ContainerPort,
V1EnvVar,
V1EnvVarSource,
V1LabelSelector,
V1ObjectFieldSelector,
V1ObjectMeta,
V1PersistentVolumeClaim,
V1PersistentVolumeClaimSpec,
V1PodSpec,
V1PodTemplateSpec,
V1ResourceRequirements,
V1Service,
V1ServicePort,
V1ServiceSpec,
V1StatefulSet,
V1StatefulSetSpec,
V1VolumeMount,
)
from semver import Version
from materialize.cloudtest.k8s import K8sService, K8sStatefulSet
class EnvironmentdService(K8sService):
    """NodePort service in front of the pods labelled `app: environmentd`.

    Exposes two named ports: "sql" on 6875 and "internal" on 6877.
    """

    def __init__(self) -> None:
        exposed_ports = [
            V1ServicePort(name="sql", port=6875),
            V1ServicePort(name="internal", port=6877),
        ]
        service_metadata = V1ObjectMeta(
            name="environmentd",
            labels={"app": "environmentd"},
        )
        service_spec = V1ServiceSpec(
            type="NodePort",
            ports=exposed_ports,
            # Route traffic to every pod carrying the environmentd app label.
            selector={"app": "environmentd"},
        )
        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=service_metadata,
            spec=service_spec,
        )
class MaterializedAliasService(K8sService):
    """ExternalName service that makes environmentd reachable as 'materialized'.

    Some testdrive tests expect that Mz is accessible under that name.
    """

    def __init__(self) -> None:
        alias_metadata = V1ObjectMeta(name="materialized")
        alias_spec = V1ServiceSpec(
            type="ExternalName",
            # Cluster-internal DNS name the alias points at.
            external_name="environmentd.default.svc.cluster.local",
        )
        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=alias_metadata,
            spec=alias_spec,
        )
class EnvironmentdStatefulSet(K8sStatefulSet):
    """StatefulSet definition that runs a single `environmentd` replica.

    The instance attributes are assigned before `super().__init__()` runs.
    NOTE(review): presumably the base constructor calls
    `generate_stateful_set()` and therefore needs them -- confirm against
    `K8sStatefulSet`.
    """

    def __init__(
        self,
        tag: Optional[str] = None,
        release_mode: bool = True,
        log_filter: Optional[str] = None,
    ) -> None:
        """Store the image and logging settings, then run the base constructor.

        Args:
            tag: Image tag forwarded to `self.image(...)` and used for
                version gating. `None` is treated as a development build
                that supports every feature.
            release_mode: Forwarded unchanged to `self.image(...)`.
            log_filter: When truthy, passed to environmentd as
                `--log-filter=<value>`.
        """
        self.tag = tag
        self.release_mode = release_mode
        self.log_filter = log_filter
        # Extra environment variables (name -> value) appended after the
        # built-in ones when the container is generated.
        self.env: Dict[str, str] = {}
        super().__init__()

    def generate_stateful_set(self) -> V1StatefulSet:
        """Build the `environmentd` StatefulSet object.

        Returns:
            A `V1StatefulSet` named "environmentd" with one replica, a single
            container, and a 1Gi "data" volume claim mounted at `/data`.
        """
        metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"})
        label_selector = V1LabelSelector(match_labels={"app": "environmentd"})

        # Hand the pod its own name through a field reference.
        value_from = V1EnvVarSource(
            field_ref=V1ObjectFieldSelector(field_path="metadata.name")
        )

        env = [
            V1EnvVar(name="MZ_POD_NAME", value_from=value_from),
            V1EnvVar(name="AWS_REGION", value="minio"),
            V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"),
            V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"),
            V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
            V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"),
            V1EnvVar(
                name="MZ_AWS_EXTERNAL_ID_PREFIX",
                value="eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            ),
            V1EnvVar(
                name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2"
            ),
        ]
        # Caller-supplied variables come last, in insertion order.
        env.extend(V1EnvVar(name=k, value=v) for k, v in self.env.items())

        # NOTE(review): the declared container port is 5432, while the "sql"
        # port of EnvironmentdService is 6875 -- confirm this is intentional.
        ports = [V1ContainerPort(container_port=5432, name="sql")]

        volume_mounts = [
            V1VolumeMount(name="data", mount_path="/data"),
        ]

        # Percent-encode the endpoint (":" is escaped, "/" is left alone by
        # default) so it can be embedded as a query-string value below.
        s3_endpoint = urllib.parse.quote("http://minio-service.default:9000")

        args = [
            "--availability-zone=cloudtest-worker",
            "--availability-zone=cloudtest-worker2",
            "--availability-zone=cloudtest-worker3",
            "--aws-account-id=123456789000",
            "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            "--announce-egress-ip=1.2.3.4",
            "--announce-egress-ip=88.77.66.55",
            "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0",
            # `endpoint` and `region` are two separate query parameters and
            # must be joined with a literal "&".
            f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}&region=minio",
            "--orchestrator=kubernetes",
            "--orchestrator-kubernetes-image-pull-policy=if-not-present",
            "--persist-consensus-url=postgres://root@cockroach.default:26257?options=--search_path=consensus",
            "--adapter-stash-url=postgres://root@cockroach.default:26257?options=--search_path=adapter",
            "--storage-stash-url=postgres://root@cockroach.default:26257?options=--search_path=storage",
            "--internal-sql-listen-addr=0.0.0.0:6877",
            "--unsafe-mode",
            # cloudtest may be called upon to spin up older versions of
            # Materialize too! If you are adding a command-line option that is
            # only supported on newer releases, do not add it here. Add it as a
            # version-gated argument below, using `self._meets_minimum_version`.
        ]

        if self.log_filter:
            args += [f"--log-filter={self.log_filter}"]

        # From 0.38.0 on a single clusterd image is passed; older releases
        # take separate storaged and computed images.
        if self._meets_minimum_version("0.38.0"):
            args += [
                "--clusterd-image",
                self.image("clusterd", tag=self.tag, release_mode=self.release_mode),
            ]
        else:
            args += [
                "--storaged-image",
                self.image("storaged", tag=self.tag, release_mode=self.release_mode),
                "--computed-image",
                self.image("computed", tag=self.tag, release_mode=self.release_mode),
            ]

        container = V1Container(
            name="environmentd",
            image=self.image(
                "environmentd", tag=self.tag, release_mode=self.release_mode
            ),
            args=args,
            env=env,
            ports=ports,
            volume_mounts=volume_mounts,
        )

        pod_spec = V1PodSpec(containers=[container])
        template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec)

        # The claim name "data" matches the volume mount declared above.
        claim_templates = [
            V1PersistentVolumeClaim(
                metadata=V1ObjectMeta(name="data"),
                spec=V1PersistentVolumeClaimSpec(
                    access_modes=["ReadWriteOnce"],
                    resources=V1ResourceRequirements(requests={"storage": "1Gi"}),
                ),
            )
        ]

        return V1StatefulSet(
            api_version="apps/v1",
            kind="StatefulSet",
            metadata=metadata,
            spec=V1StatefulSetSpec(
                service_name="environmentd",
                replicas=1,
                pod_management_policy="Parallel",
                selector=label_selector,
                template=template_spec,
                volume_claim_templates=claim_templates,
            ),
        )

    def _meets_minimum_version(self, minimum: str) -> bool:
        """Determine whether environmentd is at least the `minimum` version.

        This function matches the function of the same name in
        MaterializeInc/cloud.

        Args:
            minimum: Version string to compare against, e.g. "0.38.0".

        Returns:
            True when `self.tag` is missing, cannot be parsed as a version,
            or parses to a version greater than or equal to `minimum`.
        """
        # Assume that unstable and development versions, as indicated by a
        # missing or unparseable tag, are always recent enough to support all
        # features.
        #
        # TODO: learn how to do real feature detection on arbitrary versions.
        if self.tag is None:
            return True

        try:
            # Tags may carry a leading "v" (e.g. "v0.38.0"); strip it first.
            tag_version = Version.parse(self.tag.removeprefix("v"))
        except ValueError:
            return True

        # `minimum` is parsed outside the try block, so a malformed minimum
        # is not silently treated as "recent enough".
        minimum_version = Version.parse(minimum)
        return tag_version >= minimum_version
Classes
class EnvironmentdService
-
Expand source code Browse git
class EnvironmentdService(K8sService):
    """NodePort service in front of the pods labelled `app: environmentd`.

    Exposes two named ports: "sql" on 6875 and "internal" on 6877.
    """

    def __init__(self) -> None:
        exposed_ports = [
            V1ServicePort(name="sql", port=6875),
            V1ServicePort(name="internal", port=6877),
        ]
        service_metadata = V1ObjectMeta(
            name="environmentd",
            labels={"app": "environmentd"},
        )
        service_spec = V1ServiceSpec(
            type="NodePort",
            ports=exposed_ports,
            # Route traffic to every pod carrying the environmentd app label.
            selector={"app": "environmentd"},
        )
        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=service_metadata,
            spec=service_spec,
        )
Ancestors
Inherited members
class EnvironmentdStatefulSet (tag: Optional[str] = None, release_mode: bool = True, log_filter: Optional[str] = None)
-
Expand source code Browse git
class EnvironmentdStatefulSet(K8sStatefulSet):
    """StatefulSet definition that runs a single `environmentd` replica.

    The instance attributes are assigned before `super().__init__()` runs.
    NOTE(review): presumably the base constructor calls
    `generate_stateful_set()` and therefore needs them -- confirm against
    `K8sStatefulSet`.
    """

    def __init__(
        self,
        tag: Optional[str] = None,
        release_mode: bool = True,
        log_filter: Optional[str] = None,
    ) -> None:
        """Store the image and logging settings, then run the base constructor.

        Args:
            tag: Image tag forwarded to `self.image(...)` and used for
                version gating. `None` is treated as a development build
                that supports every feature.
            release_mode: Forwarded unchanged to `self.image(...)`.
            log_filter: When truthy, passed to environmentd as
                `--log-filter=<value>`.
        """
        self.tag = tag
        self.release_mode = release_mode
        self.log_filter = log_filter
        # Extra environment variables (name -> value) appended after the
        # built-in ones when the container is generated.
        self.env: Dict[str, str] = {}
        super().__init__()

    def generate_stateful_set(self) -> V1StatefulSet:
        """Build the `environmentd` StatefulSet object.

        Returns:
            A `V1StatefulSet` named "environmentd" with one replica, a single
            container, and a 1Gi "data" volume claim mounted at `/data`.
        """
        metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"})
        label_selector = V1LabelSelector(match_labels={"app": "environmentd"})

        # Hand the pod its own name through a field reference.
        value_from = V1EnvVarSource(
            field_ref=V1ObjectFieldSelector(field_path="metadata.name")
        )

        env = [
            V1EnvVar(name="MZ_POD_NAME", value_from=value_from),
            V1EnvVar(name="AWS_REGION", value="minio"),
            V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"),
            V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"),
            V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
            V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"),
            V1EnvVar(
                name="MZ_AWS_EXTERNAL_ID_PREFIX",
                value="eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            ),
            V1EnvVar(
                name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2"
            ),
        ]
        # Caller-supplied variables come last, in insertion order.
        env.extend(V1EnvVar(name=k, value=v) for k, v in self.env.items())

        # NOTE(review): the declared container port is 5432, while the "sql"
        # port of EnvironmentdService is 6875 -- confirm this is intentional.
        ports = [V1ContainerPort(container_port=5432, name="sql")]

        volume_mounts = [
            V1VolumeMount(name="data", mount_path="/data"),
        ]

        # Percent-encode the endpoint (":" is escaped, "/" is left alone by
        # default) so it can be embedded as a query-string value below.
        s3_endpoint = urllib.parse.quote("http://minio-service.default:9000")

        args = [
            "--availability-zone=cloudtest-worker",
            "--availability-zone=cloudtest-worker2",
            "--availability-zone=cloudtest-worker3",
            "--aws-account-id=123456789000",
            "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            "--announce-egress-ip=1.2.3.4",
            "--announce-egress-ip=88.77.66.55",
            "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0",
            # `endpoint` and `region` are two separate query parameters and
            # must be joined with a literal "&".
            f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}&region=minio",
            "--orchestrator=kubernetes",
            "--orchestrator-kubernetes-image-pull-policy=if-not-present",
            "--persist-consensus-url=postgres://root@cockroach.default:26257?options=--search_path=consensus",
            "--adapter-stash-url=postgres://root@cockroach.default:26257?options=--search_path=adapter",
            "--storage-stash-url=postgres://root@cockroach.default:26257?options=--search_path=storage",
            "--internal-sql-listen-addr=0.0.0.0:6877",
            "--unsafe-mode",
            # cloudtest may be called upon to spin up older versions of
            # Materialize too! If you are adding a command-line option that is
            # only supported on newer releases, do not add it here. Add it as a
            # version-gated argument below, using `self._meets_minimum_version`.
        ]

        if self.log_filter:
            args += [f"--log-filter={self.log_filter}"]

        # From 0.38.0 on a single clusterd image is passed; older releases
        # take separate storaged and computed images.
        if self._meets_minimum_version("0.38.0"):
            args += [
                "--clusterd-image",
                self.image("clusterd", tag=self.tag, release_mode=self.release_mode),
            ]
        else:
            args += [
                "--storaged-image",
                self.image("storaged", tag=self.tag, release_mode=self.release_mode),
                "--computed-image",
                self.image("computed", tag=self.tag, release_mode=self.release_mode),
            ]

        container = V1Container(
            name="environmentd",
            image=self.image(
                "environmentd", tag=self.tag, release_mode=self.release_mode
            ),
            args=args,
            env=env,
            ports=ports,
            volume_mounts=volume_mounts,
        )

        pod_spec = V1PodSpec(containers=[container])
        template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec)

        # The claim name "data" matches the volume mount declared above.
        claim_templates = [
            V1PersistentVolumeClaim(
                metadata=V1ObjectMeta(name="data"),
                spec=V1PersistentVolumeClaimSpec(
                    access_modes=["ReadWriteOnce"],
                    resources=V1ResourceRequirements(requests={"storage": "1Gi"}),
                ),
            )
        ]

        return V1StatefulSet(
            api_version="apps/v1",
            kind="StatefulSet",
            metadata=metadata,
            spec=V1StatefulSetSpec(
                service_name="environmentd",
                replicas=1,
                pod_management_policy="Parallel",
                selector=label_selector,
                template=template_spec,
                volume_claim_templates=claim_templates,
            ),
        )

    def _meets_minimum_version(self, minimum: str) -> bool:
        """Determine whether environmentd is at least the `minimum` version.

        This function matches the function of the same name in
        MaterializeInc/cloud.

        Args:
            minimum: Version string to compare against, e.g. "0.38.0".

        Returns:
            True when `self.tag` is missing, cannot be parsed as a version,
            or parses to a version greater than or equal to `minimum`.
        """
        # Assume that unstable and development versions, as indicated by a
        # missing or unparseable tag, are always recent enough to support all
        # features.
        #
        # TODO: learn how to do real feature detection on arbitrary versions.
        if self.tag is None:
            return True

        try:
            # Tags may carry a leading "v" (e.g. "v0.38.0"); strip it first.
            tag_version = Version.parse(self.tag.removeprefix("v"))
        except ValueError:
            return True

        # `minimum` is parsed outside the try block, so a malformed minimum
        # is not silently treated as "recent enough".
        minimum_version = Version.parse(minimum)
        return tag_version >= minimum_version
Ancestors
Methods
def generate_stateful_set(self) ‑> kubernetes.client.models.v1_stateful_set.V1StatefulSet
-
Expand source code Browse git
def generate_stateful_set(self) -> V1StatefulSet:
    """Build the `environmentd` StatefulSet object.

    Returns:
        A `V1StatefulSet` named "environmentd" with one replica, a single
        container, and a 1Gi "data" volume claim mounted at `/data`.
    """
    metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"})
    label_selector = V1LabelSelector(match_labels={"app": "environmentd"})

    # Hand the pod its own name through a field reference.
    value_from = V1EnvVarSource(
        field_ref=V1ObjectFieldSelector(field_path="metadata.name")
    )

    env = [
        V1EnvVar(name="MZ_POD_NAME", value_from=value_from),
        V1EnvVar(name="AWS_REGION", value="minio"),
        V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"),
        V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"),
        V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
        V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"),
        V1EnvVar(
            name="MZ_AWS_EXTERNAL_ID_PREFIX",
            value="eb5cb59b-e2fe-41f3-87ca-d2176a495345",
        ),
        V1EnvVar(
            name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2"
        ),
    ]
    # Caller-supplied variables come last, in insertion order.
    env.extend(V1EnvVar(name=k, value=v) for k, v in self.env.items())

    # NOTE(review): the declared container port is 5432, while the "sql"
    # port of EnvironmentdService is 6875 -- confirm this is intentional.
    ports = [V1ContainerPort(container_port=5432, name="sql")]

    volume_mounts = [
        V1VolumeMount(name="data", mount_path="/data"),
    ]

    # Percent-encode the endpoint (":" is escaped, "/" is left alone by
    # default) so it can be embedded as a query-string value below.
    s3_endpoint = urllib.parse.quote("http://minio-service.default:9000")

    args = [
        "--availability-zone=cloudtest-worker",
        "--availability-zone=cloudtest-worker2",
        "--availability-zone=cloudtest-worker3",
        "--aws-account-id=123456789000",
        "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
        "--announce-egress-ip=1.2.3.4",
        "--announce-egress-ip=88.77.66.55",
        "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0",
        # `endpoint` and `region` are two separate query parameters and
        # must be joined with a literal "&".
        f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}&region=minio",
        "--orchestrator=kubernetes",
        "--orchestrator-kubernetes-image-pull-policy=if-not-present",
        "--persist-consensus-url=postgres://root@cockroach.default:26257?options=--search_path=consensus",
        "--adapter-stash-url=postgres://root@cockroach.default:26257?options=--search_path=adapter",
        "--storage-stash-url=postgres://root@cockroach.default:26257?options=--search_path=storage",
        "--internal-sql-listen-addr=0.0.0.0:6877",
        "--unsafe-mode",
        # cloudtest may be called upon to spin up older versions of
        # Materialize too! If you are adding a command-line option that is
        # only supported on newer releases, do not add it here. Add it as a
        # version-gated argument below, using `self._meets_minimum_version`.
    ]

    if self.log_filter:
        args += [f"--log-filter={self.log_filter}"]

    # From 0.38.0 on a single clusterd image is passed; older releases
    # take separate storaged and computed images.
    if self._meets_minimum_version("0.38.0"):
        args += [
            "--clusterd-image",
            self.image("clusterd", tag=self.tag, release_mode=self.release_mode),
        ]
    else:
        args += [
            "--storaged-image",
            self.image("storaged", tag=self.tag, release_mode=self.release_mode),
            "--computed-image",
            self.image("computed", tag=self.tag, release_mode=self.release_mode),
        ]

    container = V1Container(
        name="environmentd",
        image=self.image(
            "environmentd", tag=self.tag, release_mode=self.release_mode
        ),
        args=args,
        env=env,
        ports=ports,
        volume_mounts=volume_mounts,
    )

    pod_spec = V1PodSpec(containers=[container])
    template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec)

    # The claim name "data" matches the volume mount declared above.
    claim_templates = [
        V1PersistentVolumeClaim(
            metadata=V1ObjectMeta(name="data"),
            spec=V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=V1ResourceRequirements(requests={"storage": "1Gi"}),
            ),
        )
    ]

    return V1StatefulSet(
        api_version="apps/v1",
        kind="StatefulSet",
        metadata=metadata,
        spec=V1StatefulSetSpec(
            service_name="environmentd",
            replicas=1,
            pod_management_policy="Parallel",
            selector=label_selector,
            template=template_spec,
            volume_claim_templates=claim_templates,
        ),
    )
class MaterializedAliasService
-
Some testdrive tests expect that Mz is accessible as 'materialized'
Expand source code Browse git
class MaterializedAliasService(K8sService):
    """ExternalName service that makes environmentd reachable as 'materialized'.

    Some testdrive tests expect that Mz is accessible under that name.
    """

    def __init__(self) -> None:
        alias_metadata = V1ObjectMeta(name="materialized")
        alias_spec = V1ServiceSpec(
            type="ExternalName",
            # Cluster-internal DNS name the alias points at.
            external_name="environmentd.default.svc.cluster.local",
        )
        self.service = V1Service(
            api_version="v1",
            kind="Service",
            metadata=alias_metadata,
            spec=alias_spec,
        )
Ancestors
Inherited members