Module materialize.mzcompose.services.materialized

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.


from copy import copy

from materialize.mz_version import MzVersion
from materialize.mzcompose import (
    DEFAULT_CRDB_ENVIRONMENT,
    DEFAULT_MZ_ENVIRONMENT_ID,
    DEFAULT_MZ_VOLUMES,
    DEFAULT_SYSTEM_PARAMETERS,
)
from materialize.mzcompose.service import (
    Service,
    ServiceConfig,
    ServiceDependency,
)
from materialize.mzcompose.services.minio import minio_blob_uri


class Materialized(Service):
    """Docker Compose service definition for a `materialized` container.

    The constructor translates its keyword arguments into the service
    configuration (image or mzbuild target, command-line flags, environment,
    volumes, ports, dependencies and healthcheck) and hands that configuration
    to `Service`.

    Attributes:
        default_storage_size: Value passed as `--default-storage-host-size`.
        default_replica_size: Value passed as
            `--bootstrap-default-cluster-replica-size`.
    """

    class Size:
        # Default for the `default_size` constructor argument.
        DEFAULT_SIZE = 4

    def __init__(
        self,
        name: str | None = None,
        image: str | None = None,
        environment_extra: list[str] = [],
        volumes_extra: list[str] = [],
        depends_on: list[str] = [],
        memory: str | None = None,
        options: list[str] = [],
        persist_blob_url: str | None = None,
        default_size: int = Size.DEFAULT_SIZE,
        environment_id: str | None = None,
        propagate_crashes: bool = True,
        external_cockroach: str | bool = False,
        external_minio: str | bool = False,
        unsafe_mode: bool = True,
        restart: str | None = None,
        use_default_volumes: bool = True,
        ports: list[str] | None = None,
        system_parameter_defaults: dict[str, str] | None = None,
        additional_system_parameter_defaults: dict[str, str] | None = None,
        soft_assertions: bool = True,
        sanity_restart: bool = True,
        platform: str | None = None,
        healthcheck: list[str] | None = None,
    ) -> None:
        """Build the service configuration.

        The list-valued defaults (`[]`) are shared between calls; that is safe
        here because this method only reads them and never mutates them.

        Args:
            name: Service name, also used as the container hostname. Defaults
                to "materialized".
            image: Image to run. When omitted, the "materialized" mzbuild
                target is used instead.
            environment_extra: Environment entries added after the built-in
                ones.
            volumes_extra: Volumes added after the default volumes.
            depends_on: Names of services this one depends on with condition
                "service_started".
            memory: Memory limit, stored under `deploy.resources.limits`.
            options: Extra command-line flags, appended last.
            persist_blob_url: Value for `--persist-blob-url`. Replaced by the
                MinIO URI when `external_minio` is set.
            default_size: Number from which the default replica size and the
                default storage host size are derived.
            environment_id: Value for `--environment-id`. Falls back to
                `DEFAULT_MZ_ENVIRONMENT_ID` when empty.
            propagate_crashes: Add `--orchestrator-process-propagate-crashes`.
            external_cockroach: Falsy to leave the CockroachDB URLs unset,
                `True` to use the host "cockroach", or a string naming the
                host to use.
            external_minio: Falsy to leave the blob URL as given, `True` to
                use the host "minio", or a string naming the host to use.
            unsafe_mode: Add `--unsafe-mode`.
            restart: Restart policy stored in the service configuration.
            use_default_volumes: Include `DEFAULT_MZ_VOLUMES`.
            ports: When given (and non-empty), replaces the default port list
                and sets `allow_host_ports`.
            system_parameter_defaults: Replaces `DEFAULT_SYSTEM_PARAMETERS`
                as the base set of system parameter defaults. Not modified.
            additional_system_parameter_defaults: Entries merged on top of
                the base set of system parameter defaults.
            soft_assertions: Sets `MZ_SOFT_ASSERTIONS` to 1 or 0.
            sanity_restart: Attach the `sanity_restart` label.
            platform: Platform stored in the service configuration.
            healthcheck: Healthcheck test command. Defaults to a `curl`
                against `localhost:6878/api/readyz`.
        """
        if name is None:
            name = "materialized"

        if healthcheck is None:
            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]

        depends_graph: dict[str, ServiceDependency] = {
            s: {"condition": "service_started"} for s in depends_on
        }

        environment = [
            f"MZ_SOFT_ASSERTIONS={int(soft_assertions)}",
            # The following settings can not be baked in the default image, as they
            # are enabled for testing purposes only
            "MZ_ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR=0.0.0.0",
            "MZ_ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY=/mzdata/prometheus",
            "MZ_BOOTSTRAP_ROLE=materialize",
            "MZ_INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR=0.0.0.0:6879",
            "MZ_AWS_CONNECTION_ROLE_ARN=arn:aws:iam::123456789000:role/MaterializeConnection",
            "MZ_AWS_EXTERNAL_ID_PREFIX=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            # Always use the persist catalog if the version has multiple implementations.
            "MZ_CATALOG_STORE=persist",
            # Please think twice before forwarding additional environment
            # variables from the host, as it's easy to write tests that are
            # then accidentally dependent on the state of the host machine.
            #
            # To dynamically change the environment during a workflow run,
            # use Composition.override.
            "MZ_LOG_FILTER",
            "CLUSTERD_LOG_FILTER",
            *environment_extra,
            *DEFAULT_CRDB_ENVIRONMENT,
        ]

        # Work on a private copy in both cases, so that the update below
        # modifies neither the DEFAULT_SYSTEM_PARAMETERS dictionary nor a
        # dictionary owned by the caller.
        if system_parameter_defaults is None:
            system_parameter_defaults = copy(DEFAULT_SYSTEM_PARAMETERS)
        else:
            system_parameter_defaults = copy(system_parameter_defaults)

        if additional_system_parameter_defaults is not None:
            system_parameter_defaults.update(additional_system_parameter_defaults)

        if system_parameter_defaults:
            environment += [
                "MZ_SYSTEM_PARAMETER_DEFAULT="
                + ";".join(
                    f"{key}={value}" for key, value in system_parameter_defaults.items()
                )
            ]

        command: list[str] = []

        if unsafe_mode:
            command += ["--unsafe-mode"]

        if not environment_id:
            environment_id = DEFAULT_MZ_ENVIRONMENT_ID
        command += [f"--environment-id={environment_id}"]

        if external_minio:
            depends_graph["minio"] = {"condition": "service_healthy"}
            # A string names the host explicitly; `True` selects the default.
            address = external_minio if isinstance(external_minio, str) else "minio"
            persist_blob_url = minio_blob_uri(address)

        if persist_blob_url:
            command.append(f"--persist-blob-url={persist_blob_url}")

        if propagate_crashes:
            command += ["--orchestrator-process-propagate-crashes"]

        # Images whose tag parses as older than v0.41.0 get the bare number as
        # storage host size; everything else gets "1" or "<default_size>-1".
        predates_v0_41 = False
        if image and not any(
            marker in image for marker in ("latest", "devel", "unstable")
        ):
            # Read the tag from the last path component only, so that a
            # registry port ("registry:5000/materialized:v0.40.0") is not
            # mistaken for the tag. An image without any tag is handled like
            # the non-versioned tags above instead of raising IndexError.
            _, tag_separator, tag = image.rpartition("/")[2].rpartition(":")
            if tag_separator:
                predates_v0_41 = MzVersion.parse_mz(tag) < MzVersion.parse_mz(
                    "v0.41.0"
                )

        if predates_v0_41:
            self.default_storage_size = str(default_size)
        elif default_size == 1:
            self.default_storage_size = "1"
        else:
            self.default_storage_size = f"{default_size}-1"

        self.default_replica_size = (
            "1" if default_size == 1 else f"{default_size}-{default_size}"
        )
        command += [
            # Issue #15858 prevents the habitual use of large introspection
            # clusters, so we leave the builtin cluster replica size as is.
            # f"--bootstrap-builtin-cluster-replica-size={self.default_replica_size}",
            f"--bootstrap-default-cluster-replica-size={self.default_replica_size}",
            f"--default-storage-host-size={self.default_storage_size}",
        ]

        if external_cockroach:
            # A string names the host explicitly; `True` selects the default.
            address = (
                external_cockroach
                if isinstance(external_cockroach, str)
                else "cockroach"
            )
            depends_graph["cockroach"] = {"condition": "service_healthy"}
            command += [
                f"--storage-stash-url=postgres://root@{address}:26257?options=--search_path=storage",
                f"--persist-consensus-url=postgres://root@{address}:26257?options=--search_path=consensus",
            ]
            environment += [
                f"MZ_TIMESTAMP_ORACLE_URL=postgres://root@{address}:26257?options=--search_path=tsoracle",
                "MZ_NO_BUILTIN_COCKROACH=1",
                # Set the adapter stash URL for older environments that need it (versions before
                # v0.92.0).
                f"MZ_ADAPTER_STASH_URL=postgres://root@{address}:26257?options=--search_path=adapter",
            ]

        command += [
            "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
            "--orchestrator-process-prometheus-service-discovery-directory=/mzdata/prometheus",
        ]

        # Caller-supplied flags go last.
        command += options

        config: ServiceConfig = {
            # Use the service name as the hostname so that it is stable across
            # container recreation. (The default hostname is the container ID,
            # which changes when the container is recreated.) This is important
            # when using `external_cockroach=False`, as the consensus/blob URLs
            # refer to the container's hostname, and we don't want those URLs to
            # change when the container is recreated.
            "hostname": name,
        }

        if image:
            config["image"] = image
        else:
            config["mzbuild"] = "materialized"

        if restart:
            config["restart"] = restart

        # Depending on the Docker Compose version, this may either work or be
        # ignored with a warning. Unfortunately no portable way of setting the
        # memory limit is known.
        if memory:
            config["deploy"] = {"resources": {"limits": {"memory": memory}}}

        if sanity_restart:
            # Workaround for https://github.com/docker/compose/issues/11133
            config["labels"] = {"sanity_restart": True}

        if platform:
            config["platform"] = platform

        # Built as a fresh list so neither DEFAULT_MZ_VOLUMES nor
        # `volumes_extra` is modified.
        volumes: list[str] = []
        if use_default_volumes:
            volumes += DEFAULT_MZ_VOLUMES
        volumes += volumes_extra

        config.update(
            {
                "depends_on": depends_graph,
                "command": command,
                "ports": [6875, 6876, 6877, 6878, 6880, 6881, 26257],
                "environment": environment,
                "volumes": volumes,
                "tmpfs": ["/tmp"],
                "healthcheck": {
                    "test": healthcheck,
                    "interval": "1s",
                    # A fully loaded Materialize can take a long time to start.
                    "start_period": "600s",
                },
            }
        )

        # An explicit port list replaces the default one set above.
        if ports:
            config.update(
                {
                    "allow_host_ports": True,
                    "ports": ports,
                }
            )

        super().__init__(name=name, config=config)

Classes

class Materialized (name: str | None = None, image: str | None = None, environment_extra: list[str] = [], volumes_extra: list[str] = [], depends_on: list[str] = [], memory: str | None = None, options: list[str] = [], persist_blob_url: str | None = None, default_size: int = 4, environment_id: str | None = None, propagate_crashes: bool = True, external_cockroach: str | bool = False, external_minio: str | bool = False, unsafe_mode: bool = True, restart: str | None = None, use_default_volumes: bool = True, ports: list[str] | None = None, system_parameter_defaults: dict[str, str] | None = None, additional_system_parameter_defaults: dict[str, str] | None = None, soft_assertions: bool = True, sanity_restart: bool = True, platform: str | None = None, healthcheck: list[str] | None = None)

A Docker Compose service in a Composition.

Attributes

name
The name of the service.
config
The definition of the service.
Expand source code Browse git
class Materialized(Service):
    """Docker Compose service definition for a `materialized` container.

    The constructor translates its keyword arguments into the service
    configuration (image or mzbuild target, command-line flags, environment,
    volumes, ports, dependencies and healthcheck) and hands that configuration
    to `Service`.

    Attributes:
        default_storage_size: Value passed as `--default-storage-host-size`.
        default_replica_size: Value passed as
            `--bootstrap-default-cluster-replica-size`.
    """

    class Size:
        # Default for the `default_size` constructor argument.
        DEFAULT_SIZE = 4

    def __init__(
        self,
        name: str | None = None,
        image: str | None = None,
        environment_extra: list[str] = [],
        volumes_extra: list[str] = [],
        depends_on: list[str] = [],
        memory: str | None = None,
        options: list[str] = [],
        persist_blob_url: str | None = None,
        default_size: int = Size.DEFAULT_SIZE,
        environment_id: str | None = None,
        propagate_crashes: bool = True,
        external_cockroach: str | bool = False,
        external_minio: str | bool = False,
        unsafe_mode: bool = True,
        restart: str | None = None,
        use_default_volumes: bool = True,
        ports: list[str] | None = None,
        system_parameter_defaults: dict[str, str] | None = None,
        additional_system_parameter_defaults: dict[str, str] | None = None,
        soft_assertions: bool = True,
        sanity_restart: bool = True,
        platform: str | None = None,
        healthcheck: list[str] | None = None,
    ) -> None:
        """Build the service configuration.

        The list-valued defaults (`[]`) are shared between calls; that is safe
        here because this method only reads them and never mutates them.

        Args:
            name: Service name, also used as the container hostname. Defaults
                to "materialized".
            image: Image to run. When omitted, the "materialized" mzbuild
                target is used instead.
            environment_extra: Environment entries added after the built-in
                ones.
            volumes_extra: Volumes added after the default volumes.
            depends_on: Names of services this one depends on with condition
                "service_started".
            memory: Memory limit, stored under `deploy.resources.limits`.
            options: Extra command-line flags, appended last.
            persist_blob_url: Value for `--persist-blob-url`. Replaced by the
                MinIO URI when `external_minio` is set.
            default_size: Number from which the default replica size and the
                default storage host size are derived.
            environment_id: Value for `--environment-id`. Falls back to
                `DEFAULT_MZ_ENVIRONMENT_ID` when empty.
            propagate_crashes: Add `--orchestrator-process-propagate-crashes`.
            external_cockroach: Falsy to leave the CockroachDB URLs unset,
                `True` to use the host "cockroach", or a string naming the
                host to use.
            external_minio: Falsy to leave the blob URL as given, `True` to
                use the host "minio", or a string naming the host to use.
            unsafe_mode: Add `--unsafe-mode`.
            restart: Restart policy stored in the service configuration.
            use_default_volumes: Include `DEFAULT_MZ_VOLUMES`.
            ports: When given (and non-empty), replaces the default port list
                and sets `allow_host_ports`.
            system_parameter_defaults: Replaces `DEFAULT_SYSTEM_PARAMETERS`
                as the base set of system parameter defaults. Not modified.
            additional_system_parameter_defaults: Entries merged on top of
                the base set of system parameter defaults.
            soft_assertions: Sets `MZ_SOFT_ASSERTIONS` to 1 or 0.
            sanity_restart: Attach the `sanity_restart` label.
            platform: Platform stored in the service configuration.
            healthcheck: Healthcheck test command. Defaults to a `curl`
                against `localhost:6878/api/readyz`.
        """
        if name is None:
            name = "materialized"

        if healthcheck is None:
            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]

        depends_graph: dict[str, ServiceDependency] = {
            s: {"condition": "service_started"} for s in depends_on
        }

        environment = [
            f"MZ_SOFT_ASSERTIONS={int(soft_assertions)}",
            # The following settings can not be baked in the default image, as they
            # are enabled for testing purposes only
            "MZ_ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR=0.0.0.0",
            "MZ_ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY=/mzdata/prometheus",
            "MZ_BOOTSTRAP_ROLE=materialize",
            "MZ_INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR=0.0.0.0:6879",
            "MZ_AWS_CONNECTION_ROLE_ARN=arn:aws:iam::123456789000:role/MaterializeConnection",
            "MZ_AWS_EXTERNAL_ID_PREFIX=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
            # Always use the persist catalog if the version has multiple implementations.
            "MZ_CATALOG_STORE=persist",
            # Please think twice before forwarding additional environment
            # variables from the host, as it's easy to write tests that are
            # then accidentally dependent on the state of the host machine.
            #
            # To dynamically change the environment during a workflow run,
            # use Composition.override.
            "MZ_LOG_FILTER",
            "CLUSTERD_LOG_FILTER",
            *environment_extra,
            *DEFAULT_CRDB_ENVIRONMENT,
        ]

        # Work on a private copy in both cases, so that the update below
        # modifies neither the DEFAULT_SYSTEM_PARAMETERS dictionary nor a
        # dictionary owned by the caller.
        if system_parameter_defaults is None:
            system_parameter_defaults = copy(DEFAULT_SYSTEM_PARAMETERS)
        else:
            system_parameter_defaults = copy(system_parameter_defaults)

        if additional_system_parameter_defaults is not None:
            system_parameter_defaults.update(additional_system_parameter_defaults)

        if system_parameter_defaults:
            environment += [
                "MZ_SYSTEM_PARAMETER_DEFAULT="
                + ";".join(
                    f"{key}={value}" for key, value in system_parameter_defaults.items()
                )
            ]

        command: list[str] = []

        if unsafe_mode:
            command += ["--unsafe-mode"]

        if not environment_id:
            environment_id = DEFAULT_MZ_ENVIRONMENT_ID
        command += [f"--environment-id={environment_id}"]

        if external_minio:
            depends_graph["minio"] = {"condition": "service_healthy"}
            # A string names the host explicitly; `True` selects the default.
            address = external_minio if isinstance(external_minio, str) else "minio"
            persist_blob_url = minio_blob_uri(address)

        if persist_blob_url:
            command.append(f"--persist-blob-url={persist_blob_url}")

        if propagate_crashes:
            command += ["--orchestrator-process-propagate-crashes"]

        # Images whose tag parses as older than v0.41.0 get the bare number as
        # storage host size; everything else gets "1" or "<default_size>-1".
        predates_v0_41 = False
        if image and not any(
            marker in image for marker in ("latest", "devel", "unstable")
        ):
            # Read the tag from the last path component only, so that a
            # registry port ("registry:5000/materialized:v0.40.0") is not
            # mistaken for the tag. An image without any tag is handled like
            # the non-versioned tags above instead of raising IndexError.
            _, tag_separator, tag = image.rpartition("/")[2].rpartition(":")
            if tag_separator:
                predates_v0_41 = MzVersion.parse_mz(tag) < MzVersion.parse_mz(
                    "v0.41.0"
                )

        if predates_v0_41:
            self.default_storage_size = str(default_size)
        elif default_size == 1:
            self.default_storage_size = "1"
        else:
            self.default_storage_size = f"{default_size}-1"

        self.default_replica_size = (
            "1" if default_size == 1 else f"{default_size}-{default_size}"
        )
        command += [
            # Issue #15858 prevents the habitual use of large introspection
            # clusters, so we leave the builtin cluster replica size as is.
            # f"--bootstrap-builtin-cluster-replica-size={self.default_replica_size}",
            f"--bootstrap-default-cluster-replica-size={self.default_replica_size}",
            f"--default-storage-host-size={self.default_storage_size}",
        ]

        if external_cockroach:
            # A string names the host explicitly; `True` selects the default.
            address = (
                external_cockroach
                if isinstance(external_cockroach, str)
                else "cockroach"
            )
            depends_graph["cockroach"] = {"condition": "service_healthy"}
            command += [
                f"--storage-stash-url=postgres://root@{address}:26257?options=--search_path=storage",
                f"--persist-consensus-url=postgres://root@{address}:26257?options=--search_path=consensus",
            ]
            environment += [
                f"MZ_TIMESTAMP_ORACLE_URL=postgres://root@{address}:26257?options=--search_path=tsoracle",
                "MZ_NO_BUILTIN_COCKROACH=1",
                # Set the adapter stash URL for older environments that need it (versions before
                # v0.92.0).
                f"MZ_ADAPTER_STASH_URL=postgres://root@{address}:26257?options=--search_path=adapter",
            ]

        command += [
            "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
            "--orchestrator-process-prometheus-service-discovery-directory=/mzdata/prometheus",
        ]

        # Caller-supplied flags go last.
        command += options

        config: ServiceConfig = {
            # Use the service name as the hostname so that it is stable across
            # container recreation. (The default hostname is the container ID,
            # which changes when the container is recreated.) This is important
            # when using `external_cockroach=False`, as the consensus/blob URLs
            # refer to the container's hostname, and we don't want those URLs to
            # change when the container is recreated.
            "hostname": name,
        }

        if image:
            config["image"] = image
        else:
            config["mzbuild"] = "materialized"

        if restart:
            config["restart"] = restart

        # Depending on the Docker Compose version, this may either work or be
        # ignored with a warning. Unfortunately no portable way of setting the
        # memory limit is known.
        if memory:
            config["deploy"] = {"resources": {"limits": {"memory": memory}}}

        if sanity_restart:
            # Workaround for https://github.com/docker/compose/issues/11133
            config["labels"] = {"sanity_restart": True}

        if platform:
            config["platform"] = platform

        # Built as a fresh list so neither DEFAULT_MZ_VOLUMES nor
        # `volumes_extra` is modified.
        volumes: list[str] = []
        if use_default_volumes:
            volumes += DEFAULT_MZ_VOLUMES
        volumes += volumes_extra

        config.update(
            {
                "depends_on": depends_graph,
                "command": command,
                "ports": [6875, 6876, 6877, 6878, 6880, 6881, 26257],
                "environment": environment,
                "volumes": volumes,
                "tmpfs": ["/tmp"],
                "healthcheck": {
                    "test": healthcheck,
                    "interval": "1s",
                    # A fully loaded Materialize can take a long time to start.
                    "start_period": "600s",
                },
            }
        )

        # An explicit port list replaces the default one set above.
        if ports:
            config.update(
                {
                    "allow_host_ports": True,
                    "ports": ports,
                }
            )

        super().__init__(name=name, config=config)

Ancestors

Class variables

var Size