Module materialize.cloudtest.app.materialize_application
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import logging
import os
import subprocess
import time
from datetime import datetime, timedelta
from pg8000.exceptions import InterfaceError
from materialize.cloudtest.app.cloudtest_application_base import (
CloudtestApplicationBase,
)
from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
from materialize.cloudtest.k8s.cockroach import cockroach_resources
from materialize.cloudtest.k8s.debezium import debezium_resources
from materialize.cloudtest.k8s.environmentd import (
EnvironmentdService,
EnvironmentdStatefulSet,
MaterializedAliasService,
)
from materialize.cloudtest.k8s.minio import Minio
from materialize.cloudtest.k8s.mysql import mysql_resources
from materialize.cloudtest.k8s.persist_pubsub import PersistPubSubService
from materialize.cloudtest.k8s.postgres import postgres_resources
from materialize.cloudtest.k8s.redpanda import redpanda_resources
from materialize.cloudtest.k8s.role_binding import AdminRoleBinding
from materialize.cloudtest.k8s.ssh import ssh_resources
from materialize.cloudtest.k8s.testdrive import TestdrivePod
from materialize.cloudtest.k8s.vpc_endpoints_cluster_role import VpcEndpointsClusterRole
from materialize.cloudtest.util.wait import wait
# Module-level logger named after this module; used by wait_for_sql() to
# report SQL connection attempts that failed while waiting.
LOGGER = logging.getLogger(__name__)
class MaterializeApplication(CloudtestApplicationBase):
    """Cloudtest application made of environmentd plus the services tests use.

    Constructing an instance registers the VpcEndpoint CRD, applies the
    metrics-server manifest and then calls ``create_resources_and_wait()``.
    """

    def __init__(
        self,
        release_mode: bool = True,
        tag: str | None = None,
        aws_region: str | None = None,
        log_filter: str | None = None,
        apply_node_selectors: bool = False,
    ) -> None:
        """Store the configuration and bring the application up.

        Args:
            release_mode: Passed to the testdrive pod and the base initializer.
            tag: Stored and later passed to the environmentd stateful set.
            aws_region: Passed to the testdrive pod and the base initializer.
            log_filter: Passed to the base initializer.
            apply_node_selectors: Passed to the testdrive pod and to the
                resources built in ``get_resources``.
        """
        # NOTE(review): everything get_resources() reads is assigned before
        # the base initializer runs, presumably because the base class calls
        # get_resources() from its own __init__ -- confirm.
        self.tag = tag
        self.environmentd = EnvironmentdService()
        self.materialized_alias = MaterializedAliasService()
        self.testdrive = TestdrivePod(
            release_mode=release_mode,
            aws_region=aws_region,
            apply_node_selectors=apply_node_selectors,
        )
        self.apply_node_selectors = apply_node_selectors
        super().__init__(release_mode, aws_region, log_filter)

        # Register the VpcEndpoint CRD.
        self.register_vpc_endpoint()

        self.start_metrics_server()

        self.create_resources_and_wait()

    def get_resources(self, log_filter: str | None) -> list[K8sResource]:
        """Return every Kubernetes resource that makes up this application.

        Args:
            log_filter: Passed to the environmentd stateful set.

        Returns:
            The resources in the order they are listed below; the service,
            alias and testdrive objects are the ones created in ``__init__``.
        """
        return [
            *cockroach_resources(apply_node_selectors=self.apply_node_selectors),
            *postgres_resources(apply_node_selectors=self.apply_node_selectors),
            *mysql_resources(apply_node_selectors=self.apply_node_selectors),
            *redpanda_resources(apply_node_selectors=self.apply_node_selectors),
            *debezium_resources(apply_node_selectors=self.apply_node_selectors),
            *ssh_resources(apply_node_selectors=self.apply_node_selectors),
            Minio(apply_node_selectors=self.apply_node_selectors),
            VpcEndpointsClusterRole(),
            AdminRoleBinding(),
            EnvironmentdStatefulSet(
                release_mode=self.release_mode,
                tag=self.tag,
                log_filter=log_filter,
                coverage_mode=self.coverage_mode(),
                apply_node_selectors=self.apply_node_selectors,
            ),
            PersistPubSubService(),
            self.environmentd,
            self.materialized_alias,
            self.testdrive,
        ]

    def get_images(self) -> list[str]:
        """Return the fixed list of image names associated with this application."""
        return ["environmentd", "clusterd", "testdrive", "postgres"]

    def register_vpc_endpoint(self) -> None:
        """Apply the generated VpcEndpoint CRD file with ``kubectl apply -f``.

        The file path is resolved relative to the absolute form of
        ``self.mz_root``.
        """
        self.kubectl(
            "apply",
            "-f",
            os.path.join(
                os.path.abspath(self.mz_root),
                "src/cloud-resources/src/crd/gen/vpcendpoints.json",
            ),
        )

    def start_metrics_server(self) -> None:
        """Apply the metrics-server manifest and patch its deployment.

        The patch appends ``--kubelet-insecure-tls`` to the arguments of the
        first container of the ``metrics-server`` deployment in the
        ``kube-system`` namespace.
        """
        self.kubectl(
            "apply",
            "-f",
            "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
        )

        self.kubectl(
            "patch",
            "deployment",
            "metrics-server",
            "--namespace",
            "kube-system",
            "--type",
            "json",
            "-p",
            '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]',
        )

    def wait_resource_creation_completed(self) -> None:
        """Wait for ``condition=Ready`` on pods labelled with cluster ID ``u1``."""
        wait(
            condition="condition=Ready",
            resource="pod",
            label="cluster.environmentd.materialize.cloud/cluster-id=u1",
        )

    def wait_replicas(self) -> None:
        """Wait for ``condition=Ready`` on the pods of clusters u1, s1 and s2."""
        for cluster_id in ("u1", "s1", "s2"):
            wait(
                condition="condition=Ready",
                resource="pod",
                label=f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            )

    def wait_for_sql(self) -> None:
        """Wait until environmentd pod is ready and can accept SQL connections.

        ``SELECT 1`` is retried every 2 seconds for up to 300 seconds while it
        fails with ``InterfaceError``.

        Raises:
            TimeoutError: If no attempt succeeded within the retry window.
                Previously the method returned silently in that case, leaving
                callers to believe SQL was available.
        """
        wait(condition="condition=Ready", resource="pod/environmentd-0")

        timeout = timedelta(seconds=300)
        start = datetime.now()
        while datetime.now() - start < timeout:
            try:
                self.environmentd.sql("SELECT 1")
                return
            except InterfaceError as e:
                # Since we crash environmentd, we expect some errors that we swallow.
                LOGGER.info(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
                time.sleep(2)

        raise TimeoutError(
            f"environmentd did not accept SQL connections within {timeout.total_seconds():.0f} seconds"
        )

    def set_environmentd_failpoints(self, failpoints: str) -> None:
        """Set the FAILPOINTS environment variable on the environmentd stateful set.

        The stateful set is replaced afterwards, which will most likely restart
        environmentd; the method then waits for SQL to become available again.

        Args:
            failpoints: Value assigned to the ``FAILPOINTS`` entry of the
                stateful set's ``env`` mapping.
        """
        matching = [
            resource
            for resource in self.resources
            if isinstance(resource, EnvironmentdStatefulSet)
        ]
        # Exactly one environmentd stateful set is expected among the resources.
        assert len(matching) == 1
        stateful_set = matching[0]

        stateful_set.env["FAILPOINTS"] = failpoints
        stateful_set.replace()
        self.wait_for_sql()

    def get_k8s_value(
        self, selector: str, json_path: str, remove_quotes: bool = True
    ) -> str:
        """Run ``kubectl get pods`` with a selector and a JSONPath output format.

        Args:
            selector: Label selector passed as ``--selector=...``.
            json_path: JSONPath expression; it is wrapped in single quotes
                inside the ``jsonpath=`` argument.
            remove_quotes: When true, every single quote is removed from the
                returned text.

        Returns:
            The text returned by ``self.kubectl``, optionally without quotes.
        """
        value = self.kubectl(
            "get",
            "pods",
            f"--selector={selector}",
            "-o",
            f"jsonpath='{json_path}'",
        )
        if remove_quotes:
            value = value.replace("'", "")
        return value

    def get_pod_value(
        self, cluster_id: str, json_path: str, remove_quotes: bool = True
    ) -> str:
        """Evaluate ``json_path`` on the pods labelled with ``cluster_id``.

        Delegates to ``get_k8s_value`` using the
        ``cluster.environmentd.materialize.cloud/cluster-id`` label as selector.
        """
        return self.get_k8s_value(
            f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            json_path,
            remove_quotes,
        )

    def get_pod_label_value(
        self, cluster_id: str, label: str, remove_quotes: bool = True
    ) -> str:
        """Return the value of ``label`` on the pods labelled with ``cluster_id``.

        ``label`` is appended verbatim to the ``.items[*].metadata.labels.``
        JSONPath prefix.
        """
        return self.get_pod_value(
            cluster_id, "{.items[*].metadata.labels." + label + "}", remove_quotes
        )

    def get_cluster_node_names(self, cluster_name: str) -> list[str]:
        """Return the node names of the pods belonging to ``cluster_name``.

        Returns:
            One entry per whitespace-separated name in the kubectl output; an
            empty list when the output is empty.
        """
        cluster_id = self.get_cluster_id(cluster_name)
        print(f"Cluster with name '{cluster_name}' has ID {cluster_id}")

        value_string = self.get_pod_value(
            cluster_id, "{.items[*].spec.nodeName}", remove_quotes=True
        )
        # split() without a separator yields [] for empty output (split(" ")
        # would yield [""]) and drops any trailing newline from the last name.
        return value_string.split()

    def get_cluster_id(self, cluster_name: str) -> str:
        """Query ``mz_clusters`` for the ID of the cluster called ``cluster_name``.

        Only the first column of the first result row is returned; indexing
        fails if the query returns no rows.
        """
        cluster_id: str = self.environmentd.sql_query(
            f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
        )[0][0]
        return cluster_id

    def get_cluster_and_replica_id(self, mz_table: str, name: str) -> tuple[str, str]:
        """Return ``(cluster_id, replica_id)`` for the object called ``name``.

        Args:
            mz_table: Relation interpolated into the query and joined with
                ``mz_cluster_replicas`` on ``cluster_id``.
            name: Value matched against the relation's ``name`` column.
        """
        [cluster_id, replica_id] = self.environmentd.sql_query(
            f"SELECT s.cluster_id, r.id FROM {mz_table} s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = '{name}'"
        )[0]
        return cluster_id, replica_id

    def suspend_k8s_node(self, node_name: str) -> None:
        """Run ``docker pause`` on ``node_name`` and assert that it succeeded.

        NOTE(review): presumably the Kubernetes nodes are Docker containers
        named after the node -- confirm against the cluster setup.
        """
        print(f"Suspending node {node_name}...")

        result = subprocess.run(
            ["docker", "pause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        print(f"Suspended node {node_name}.")

    def revive_suspended_k8s_node(self, node_name: str) -> None:
        """Run ``docker unpause`` on ``node_name`` and assert that it succeeded."""
        print(f"Reviving node {node_name}...")

        result = subprocess.run(
            ["docker", "unpause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        print(f"Node {node_name} is running again.")
Classes
class MaterializeApplication (release_mode: bool = True, tag: str | None = None, aws_region: str | None = None, log_filter: str | None = None, apply_node_selectors: bool = False)
-
Expand source code Browse git
class MaterializeApplication(CloudtestApplicationBase):
    """Cloudtest application made of environmentd plus the services tests use.

    Constructing an instance registers the VpcEndpoint CRD, applies the
    metrics-server manifest and then calls ``create_resources_and_wait()``.
    """

    def __init__(
        self,
        release_mode: bool = True,
        tag: str | None = None,
        aws_region: str | None = None,
        log_filter: str | None = None,
        apply_node_selectors: bool = False,
    ) -> None:
        """Store the configuration and bring the application up.

        Args:
            release_mode: Passed to the testdrive pod and the base initializer.
            tag: Stored and later passed to the environmentd stateful set.
            aws_region: Passed to the testdrive pod and the base initializer.
            log_filter: Passed to the base initializer.
            apply_node_selectors: Passed to the testdrive pod and to the
                resources built in ``get_resources``.
        """
        # NOTE(review): everything get_resources() reads is assigned before
        # the base initializer runs, presumably because the base class calls
        # get_resources() from its own __init__ -- confirm.
        self.tag = tag
        self.environmentd = EnvironmentdService()
        self.materialized_alias = MaterializedAliasService()
        self.testdrive = TestdrivePod(
            release_mode=release_mode,
            aws_region=aws_region,
            apply_node_selectors=apply_node_selectors,
        )
        self.apply_node_selectors = apply_node_selectors
        super().__init__(release_mode, aws_region, log_filter)

        # Register the VpcEndpoint CRD.
        self.register_vpc_endpoint()

        self.start_metrics_server()

        self.create_resources_and_wait()

    def get_resources(self, log_filter: str | None) -> list[K8sResource]:
        """Return every Kubernetes resource that makes up this application.

        Args:
            log_filter: Passed to the environmentd stateful set.

        Returns:
            The resources in the order they are listed below; the service,
            alias and testdrive objects are the ones created in ``__init__``.
        """
        return [
            *cockroach_resources(apply_node_selectors=self.apply_node_selectors),
            *postgres_resources(apply_node_selectors=self.apply_node_selectors),
            *mysql_resources(apply_node_selectors=self.apply_node_selectors),
            *redpanda_resources(apply_node_selectors=self.apply_node_selectors),
            *debezium_resources(apply_node_selectors=self.apply_node_selectors),
            *ssh_resources(apply_node_selectors=self.apply_node_selectors),
            Minio(apply_node_selectors=self.apply_node_selectors),
            VpcEndpointsClusterRole(),
            AdminRoleBinding(),
            EnvironmentdStatefulSet(
                release_mode=self.release_mode,
                tag=self.tag,
                log_filter=log_filter,
                coverage_mode=self.coverage_mode(),
                apply_node_selectors=self.apply_node_selectors,
            ),
            PersistPubSubService(),
            self.environmentd,
            self.materialized_alias,
            self.testdrive,
        ]

    def get_images(self) -> list[str]:
        """Return the fixed list of image names associated with this application."""
        return ["environmentd", "clusterd", "testdrive", "postgres"]

    def register_vpc_endpoint(self) -> None:
        """Apply the generated VpcEndpoint CRD file with ``kubectl apply -f``.

        The file path is resolved relative to the absolute form of
        ``self.mz_root``.
        """
        self.kubectl(
            "apply",
            "-f",
            os.path.join(
                os.path.abspath(self.mz_root),
                "src/cloud-resources/src/crd/gen/vpcendpoints.json",
            ),
        )

    def start_metrics_server(self) -> None:
        """Apply the metrics-server manifest and patch its deployment.

        The patch appends ``--kubelet-insecure-tls`` to the arguments of the
        first container of the ``metrics-server`` deployment in the
        ``kube-system`` namespace.
        """
        self.kubectl(
            "apply",
            "-f",
            "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
        )

        self.kubectl(
            "patch",
            "deployment",
            "metrics-server",
            "--namespace",
            "kube-system",
            "--type",
            "json",
            "-p",
            '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]',
        )

    def wait_resource_creation_completed(self) -> None:
        """Wait for ``condition=Ready`` on pods labelled with cluster ID ``u1``."""
        wait(
            condition="condition=Ready",
            resource="pod",
            label="cluster.environmentd.materialize.cloud/cluster-id=u1",
        )

    def wait_replicas(self) -> None:
        """Wait for ``condition=Ready`` on the pods of clusters u1, s1 and s2."""
        for cluster_id in ("u1", "s1", "s2"):
            wait(
                condition="condition=Ready",
                resource="pod",
                label=f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            )

    def wait_for_sql(self) -> None:
        """Wait until environmentd pod is ready and can accept SQL connections.

        ``SELECT 1`` is retried every 2 seconds for up to 300 seconds while it
        fails with ``InterfaceError``.

        Raises:
            TimeoutError: If no attempt succeeded within the retry window.
                Previously the method returned silently in that case, leaving
                callers to believe SQL was available.
        """
        wait(condition="condition=Ready", resource="pod/environmentd-0")

        timeout = timedelta(seconds=300)
        start = datetime.now()
        while datetime.now() - start < timeout:
            try:
                self.environmentd.sql("SELECT 1")
                return
            except InterfaceError as e:
                # Since we crash environmentd, we expect some errors that we swallow.
                LOGGER.info(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
                time.sleep(2)

        raise TimeoutError(
            f"environmentd did not accept SQL connections within {timeout.total_seconds():.0f} seconds"
        )

    def set_environmentd_failpoints(self, failpoints: str) -> None:
        """Set the FAILPOINTS environment variable on the environmentd stateful set.

        The stateful set is replaced afterwards, which will most likely restart
        environmentd; the method then waits for SQL to become available again.

        Args:
            failpoints: Value assigned to the ``FAILPOINTS`` entry of the
                stateful set's ``env`` mapping.
        """
        matching = [
            resource
            for resource in self.resources
            if isinstance(resource, EnvironmentdStatefulSet)
        ]
        # Exactly one environmentd stateful set is expected among the resources.
        assert len(matching) == 1
        stateful_set = matching[0]

        stateful_set.env["FAILPOINTS"] = failpoints
        stateful_set.replace()
        self.wait_for_sql()

    def get_k8s_value(
        self, selector: str, json_path: str, remove_quotes: bool = True
    ) -> str:
        """Run ``kubectl get pods`` with a selector and a JSONPath output format.

        Args:
            selector: Label selector passed as ``--selector=...``.
            json_path: JSONPath expression; it is wrapped in single quotes
                inside the ``jsonpath=`` argument.
            remove_quotes: When true, every single quote is removed from the
                returned text.

        Returns:
            The text returned by ``self.kubectl``, optionally without quotes.
        """
        value = self.kubectl(
            "get",
            "pods",
            f"--selector={selector}",
            "-o",
            f"jsonpath='{json_path}'",
        )
        if remove_quotes:
            value = value.replace("'", "")
        return value

    def get_pod_value(
        self, cluster_id: str, json_path: str, remove_quotes: bool = True
    ) -> str:
        """Evaluate ``json_path`` on the pods labelled with ``cluster_id``.

        Delegates to ``get_k8s_value`` using the
        ``cluster.environmentd.materialize.cloud/cluster-id`` label as selector.
        """
        return self.get_k8s_value(
            f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            json_path,
            remove_quotes,
        )

    def get_pod_label_value(
        self, cluster_id: str, label: str, remove_quotes: bool = True
    ) -> str:
        """Return the value of ``label`` on the pods labelled with ``cluster_id``.

        ``label`` is appended verbatim to the ``.items[*].metadata.labels.``
        JSONPath prefix.
        """
        return self.get_pod_value(
            cluster_id, "{.items[*].metadata.labels." + label + "}", remove_quotes
        )

    def get_cluster_node_names(self, cluster_name: str) -> list[str]:
        """Return the node names of the pods belonging to ``cluster_name``.

        Returns:
            One entry per whitespace-separated name in the kubectl output; an
            empty list when the output is empty.
        """
        cluster_id = self.get_cluster_id(cluster_name)
        print(f"Cluster with name '{cluster_name}' has ID {cluster_id}")

        value_string = self.get_pod_value(
            cluster_id, "{.items[*].spec.nodeName}", remove_quotes=True
        )
        # split() without a separator yields [] for empty output (split(" ")
        # would yield [""]) and drops any trailing newline from the last name.
        return value_string.split()

    def get_cluster_id(self, cluster_name: str) -> str:
        """Query ``mz_clusters`` for the ID of the cluster called ``cluster_name``.

        Only the first column of the first result row is returned; indexing
        fails if the query returns no rows.
        """
        cluster_id: str = self.environmentd.sql_query(
            f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
        )[0][0]
        return cluster_id

    def get_cluster_and_replica_id(self, mz_table: str, name: str) -> tuple[str, str]:
        """Return ``(cluster_id, replica_id)`` for the object called ``name``.

        Args:
            mz_table: Relation interpolated into the query and joined with
                ``mz_cluster_replicas`` on ``cluster_id``.
            name: Value matched against the relation's ``name`` column.
        """
        [cluster_id, replica_id] = self.environmentd.sql_query(
            f"SELECT s.cluster_id, r.id FROM {mz_table} s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = '{name}'"
        )[0]
        return cluster_id, replica_id

    def suspend_k8s_node(self, node_name: str) -> None:
        """Run ``docker pause`` on ``node_name`` and assert that it succeeded.

        NOTE(review): presumably the Kubernetes nodes are Docker containers
        named after the node -- confirm against the cluster setup.
        """
        print(f"Suspending node {node_name}...")

        result = subprocess.run(
            ["docker", "pause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        print(f"Suspended node {node_name}.")

    def revive_suspended_k8s_node(self, node_name: str) -> None:
        """Run ``docker unpause`` on ``node_name`` and assert that it succeeded."""
        print(f"Reviving node {node_name}...")

        result = subprocess.run(
            ["docker", "unpause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        print(f"Node {node_name} is running again.")
Ancestors
Methods
def get_cluster_and_replica_id(self, mz_table: str, name: str) ‑> tuple[str, str]
-
Expand source code Browse git
def get_cluster_and_replica_id(self, mz_table: str, name: str) -> tuple[str, str]:
    """Return ``(cluster_id, replica_id)`` for the object called ``name``.

    ``mz_table`` is interpolated into the query as the relation to search and
    is joined with ``mz_cluster_replicas`` on ``cluster_id``. Only the first
    result row is used, and it must hold exactly two columns.
    """
    query = f"SELECT s.cluster_id, r.id FROM {mz_table} s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = '{name}'"
    first_row = self.environmentd.sql_query(query)[0]
    cluster_id, replica_id = first_row
    return cluster_id, replica_id
def get_cluster_id(self, cluster_name: str) ‑> str
-
Expand source code Browse git
def get_cluster_id(self, cluster_name: str) -> str:
    """Query ``mz_clusters`` for the ID of the cluster called ``cluster_name``.

    The first column of the first result row is returned; indexing fails if
    the query yields no rows.
    """
    query = f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
    rows = self.environmentd.sql_query(query)
    cluster_id: str = rows[0][0]
    return cluster_id
def get_cluster_node_names(self, cluster_name: str) ‑> list[str]
-
Expand source code Browse git
def get_cluster_node_names(self, cluster_name: str) -> list[str]:
    """Return the node names of the pods belonging to ``cluster_name``.

    The cluster name is first resolved to its ID, then the ``spec.nodeName``
    of every pod labelled with that ID is read via ``get_pod_value``.

    Returns:
        One entry per whitespace-separated name in the kubectl output; an
        empty list when the output is empty.
    """
    cluster_id = self.get_cluster_id(cluster_name)
    print(f"Cluster with name '{cluster_name}' has ID {cluster_id}")

    value_string = self.get_pod_value(
        cluster_id, "{.items[*].spec.nodeName}", remove_quotes=True
    )
    # split() without a separator yields [] for empty output (split(" ") would
    # yield [""]) and drops any trailing newline from the last name.
    return value_string.split()
def get_images(self) ‑> list[str]
-
Expand source code Browse git
def get_images(self) -> list[str]:
    """Return the fixed list of image names associated with this application."""
    image_names = ["environmentd", "clusterd", "testdrive", "postgres"]
    return image_names
def get_k8s_value(self, selector: str, json_path: str, remove_quotes: bool = True) ‑> str
-
Expand source code Browse git
def get_k8s_value(
    self, selector: str, json_path: str, remove_quotes: bool = True
) -> str:
    """Run ``kubectl get pods`` with a selector and a JSONPath output format.

    ``json_path`` is wrapped in single quotes inside the ``jsonpath=``
    argument. When ``remove_quotes`` is true, every single quote is removed
    from the text returned by ``self.kubectl`` before it is handed back.
    """
    kubectl_args = (
        "get",
        "pods",
        f"--selector={selector}",
        "-o",
        f"jsonpath='{json_path}'",
    )
    output = self.kubectl(*kubectl_args)
    return output.replace("'", "") if remove_quotes else output
def get_pod_label_value(self, cluster_id: str, label: str, remove_quotes: bool = True) ‑> str
-
Expand source code Browse git
def get_pod_label_value(
    self, cluster_id: str, label: str, remove_quotes: bool = True
) -> str:
    """Return the value of ``label`` on the pods labelled with ``cluster_id``.

    ``label`` is inserted verbatim after the ``.items[*].metadata.labels.``
    JSONPath prefix and the lookup is delegated to ``get_pod_value``.
    """
    # Doubled braces produce the literal braces that JSONPath requires.
    label_path = f"{{.items[*].metadata.labels.{label}}}"
    return self.get_pod_value(cluster_id, label_path, remove_quotes)
def get_pod_value(self, cluster_id: str, json_path: str, remove_quotes: bool = True) ‑> str
-
Expand source code Browse git
def get_pod_value(
    self, cluster_id: str, json_path: str, remove_quotes: bool = True
) -> str:
    """Evaluate ``json_path`` on the pods labelled with ``cluster_id``.

    Builds a selector on the
    ``cluster.environmentd.materialize.cloud/cluster-id`` label and delegates
    to ``get_k8s_value``.
    """
    selector = f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}"
    return self.get_k8s_value(selector, json_path, remove_quotes)
def get_resources(self, log_filter: str | None) ‑> list[K8sResource]
-
Expand source code Browse git
def get_resources(self, log_filter: str | None) -> list[K8sResource]:
    """Return every Kubernetes resource that makes up this application.

    ``log_filter`` is passed to the environmentd stateful set. The service,
    alias and testdrive entries at the end of the list are the objects already
    stored on the instance.
    """
    use_selectors = self.apply_node_selectors

    resources: list[K8sResource] = []
    # Each factory contributes a group of resources; order is significant.
    for factory in (
        cockroach_resources,
        postgres_resources,
        mysql_resources,
        redpanda_resources,
        debezium_resources,
        ssh_resources,
    ):
        resources.extend(factory(apply_node_selectors=use_selectors))

    resources.append(Minio(apply_node_selectors=use_selectors))
    resources.append(VpcEndpointsClusterRole())
    resources.append(AdminRoleBinding())
    resources.append(
        EnvironmentdStatefulSet(
            release_mode=self.release_mode,
            tag=self.tag,
            log_filter=log_filter,
            coverage_mode=self.coverage_mode(),
            apply_node_selectors=use_selectors,
        )
    )
    resources.append(PersistPubSubService())
    resources.extend([self.environmentd, self.materialized_alias, self.testdrive])
    return resources
def register_vpc_endpoint(self) ‑> None
-
Expand source code Browse git
def register_vpc_endpoint(self) -> None:
    """Apply the generated VpcEndpoint CRD file with ``kubectl apply -f``.

    The file path is resolved relative to the absolute form of
    ``self.mz_root``.
    """
    repo_root = os.path.abspath(self.mz_root)
    crd_file = os.path.join(
        repo_root, "src/cloud-resources/src/crd/gen/vpcendpoints.json"
    )
    self.kubectl("apply", "-f", crd_file)
def revive_suspended_k8s_node(self, node_name: str) ‑> None
-
Expand source code Browse git
def revive_suspended_k8s_node(self, node_name: str) -> None:
    """Run ``docker unpause`` on ``node_name`` and assert that it succeeded.

    NOTE(review): presumably the Kubernetes nodes are Docker containers named
    after the node -- confirm against the cluster setup.
    """
    print(f"Reviving node {node_name}...")

    command = ["docker", "unpause", node_name]
    completed = subprocess.run(command, stderr=subprocess.STDOUT, text=True)
    exit_code = completed.returncode
    assert exit_code == 0, f"Got return code {exit_code}"

    print(f"Node {node_name} is running again.")
def set_environmentd_failpoints(self, failpoints: str) ‑> None
-
Set the FAILPOINTS environment variable on the environmentd stateful set. This will most likely restart environmentd.
Expand source code Browse git
def set_environmentd_failpoints(self, failpoints: str) -> None:
    """Set the FAILPOINTS environment variable on the environmentd stateful set.

    The stateful set is replaced afterwards, which will most likely restart
    environmentd; the method then waits for SQL to become available again.

    Args:
        failpoints: Value assigned to the ``FAILPOINTS`` entry of the stateful
            set's ``env`` mapping.
    """
    matching = [
        resource
        for resource in self.resources
        if isinstance(resource, EnvironmentdStatefulSet)
    ]
    # Exactly one environmentd stateful set is expected among the resources.
    assert len(matching) == 1
    stateful_set = matching[0]

    stateful_set.env["FAILPOINTS"] = failpoints
    stateful_set.replace()
    self.wait_for_sql()
def start_metrics_server(self) ‑> None
-
Expand source code Browse git
def start_metrics_server(self) -> None:
    """Apply the metrics-server manifest and patch its deployment.

    The patch appends ``--kubelet-insecure-tls`` to the arguments of the first
    container of the ``metrics-server`` deployment in ``kube-system``.
    """
    manifest_url = "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml"
    insecure_tls_patch = '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]'

    self.kubectl("apply", "-f", manifest_url)

    patch_args = [
        "patch",
        "deployment",
        "metrics-server",
        "--namespace",
        "kube-system",
        "--type",
        "json",
        "-p",
        insecure_tls_patch,
    ]
    self.kubectl(*patch_args)
def suspend_k8s_node(self, node_name: str) ‑> None
-
Expand source code Browse git
def suspend_k8s_node(self, node_name: str) -> None:
    """Run ``docker pause`` on ``node_name`` and assert that it succeeded.

    NOTE(review): presumably the Kubernetes nodes are Docker containers named
    after the node -- confirm against the cluster setup.
    """
    print(f"Suspending node {node_name}...")

    command = ["docker", "pause", node_name]
    completed = subprocess.run(command, stderr=subprocess.STDOUT, text=True)
    exit_code = completed.returncode
    assert exit_code == 0, f"Got return code {exit_code}"

    print(f"Suspended node {node_name}.")
def wait_for_sql(self) ‑> None
-
Wait until environmentd pod is ready and can accept SQL connections
Expand source code Browse git
def wait_for_sql(self) -> None:
    """Wait until environmentd pod is ready and can accept SQL connections.

    ``SELECT 1`` is retried every 2 seconds for up to 300 seconds while it
    fails with ``InterfaceError``.

    Raises:
        TimeoutError: If no attempt succeeded within the retry window.
            Previously the method returned silently in that case, leaving
            callers to believe SQL was available.
    """
    wait(condition="condition=Ready", resource="pod/environmentd-0")

    timeout = timedelta(seconds=300)
    start = datetime.now()
    while datetime.now() - start < timeout:
        try:
            self.environmentd.sql("SELECT 1")
            return
        except InterfaceError as e:
            # Since we crash environmentd, we expect some errors that we swallow.
            LOGGER.info(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
            time.sleep(2)

    raise TimeoutError(
        f"environmentd did not accept SQL connections within {timeout.total_seconds():.0f} seconds"
    )
def wait_replicas(self) ‑> None
-
Expand source code Browse git
def wait_replicas(self) -> None:
    """Wait for ``condition=Ready`` on the pods of clusters u1, s1 and s2.

    The clusters are waited on one after another, in that order.
    """
    label_key = "cluster.environmentd.materialize.cloud/cluster-id"
    pod_labels = [f"{label_key}={cluster_id}" for cluster_id in ("u1", "s1", "s2")]
    for pod_label in pod_labels:
        wait(condition="condition=Ready", resource="pod", label=pod_label)
def wait_resource_creation_completed(self) ‑> None
-
Expand source code Browse git
def wait_resource_creation_completed(self) -> None:
    """Wait for ``condition=Ready`` on pods labelled with cluster ID ``u1``."""
    wait_kwargs = {
        "condition": "condition=Ready",
        "resource": "pod",
        "label": "cluster.environmentd.materialize.cloud/cluster-id=u1",
    }
    wait(**wait_kwargs)