Module materialize.mzcompose.services.testdrive

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random

from materialize import buildkite
from materialize.mzcompose import DEFAULT_MZ_VOLUMES
from materialize.mzcompose.service import (
    Service,
    ServiceDependency,
)


class Testdrive(Service):
    def __init__(
        self,
        name: str = "testdrive",
        mzbuild: str = "testdrive",
        materialize_url: str = "postgres://materialize@materialized:6875",
        materialize_url_internal: str = "postgres://materialize@materialized:6877",
        materialize_use_https: bool = False,
        materialize_params: dict[str, str] = {},
        kafka_url: str = "kafka:9092",
        kafka_default_partitions: int | None = None,
        kafka_args: str | None = None,
        schema_registry_url: str = "http://schema-registry:8081",
        no_reset: bool = False,
        default_timeout: str | None = None,
        seed: int | None = None,
        consistent_seed: bool = False,
        validate_catalog_store: bool = False,
        entrypoint: list[str] | None = None,
        entrypoint_extra: list[str] = [],
        environment: list[str] | None = None,
        volumes_extra: list[str] = [],
        volume_workdir: str = ".:/workdir",
        propagate_uid_gid: bool = True,
        forward_buildkite_shard: bool = False,
        aws_region: str | None = None,
        aws_endpoint: str | None = "http://minio:9000",
        aws_access_key_id: str | None = "minioadmin",
        aws_secret_access_key: str | None = "minioadmin",
        no_consistency_checks: bool = False,
        external_cockroach: bool = False,
        external_minio: bool = False,
        fivetran_destination: bool = False,
        fivetran_destination_url: str = "http://fivetran-destination:6874",
        fivetran_destination_files_path: str = "/share/tmp",
        mz_service: str = "materialized",
    ) -> None:
        depends_graph: dict[str, ServiceDependency] = {}

        if environment is None:
            environment = [
                "TMPDIR=/share/tmp",
                "MZ_SOFT_ASSERTIONS=1",
                # Please think twice before forwarding additional environment
                # variables from the host, as it's easy to write tests that are
                # then accidentally dependent on the state of the host machine.
                #
                # To pass arguments to a testdrive script, use the `--var` CLI
                # option rather than environment variables.
                "MZ_LOG_FILTER",
                "AWS_ACCESS_KEY_ID",
                "AWS_SECRET_ACCESS_KEY",
                "AWS_SESSION_TOKEN",
            ]

        volumes = [
            volume_workdir,
            *(v for v in DEFAULT_MZ_VOLUMES if v.startswith("tmp:")),
        ]
        if volumes_extra:
            volumes.extend(volumes_extra)

        if entrypoint is None:
            entrypoint = [
                "testdrive",
                f"--kafka-addr={kafka_url}",
                f"--schema-registry-url={schema_registry_url}",
                f"--materialize-url={materialize_url}",
                f"--materialize-internal-url={materialize_url_internal}",
                *(["--materialize-use-https"] if materialize_use_https else []),
            ]

        if aws_region:
            entrypoint.append(f"--aws-region={aws_region}")

        if aws_endpoint and not aws_region:
            entrypoint.append(f"--aws-endpoint={aws_endpoint}")
            entrypoint.append(f"--var=aws-endpoint={aws_endpoint}")

        if aws_access_key_id:
            entrypoint.append(f"--aws-access-key-id={aws_access_key_id}")
            entrypoint.append(f"--var=aws-access-key-id={aws_access_key_id}")

        if aws_secret_access_key:
            entrypoint.append(f"--aws-secret-access-key={aws_secret_access_key}")
            entrypoint.append(f"--var=aws-secret-access-key={aws_secret_access_key}")

        if validate_catalog_store:
            entrypoint.append("--validate-catalog-store")

        if no_reset:
            entrypoint.append("--no-reset")

        for k, v in materialize_params.items():
            entrypoint.append(f"--materialize-param={k}={v}")

        if default_timeout is None:
            default_timeout = "360s"
        entrypoint.append(f"--default-timeout={default_timeout}")

        if kafka_default_partitions:
            entrypoint.append(f"--kafka-default-partitions={kafka_default_partitions}")

        if forward_buildkite_shard:
            shard = buildkite.get_parallelism_index()
            shard_count = buildkite.get_parallelism_count()
            entrypoint += [f"--shard={shard}", f"--shard-count={shard_count}"]

        if seed is not None and consistent_seed:
            raise RuntimeError("Can't pass `seed` and `consistent_seed` at same time")
        elif consistent_seed:
            entrypoint.append(f"--seed={random.getrandbits(32)}")
        elif seed is not None:
            entrypoint.append(f"--seed={seed}")

        if no_consistency_checks:
            entrypoint.append("--consistency-checks=disable")

        if fivetran_destination:
            depends_graph["fivetran-destination"] = {"condition": "service_started"}
            entrypoint.append(f"--fivetran-destination-url={fivetran_destination_url}")
            entrypoint.append(
                f"--fivetran-destination-files-path={fivetran_destination_files_path}"
            )

        if external_minio:
            depends_graph["minio"] = {"condition": "service_healthy"}
            persist_blob_url = "s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/&region=minio"
            entrypoint.append(f"--persist-blob-url={persist_blob_url}")
        else:
            entrypoint.append("--persist-blob-url=file:///mzdata/persist/blob")

        if external_cockroach:
            depends_graph["cockroach"] = {"condition": "service_healthy"}
            entrypoint.append(
                "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus"
            )
        else:
            entrypoint.append(
                f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus"
            )

        entrypoint.extend(entrypoint_extra)

        super().__init__(
            name=name,
            config={
                "depends_on": depends_graph,
                "mzbuild": mzbuild,
                "entrypoint": entrypoint,
                "environment": environment,
                "volumes": volumes,
                "propagate_uid_gid": propagate_uid_gid,
                "init": True,
            },
        )
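
The constructor's job is largely mechanical: it translates keyword arguments into `testdrive` CLI flags and Compose-level settings (dependencies, volumes, environment). A minimal sketch of that translation, assuming the `Service` base class exposes the resulting definition as the `config` attribute documented below:

td = Testdrive(no_reset=True, external_minio=True, seed=42)
entrypoint = td.config["entrypoint"]

assert "--no-reset" in entrypoint
assert "--seed=42" in entrypoint
# external_minio=True swaps the local file persist blob URL for the MinIO S3
# URL and adds a healthcheck dependency on the "minio" service.
assert any(flag.startswith("--persist-blob-url=s3://") for flag in entrypoint)
assert td.config["depends_on"]["minio"] == {"condition": "service_healthy"}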

Classes

class Testdrive (name: str = 'testdrive', mzbuild: str = 'testdrive', materialize_url: str = 'postgres://materialize@materialized:6875', materialize_url_internal: str = 'postgres://materialize@materialized:6877', materialize_use_https: bool = False, materialize_params: dict[str, str] = {}, kafka_url: str = 'kafka:9092', kafka_default_partitions: int | None = None, kafka_args: str | None = None, schema_registry_url: str = 'http://schema-registry:8081', no_reset: bool = False, default_timeout: str | None = None, seed: int | None = None, consistent_seed: bool = False, validate_catalog_store: bool = False, entrypoint: list[str] | None = None, entrypoint_extra: list[str] = [], environment: list[str] | None = None, volumes_extra: list[str] = [], volume_workdir: str = '.:/workdir', propagate_uid_gid: bool = True, forward_buildkite_shard: bool = False, aws_region: str | None = None, aws_endpoint: str | None = 'http://minio:9000', aws_access_key_id: str | None = 'minioadmin', aws_secret_access_key: str | None = 'minioadmin', no_consistency_checks: bool = False, external_cockroach: bool = False, external_minio: bool = False, fivetran_destination: bool = False, fivetran_destination_url: str = 'http://fivetran-destination:6874', fivetran_destination_files_path: str = '/share/tmp', mz_service: str = 'materialized')

A Docker Compose service in a Composition.

Attributes

name
    The name of the service.
config
    The definition of the service.
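
In a composition, this service is typically listed alongside the services whose addresses appear in the defaults above (materialized, kafka, schema-registry) and then invoked with one or more .td scripts. The sketch below is illustrative only: the sibling service modules, the workflow_default convention, and the Composition.up/run calls are assumptions about the surrounding mzcompose API, and smoke.td is a hypothetical script name.

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.testdrive import Testdrive

SERVICES = [
    Materialized(),
    # Kafka, SchemaRegistry, etc. would be listed here as well when the
    # scripts need them.
    Testdrive(
        no_reset=True,  # keep Materialize state between scripts
        default_timeout="120s",
        # entrypoint_extra is appended verbatim; --var values become
        # variables available inside the testdrive scripts.
        entrypoint_extra=["--var=example-var=1"],
    ),
]


def workflow_default(c: Composition) -> None:
    c.up("materialized")
    c.run("testdrive", "smoke.td")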

Ancestors