Module materialize.cloudtest.k8s.testdrive

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import os
from inspect import Traceback

from kubernetes.client import V1Container, V1EnvVar, V1ObjectMeta, V1Pod, V1PodSpec

from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.k8s.api.k8s_pod import K8sPod


class TestdriveBase:
    def __init__(
        self,
        aws_region: str | None = None,
        materialize_url: str | None = None,
        materialize_internal_url: str | None = None,
        kafka_addr: str | None = None,
        schema_registry_url: str | None = None,
    ) -> None:
        self.aws_region = aws_region
        self.materialize_url = (
            materialize_url
            or "postgres://materialize:materialize@environmentd:6875/materialize"
        )
        self.materialize_internal_url = (
            materialize_internal_url
            or "postgres://mz_system@environmentd:6877/materialize"
        )
        self.kafka_addr = kafka_addr or "redpanda:9092"
        self.schema_registry_url = schema_registry_url or "http://redpanda:8081"

    def run(
        self,
        *args: str,
        input: str | None = None,
        no_reset: bool = False,
        seed: int | None = None,
        caller: Traceback | None = None,
        default_timeout: str = "300s",
        kafka_options: str | None = None,
        log_filter: str = "off",
    ) -> None:
        command: list[str] = [
            "testdrive",
            f"--materialize-url={self.materialize_url}",
            f"--materialize-internal-url={self.materialize_internal_url}",
            f"--kafka-addr={self.kafka_addr}",
            f"--schema-registry-url={self.schema_registry_url}",
            f"--default-timeout={default_timeout}",
            f"--log-filter={log_filter}",
            "--var=replicas=1",
            "--var=default-storage-size=1",
            "--var=default-replica-size=1",
            *([f"--aws-region={self.aws_region}"] if self.aws_region else []),
            # S3 sources are not compatible with Minio unfortunately
            # f"--aws-endpoint=http://minio-service.{self.namespace()}:9000",
            # "--aws-access-key-id=minio",
            # "--aws-secret-access-key=minio123",
            *(["--no-reset"] if no_reset else []),
            *([f"--seed={seed}"] if seed else []),
            *([f"--source={caller.filename}:{caller.lineno}"] if caller else []),
            *([f"--kafka-option={kafka_options}"] if kafka_options else []),
            *args,
        ]

        self._run_internal(
            command,
            input,
        )

    def _run_internal(self, command: list[str], input: str | None = None) -> None:
        raise NotImplementedError


class TestdrivePod(K8sPod, TestdriveBase):
    """Pod named "testdrive" in which testdrive commands are run via kubectl exec."""

    def __init__(
        self,
        release_mode: bool,
        aws_region: str | None = None,
        namespace: str = DEFAULT_K8S_NAMESPACE,
        materialize_url: str | None = None,
        materialize_internal_url: str | None = None,
        kafka_addr: str | None = None,
        schema_registry_url: str | None = None,
    ) -> None:
        """Initialise both bases and build the pod object stored in `self.pod`."""
        K8sPod.__init__(self, namespace)
        TestdriveBase.__init__(
            self,
            aws_region=aws_region,
            materialize_url=materialize_url,
            materialize_internal_url=materialize_internal_url,
            kafka_addr=kafka_addr,
            schema_registry_url=schema_registry_url,
        )

        pod_metadata = V1ObjectMeta(name="testdrive", namespace=namespace)

        # Forward the host's AWS credentials into the container; a variable
        # that is unset on the host is passed with value None.
        forwarded_env = []
        for var_name in (
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SESSION_TOKEN",
        ):
            forwarded_env.append(
                V1EnvVar(name=var_name, value=os.environ.get(var_name))
            )

        # The container only sleeps; testdrive itself is started on demand
        # by `_run_internal`.
        testdrive_container = V1Container(
            name="testdrive",
            image=self.image("testdrive", release_mode=release_mode),
            command=["sleep", "infinity"],
            env=forwarded_env,
        )

        self.pod = V1Pod(
            metadata=pod_metadata,
            spec=V1PodSpec(containers=[testdrive_container]),
        )

    def _run_internal(self, command: list[str], input: str | None = None) -> None:
        """Wait for the pod to be Ready, then run `command` in it with kubectl exec."""
        self.wait(condition="condition=Ready", resource="pod/testdrive")
        exec_args = ["exec", "-it", "testdrive", "--", *command]
        self.kubectl(*exec_args, input=input)

Classes

class TestdriveBase (aws_region: str | None = None, materialize_url: str | None = None, materialize_internal_url: str | None = None, kafka_addr: str | None = None, schema_registry_url: str | None = None)
Expand source code Browse git
class TestdriveBase:
    def __init__(
        self,
        aws_region: str | None = None,
        materialize_url: str | None = None,
        materialize_internal_url: str | None = None,
        kafka_addr: str | None = None,
        schema_registry_url: str | None = None,
    ) -> None:
        self.aws_region = aws_region
        self.materialize_url = (
            materialize_url
            or "postgres://materialize:materialize@environmentd:6875/materialize"
        )
        self.materialize_internal_url = (
            materialize_internal_url
            or "postgres://mz_system@environmentd:6877/materialize"
        )
        self.kafka_addr = kafka_addr or "redpanda:9092"
        self.schema_registry_url = schema_registry_url or "http://redpanda:8081"

    def run(
        self,
        *args: str,
        input: str | None = None,
        no_reset: bool = False,
        seed: int | None = None,
        caller: Traceback | None = None,
        default_timeout: str = "300s",
        kafka_options: str | None = None,
        log_filter: str = "off",
    ) -> None:
        command: list[str] = [
            "testdrive",
            f"--materialize-url={self.materialize_url}",
            f"--materialize-internal-url={self.materialize_internal_url}",
            f"--kafka-addr={self.kafka_addr}",
            f"--schema-registry-url={self.schema_registry_url}",
            f"--default-timeout={default_timeout}",
            f"--log-filter={log_filter}",
            "--var=replicas=1",
            "--var=default-storage-size=1",
            "--var=default-replica-size=1",
            *([f"--aws-region={self.aws_region}"] if self.aws_region else []),
            # S3 sources are not compatible with Minio unfortunately
            # f"--aws-endpoint=http://minio-service.{self.namespace()}:9000",
            # "--aws-access-key-id=minio",
            # "--aws-secret-access-key=minio123",
            *(["--no-reset"] if no_reset else []),
            *([f"--seed={seed}"] if seed else []),
            *([f"--source={caller.filename}:{caller.lineno}"] if caller else []),
            *([f"--kafka-option={kafka_options}"] if kafka_options else []),
            *args,
        ]

        self._run_internal(
            command,
            input,
        )

    def _run_internal(self, command: list[str], input: str | None = None) -> None:
        raise NotImplementedError

Subclasses

Methods

def run(self, *args: str, input: str | None = None, no_reset: bool = False, seed: int | None = None, caller: inspect.Traceback | None = None, default_timeout: str = '300s', kafka_options: str | None = None, log_filter: str = 'off') ‑> None
Expand source code Browse git
def run(
    self,
    *args: str,
    input: str | None = None,
    no_reset: bool = False,
    seed: int | None = None,
    caller: Traceback | None = None,
    default_timeout: str = "300s",
    kafka_options: str | None = None,
    log_filter: str = "off",
) -> None:
    command: list[str] = [
        "testdrive",
        f"--materialize-url={self.materialize_url}",
        f"--materialize-internal-url={self.materialize_internal_url}",
        f"--kafka-addr={self.kafka_addr}",
        f"--schema-registry-url={self.schema_registry_url}",
        f"--default-timeout={default_timeout}",
        f"--log-filter={log_filter}",
        "--var=replicas=1",
        "--var=default-storage-size=1",
        "--var=default-replica-size=1",
        *([f"--aws-region={self.aws_region}"] if self.aws_region else []),
        # S3 sources are not compatible with Minio unfortunately
        # f"--aws-endpoint=http://minio-service.{self.namespace()}:9000",
        # "--aws-access-key-id=minio",
        # "--aws-secret-access-key=minio123",
        *(["--no-reset"] if no_reset else []),
        *([f"--seed={seed}"] if seed else []),
        *([f"--source={caller.filename}:{caller.lineno}"] if caller else []),
        *([f"--kafka-option={kafka_options}"] if kafka_options else []),
        *args,
    ]

    self._run_internal(
        command,
        input,
    )
class TestdrivePod (release_mode: bool, aws_region: str | None = None, namespace: str = 'default', materialize_url: str | None = None, materialize_internal_url: str | None = None, kafka_addr: str | None = None, schema_registry_url: str | None = None)
Expand source code Browse git
class TestdrivePod(K8sPod, TestdriveBase):
    """Pod named "testdrive" in which testdrive commands are run via kubectl exec."""

    def __init__(
        self,
        release_mode: bool,
        aws_region: str | None = None,
        namespace: str = DEFAULT_K8S_NAMESPACE,
        materialize_url: str | None = None,
        materialize_internal_url: str | None = None,
        kafka_addr: str | None = None,
        schema_registry_url: str | None = None,
    ) -> None:
        """Initialise both bases and build the pod object stored in `self.pod`."""
        K8sPod.__init__(self, namespace)
        TestdriveBase.__init__(
            self,
            aws_region=aws_region,
            materialize_url=materialize_url,
            materialize_internal_url=materialize_internal_url,
            kafka_addr=kafka_addr,
            schema_registry_url=schema_registry_url,
        )

        pod_metadata = V1ObjectMeta(name="testdrive", namespace=namespace)

        # Forward the host's AWS credentials into the container; a variable
        # that is unset on the host is passed with value None.
        forwarded_env = []
        for var_name in (
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SESSION_TOKEN",
        ):
            forwarded_env.append(
                V1EnvVar(name=var_name, value=os.environ.get(var_name))
            )

        # The container only sleeps; testdrive itself is started on demand
        # by `_run_internal`.
        testdrive_container = V1Container(
            name="testdrive",
            image=self.image("testdrive", release_mode=release_mode),
            command=["sleep", "infinity"],
            env=forwarded_env,
        )

        self.pod = V1Pod(
            metadata=pod_metadata,
            spec=V1PodSpec(containers=[testdrive_container]),
        )

    def _run_internal(self, command: list[str], input: str | None = None) -> None:
        """Wait for the pod to be Ready, then run `command` in it with kubectl exec."""
        self.wait(condition="condition=Ready", resource="pod/testdrive")
        exec_args = ["exec", "-it", "testdrive", "--", *command]
        self.kubectl(*exec_args, input=input)

Ancestors