Module materialize.cloudtest.application

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
import subprocess
import time
from datetime import datetime, timedelta
from typing import List, Optional

from pg8000.exceptions import InterfaceError

from materialize import ROOT, mzbuild
from materialize.cloudtest.k8s import K8sResource
from materialize.cloudtest.k8s.cockroach import COCKROACH_RESOURCES
from materialize.cloudtest.k8s.debezium import DEBEZIUM_RESOURCES
from materialize.cloudtest.k8s.environmentd import (
    EnvironmentdService,
    EnvironmentdStatefulSet,
)
from materialize.cloudtest.k8s.minio import Minio
from materialize.cloudtest.k8s.postgres import POSTGRES_RESOURCES
from materialize.cloudtest.k8s.redpanda import REDPANDA_RESOURCES
from materialize.cloudtest.k8s.role_binding import AdminRoleBinding
from materialize.cloudtest.k8s.ssh import SSH_RESOURCES
from materialize.cloudtest.k8s.testdrive import Testdrive
from materialize.cloudtest.k8s.vpc_endpoints_cluster_role import VpcEndpointsClusterRole
from materialize.cloudtest.wait import wait


class Application:
    resources: List[K8sResource]
    images: List[str]
    release_mode: bool
    aws_region: Optional[str]

    def __init__(self) -> None:
        self.create()

    def create(self) -> None:
        self.acquire_images()
        for resource in self.resources:
            resource.create()

    def acquire_images(self) -> None:
        repo = mzbuild.Repository(ROOT, release_mode=self.release_mode)
        for image in self.images:
            deps = repo.resolve_dependencies([repo.images[image]])
            deps.acquire()
            for dep in deps:
                subprocess.check_call(
                    [
                        "kind",
                        "load",
                        "docker-image",
                        "--name=cloudtest",
                        dep.spec(),
                    ]
                )

    def kubectl(self, *args: str) -> str:
        return subprocess.check_output(
            ["kubectl", "--context", self.context(), *args]
        ).decode("ascii")

    def context(self) -> str:
        return "kind-cloudtest"


class MaterializeApplication(Application):
    def __init__(
        self,
        release_mode: bool = True,
        tag: Optional[str] = None,
        aws_region: Optional[str] = None,
        log_filter: Optional[str] = None,
    ) -> None:
        self.environmentd = EnvironmentdService()
        self.testdrive = Testdrive(release_mode=release_mode, aws_region=aws_region)
        self.release_mode = release_mode
        self.aws_region = aws_region

        # Register the VpcEndpoint CRD.
        self.kubectl(
            "apply",
            "-f",
            os.path.join(
                os.path.abspath(ROOT),
                "src/cloud-resources/src/crd/gen/vpcendpoints.json",
            ),
        )

        # Start metrics-server.
        self.kubectl(
            "apply",
            "-f",
            "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
        )

        self.kubectl(
            "patch",
            "deployment",
            "metrics-server",
            "--namespace",
            "kube-system",
            "--type",
            "json",
            "-p",
            '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]',
        )

        self.resources = [
            *COCKROACH_RESOURCES,
            *POSTGRES_RESOURCES,
            *REDPANDA_RESOURCES,
            *DEBEZIUM_RESOURCES,
            *SSH_RESOURCES,
            Minio(),
            VpcEndpointsClusterRole(),
            AdminRoleBinding(),
            EnvironmentdStatefulSet(
                release_mode=release_mode, tag=tag, log_filter=log_filter
            ),
            self.environmentd,
            self.testdrive,
        ]

        self.images = ["environmentd", "clusterd", "testdrive", "postgres"]

        # Label the kind nodes in a way that mimics production.
        for node in [
            "cloudtest-control-plane",
            "cloudtest-worker",
            "cloudtest-worker2",
            "cloudtest-worker3",
        ]:
            self.kubectl(
                "label",
                "--overwrite",
                f"node/{node}",
                f"materialize.cloud/availability-zone={node}",
            )

        super().__init__()

    def create(self) -> None:
        super().create()
        wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")

    def wait_replicas(self) -> None:
        # NOTE[btv] - This will need to change if the order of
        # creating clusters/replicas changes, but it seemed fine to
        # assume this order, since we already assume it in `create`.
        wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")
        wait(condition="condition=Ready", resource="pod/cluster-s1-replica-2-0")
        wait(condition="condition=Ready", resource="pod/cluster-s2-replica-3-0")

    def wait_for_sql(self) -> None:
        """Wait until environmentd pod is ready and can accept SQL connections"""
        wait(condition="condition=Ready", resource="pod/environmentd-0")

        start = datetime.now()
        while datetime.now() - start < timedelta(seconds=300):
            try:
                self.environmentd.sql("SELECT 1")
                break
            except InterfaceError as e:
                # Since we crash environmentd, we expect some errors that we swallow.
                print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
                time.sleep(2)

    def set_environmentd_failpoints(self, failpoints: str) -> None:
        """Set the FAILPOINTS environmentd variable in the stateful set. This
        will most likely restart environmentd"""
        stateful_set = [
            resource
            for resource in self.resources
            if type(resource) == EnvironmentdStatefulSet
        ]
        assert len(stateful_set) == 1
        stateful_set = stateful_set[0]

        stateful_set.env["FAILPOINTS"] = failpoints
        stateful_set.replace()
        self.wait_for_sql()
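
In a cloudtest run, this module is typically consumed by instantiating MaterializeApplication inside a test and then driving Materialize over SQL. Below is a minimal usage sketch, assuming a kind cluster named cloudtest (kubectl context kind-cloudtest) has already been created, since the constructor only loads images into an existing cluster:

from materialize.cloudtest.application import MaterializeApplication

# Constructing the application acquires the required mzbuild images, loads them
# into the kind cluster, creates all Kubernetes resources, and waits for the
# default cluster replica pod to become Ready.
app = MaterializeApplication(release_mode=True)

# Block until environmentd accepts SQL connections, then issue a probe query
# through the EnvironmentdService handle.
app.wait_for_sql()
app.environmentd.sql("SELECT 1")

# The kubectl() helper shells out to kubectl against the kind-cloudtest context.
print(app.kubectl("get", "pods"))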

Classes

class Application
class Application:
    resources: List[K8sResource]
    images: List[str]
    release_mode: bool
    aws_region: Optional[str]

    def __init__(self) -> None:
        self.create()

    def create(self) -> None:
        self.acquire_images()
        for resource in self.resources:
            resource.create()

    def acquire_images(self) -> None:
        repo = mzbuild.Repository(ROOT, release_mode=self.release_mode)
        for image in self.images:
            deps = repo.resolve_dependencies([repo.images[image]])
            deps.acquire()
            for dep in deps:
                subprocess.check_call(
                    [
                        "kind",
                        "load",
                        "docker-image",
                        "--name=cloudtest",
                        dep.spec(),
                    ]
                )

    def kubectl(self, *args: str) -> str:
        return subprocess.check_output(
            ["kubectl", "--context", self.context(), *args]
        ).decode("ascii")

    def context(self) -> str:
        return "kind-cloudtest"

Subclasses

MaterializeApplication

Class variables

var aws_region : Optional[str]
var images : List[str]
var release_mode : bool
var resources : List[K8sResource]

Methods

def acquire_images(self) ‑> None
def acquire_images(self) -> None:
    repo = mzbuild.Repository(ROOT, release_mode=self.release_mode)
    for image in self.images:
        deps = repo.resolve_dependencies([repo.images[image]])
        deps.acquire()
        for dep in deps:
            subprocess.check_call(
                [
                    "kind",
                    "load",
                    "docker-image",
                    "--name=cloudtest",
                    dep.spec(),
                ]
            )
def context(self) ‑> str
def context(self) -> str:
    return "kind-cloudtest"
def create(self) ‑> None
def create(self) -> None:
    self.acquire_images()
    for resource in self.resources:
        resource.create()
def kubectl(self, *args: str) ‑> str
def kubectl(self, *args: str) -> str:
    return subprocess.check_output(
        ["kubectl", "--context", self.context(), *args]
    ).decode("ascii")
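
Application itself declares no resources or images; a subclass is expected to populate both attributes and then call super().__init__(), which in turn calls create(). A minimal sketch of a hypothetical subclass that deploys only Minio (the class name and resource selection are illustrative):

from typing import List

from materialize.cloudtest.application import Application
from materialize.cloudtest.k8s import K8sResource
from materialize.cloudtest.k8s.minio import Minio


class MinioOnlyApplication(Application):
    """Hypothetical application that deploys only Minio into the kind cluster."""

    def __init__(self, release_mode: bool = True) -> None:
        self.release_mode = release_mode
        self.aws_region = None

        # Resources are created in list order by Application.create().
        self.resources: List[K8sResource] = [Minio()]

        # mzbuild images to resolve and `kind load` into the cluster; none here.
        self.images: List[str] = []

        # Application.__init__ calls create(), which acquires the images and
        # creates each resource.
        super().__init__()
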
class MaterializeApplication (release_mode: bool = True, tag: Optional[str] = None, aws_region: Optional[str] = None, log_filter: Optional[str] = None)
class MaterializeApplication(Application):
    def __init__(
        self,
        release_mode: bool = True,
        tag: Optional[str] = None,
        aws_region: Optional[str] = None,
        log_filter: Optional[str] = None,
    ) -> None:
        self.environmentd = EnvironmentdService()
        self.testdrive = Testdrive(release_mode=release_mode, aws_region=aws_region)
        self.release_mode = release_mode
        self.aws_region = aws_region

        # Register the VpcEndpoint CRD.
        self.kubectl(
            "apply",
            "-f",
            os.path.join(
                os.path.abspath(ROOT),
                "src/cloud-resources/src/crd/gen/vpcendpoints.json",
            ),
        )

        # Start metrics-server.
        self.kubectl(
            "apply",
            "-f",
            "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
        )

        self.kubectl(
            "patch",
            "deployment",
            "metrics-server",
            "--namespace",
            "kube-system",
            "--type",
            "json",
            "-p",
            '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]',
        )

        self.resources = [
            *COCKROACH_RESOURCES,
            *POSTGRES_RESOURCES,
            *REDPANDA_RESOURCES,
            *DEBEZIUM_RESOURCES,
            *SSH_RESOURCES,
            Minio(),
            VpcEndpointsClusterRole(),
            AdminRoleBinding(),
            EnvironmentdStatefulSet(
                release_mode=release_mode, tag=tag, log_filter=log_filter
            ),
            self.environmentd,
            self.testdrive,
        ]

        self.images = ["environmentd", "clusterd", "testdrive", "postgres"]

        # Label the kind nodes in a way that mimics production.
        for node in [
            "cloudtest-control-plane",
            "cloudtest-worker",
            "cloudtest-worker2",
            "cloudtest-worker3",
        ]:
            self.kubectl(
                "label",
                "--overwrite",
                f"node/{node}",
                f"materialize.cloud/availability-zone={node}",
            )

        super().__init__()

    def create(self) -> None:
        super().create()
        wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")

    def wait_replicas(self) -> None:
        # NOTE[btv] - This will need to change if the order of
        # creating clusters/replicas changes, but it seemed fine to
        # assume this order, since we already assume it in `create`.
        wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")
        wait(condition="condition=Ready", resource="pod/cluster-s1-replica-2-0")
        wait(condition="condition=Ready", resource="pod/cluster-s2-replica-3-0")

    def wait_for_sql(self) -> None:
        """Wait until environmentd pod is ready and can accept SQL connections"""
        wait(condition="condition=Ready", resource="pod/environmentd-0")

        start = datetime.now()
        while datetime.now() - start < timedelta(seconds=300):
            try:
                self.environmentd.sql("SELECT 1")
                break
            except InterfaceError as e:
                # Since we crash environmentd, we expect some errors that we swallow.
                print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
                time.sleep(2)

    def set_environmentd_failpoints(self, failpoints: str) -> None:
        """Set the FAILPOINTS environmentd variable in the stateful set. This
        will most likely restart environmentd"""
        stateful_set = [
            resource
            for resource in self.resources
            if type(resource) == EnvironmentdStatefulSet
        ]
        assert len(stateful_set) == 1
        stateful_set = stateful_set[0]

        stateful_set.env["FAILPOINTS"] = failpoints
        stateful_set.replace()
        self.wait_for_sql()

Ancestors

Application

Methods

def create(self) ‑> None
def create(self) -> None:
    super().create()
    wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")
def set_environmentd_failpoints(self, failpoints: str) ‑> None

Set the FAILPOINTS environment variable in the environmentd stateful set. This will most likely restart environmentd.

def set_environmentd_failpoints(self, failpoints: str) -> None:
    """Set the FAILPOINTS environmentd variable in the stateful set. This
    will most likely restart environmentd"""
    stateful_set = [
        resource
        for resource in self.resources
        if type(resource) == EnvironmentdStatefulSet
    ]
    assert len(stateful_set) == 1
    stateful_set = stateful_set[0]

    stateful_set.env["FAILPOINTS"] = failpoints
    stateful_set.replace()
    self.wait_for_sql()
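
A usage sketch: the failpoints string is passed verbatim into environmentd's FAILPOINTS environment variable, so the failpoint name and action below are illustrative rather than guaranteed to exist. Each call replaces the stateful set, which usually restarts environmentd, and then blocks in wait_for_sql() until SQL connections succeed again.

app = MaterializeApplication()

# Arm a hypothetical failpoint, then run the scenario that should trip it.
app.set_environmentd_failpoints("some_failpoint=panic")
...

# Passing an empty string clears FAILPOINTS (assumed to disable all
# failpoints); this restarts environmentd once more.
app.set_environmentd_failpoints("")

# Optionally also wait for the built-in cluster replicas to come back.
app.wait_replicas()
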
def wait_for_sql(self) ‑> None

Wait until environmentd pod is ready and can accept SQL connections

def wait_for_sql(self) -> None:
    """Wait until environmentd pod is ready and can accept SQL connections"""
    wait(condition="condition=Ready", resource="pod/environmentd-0")

    start = datetime.now()
    while datetime.now() - start < timedelta(seconds=300):
        try:
            self.environmentd.sql("SELECT 1")
            break
        except InterfaceError as e:
            # Since we crash environmentd, we expect some errors that we swallow.
            print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
            time.sleep(2)
def wait_replicas(self) ‑> None
def wait_replicas(self) -> None:
    # NOTE[btv] - This will need to change if the order of
    # creating clusters/replicas changes, but it seemed fine to
    # assume this order, since we already assume it in `create`.
    wait(condition="condition=Ready", resource="pod/cluster-u1-replica-1-0")
    wait(condition="condition=Ready", resource="pod/cluster-s1-replica-2-0")
    wait(condition="condition=Ready", resource="pod/cluster-s2-replica-3-0")