
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from copy import copy
from enum import Enum

from materialize.mz_version import MzVersion
from materialize.mzcompose import (
from materialize.mzcompose.service import (
from import minio_blob_uri

class Materialized(Service):
    # Compose service definition for a Materialize container. The constructor
    # assembles command-line flags, environment entries, service dependencies,
    # volumes and a healthcheck into a ServiceConfig and hands it to
    # Service.__init__.
    #
    # NOTE(review): this listing looks truncated by extraction -- several
    # literals are opened but never closed and some statement bodies are
    # missing -- so the comments below describe only what the visible lines show.
    class Size:
        # Default value of the `default_size` constructor argument.
        DEFAULT_SIZE = 4

    def __init__(
        # NOTE(review): no `self` parameter is listed here although the body
        # assigns self.default_storage_size / self.default_replica_size --
        # confirm against the real file.
        name: str | None = None,
        image: str | None = None,
        # NOTE(review): the list defaults below are mutable objects shared
        # across calls; the visible lines only read them (iteration / right-hand
        # side of `+=` on freshly created lists).
        environment_extra: list[str] = [],
        volumes_extra: list[str] = [],
        depends_on: list[str] = [],
        memory: str | None = None,
        options: list[str] = [],
        persist_blob_url: str | None = None,
        default_size: int = Size.DEFAULT_SIZE,
        environment_id: str | None = None,
        propagate_crashes: bool = True,
        # For both `external_*` flags: True selects the default host name,
        # a string is used as the address itself.
        external_cockroach: str | bool = False,
        external_minio: str | bool = False,
        unsafe_mode: bool = True,
        restart: str | None = None,
        use_default_volumes: bool = True,
        ports: list[str] | None = None,
        system_parameter_defaults: dict[str, str] | None = None,
        additional_system_parameter_defaults: dict[str, str] | None = None,
        # NOTE(review): `soft_assertions` is not referenced in the visible lines.
        soft_assertions: bool = True,
        sanity_restart: bool = True,
        platform: str | None = None,
        healthcheck: list[str] | None = None,
        deploy_generation: int | None = None,
    ) -> None:
        # The service name doubles as the container hostname further down.
        if name is None:
            name = "materialized"

        # Default healthcheck: curl the readyz endpoint on port 6878.
        if healthcheck is None:
            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]

        # Every explicitly requested dependency only has to be started, not
        # healthy; minio/cockroach entries added later require "service_healthy".
        depends_graph: dict[str, ServiceDependency] = {
            s: {"condition": "service_started"} for s in depends_on

        # NOTE(review): the entries of this list are not visible in this listing.
        environment = [
            # The following settings can not be baked in the default image, as they
            # are enabled for testing purposes only
            # Always use the persist catalog if the version has multiple implementations.
            # Please think twice before forwarding additional environment
            # variables from the host, as it's easy to write tests that are
            # then accidentally dependent on the state of the host machine.
            # To dynamically change the environment during a workflow run,
            # use Composition.override.

        if system_parameter_defaults is None:
            # Has to be copied so we later don't modify the
            # DEFAULT_SYSTEM_PARAMETERS dictionary
            system_parameter_defaults = copy(DEFAULT_SYSTEM_PARAMETERS)

        # NOTE(review): the body of this branch is not visible in this listing.
        if additional_system_parameter_defaults is not None:

        # The system parameters are serialized as "key=value" pairs joined by
        # ";" and appended to the environment (the variable-name prefix is not
        # visible here).
        if len(system_parameter_defaults) > 0:
            environment += [
                + ";".join(
                    f"{key}={value}" for key, value in system_parameter_defaults.items()

        # Command-line flags for the container, built up incrementally.
        command = []

        if unsafe_mode:
            command += ["--unsafe-mode"]

        # Fall back to the shared default environment id when none is given.
        if not environment_id:
            environment_id = DEFAULT_MZ_ENVIRONMENT_ID
        command += [f"--environment-id={environment_id}"]

        # External minio: wait for the "minio" service to be healthy and
        # override any caller-supplied blob URL with the minio one.
        if external_minio:
            depends_graph["minio"] = {"condition": "service_healthy"}
            address = "minio" if external_minio == True else external_minio
            persist_blob_url = minio_blob_uri(address)

        # NOTE(review): the body of this branch is not visible in this listing.
        if persist_blob_url:

        if propagate_crashes:
            command += ["--orchestrator-process-propagate-crashes"]

        # `is not None` so that generation 0 is still passed through.
        if deploy_generation is not None:
            command += [f"--deploy-generation={deploy_generation}"]

        # Storage size: the first alternative (its value is not visible here)
        # applies only when `image` has a tag that is not latest/devel/unstable
        # and parses to a version below v0.41.0; otherwise "1" for size 1 and
        # "<default_size>-1" for anything else.
        self.default_storage_size = (
            if image
            and "latest" not in image
            and "devel" not in image
            and "unstable" not in image
            and MzVersion.parse_mz(image.split(":")[1]) < MzVersion.parse_mz("v0.41.0")
            else "1" if default_size == 1 else f"{default_size}-1"

        # Replica size: "1" for size 1, otherwise "<default_size>-<default_size>".
        self.default_replica_size = (
            "1" if default_size == 1 else f"{default_size}-{default_size}"
        command += [
            # Issue #15858 prevents the habitual use of large introspection
            # clusters, so we leave the builtin cluster replica size as is.
            # f"--bootstrap-builtin-cluster-replica-size={self.default_replica_size}",

        # External cockroach: wait for the "cockroach" service to be healthy.
        # The flags and environment entries derived from `address` are not
        # visible in this listing.
        if external_cockroach:
            address = "cockroach" if external_cockroach == True else external_cockroach
            depends_graph["cockroach"] = {"condition": "service_healthy"}
            command += [
            environment += [
                # Set the adapter stash URL for older environments that need it (versions before
                # v0.92.0).

        command += [

        # Caller-supplied flags go last.
        command += options

        config: ServiceConfig = {
            # Use the service name as the hostname so that it is stable across
            # container recreation. (The default hostname is the container ID,
            # which changes when the container is recreated.) This is important
            # when using `external_cockroach=False`, as the consensus/blob URLs
            # refer to the container's hostname, and we don't want those URLs to
            # change when the container is recreated.
            "hostname": name,

        # NOTE(review): as listed, both keys are written when `image` is set;
        # an `else:` before the "mzbuild" line may have been lost in
        # extraction -- confirm.
        if image:
            config["image"] = image
            config["mzbuild"] = "materialized"

        if restart:
            config["restart"] = restart

        # Depending on the Docker Compose version, this may either work or be
        # ignored with a warning. Unfortunately no portable way of setting the
        # memory limit is known.
        if memory:
            config["deploy"] = {"resources": {"limits": {"memory": memory}}}

        if sanity_restart:
            # Workaround for
            config["labels"] = {"sanity_restart": True}

        if platform:
            config["platform"] = platform

        # Default volumes first (unless disabled), then the caller's extras.
        volumes = []
        if use_default_volumes:
            volumes += DEFAULT_MZ_VOLUMES
        volumes += volumes_extra

        # NOTE(review): the statement that opens this mapping is not visible;
        # presumably these keys are merged into `config` -- confirm.
                "depends_on": depends_graph,
                "command": command,
                "ports": [6875, 6876, 6877, 6878, 6880, 6881, 26257],
                "environment": environment,
                "volumes": volumes,
                "tmpfs": ["/tmp"],
                "healthcheck": {
                    "test": healthcheck,
                    "interval": "1s",
                    # A fully loaded Materialize can take a long time to start.
                    "start_period": "600s",

        # Caller-supplied ports are paired with "allow_host_ports": True; the
        # enclosing statement is not visible in this listing.
        if ports:
                    "allow_host_ports": True,
                    "ports": ports,

        super().__init__(name=name, config=config)

class DeploymentStatus(Enum):
    """Deployment states, each member carrying its string form as the value.

    See DeploymentStateInner for reference.
    """

    # Declaration order is the iteration order of the enum; keep it stable.
    INITIALIZING = "Initializing"
    READY_TO_PROMOTE = "ReadyToPromote"
    PROMOTING = "Promoting"
    IS_LEADER = "IsLeader"



class DeploymentStatus (*args, **kwds)

See DeploymentStateInner for reference

Expand source code Browse git
class DeploymentStatus(Enum):
    """Deployment states, each member carrying its string form as the value.

    See DeploymentStateInner for reference.
    """

    # Declaration order is the iteration order of the enum; keep it stable.
    INITIALIZING = "Initializing"
    READY_TO_PROMOTE = "ReadyToPromote"
    PROMOTING = "Promoting"
    IS_LEADER = "IsLeader"


  • enum.Enum

Class variables

class Materialized (name: str | None = None, image: str | None = None, environment_extra: list[str] = [], volumes_extra: list[str] = [], depends_on: list[str] = [], memory: str | None = None, options: list[str] = [], persist_blob_url: str | None = None, default_size: int = 4, environment_id: str | None = None, propagate_crashes: bool = True, external_cockroach: str | bool = False, external_minio: str | bool = False, unsafe_mode: bool = True, restart: str | None = None, use_default_volumes: bool = True, ports: list[str] | None = None, system_parameter_defaults: dict[str, str] | None = None, additional_system_parameter_defaults: dict[str, str] | None = None, soft_assertions: bool = True, sanity_restart: bool = True, platform: str | None = None, healthcheck: list[str] | None = None, deploy_generation: int | None = None)

A Docker Compose service in a Composition.


Attributes:
  • name: The name of the service.
  • config: The definition of the service.
Expand source code Browse git
class Materialized(Service):
    # Compose service definition for a Materialize container. The constructor
    # assembles command-line flags, environment entries, service dependencies,
    # volumes and a healthcheck into a ServiceConfig and hands it to
    # Service.__init__.
    #
    # NOTE(review): this listing looks truncated by extraction -- several
    # literals are opened but never closed and some statement bodies are
    # missing -- so the comments below describe only what the visible lines show.
    class Size:
        # Default value of the `default_size` constructor argument.
        DEFAULT_SIZE = 4

    def __init__(
        # NOTE(review): no `self` parameter is listed here although the body
        # assigns self.default_storage_size / self.default_replica_size --
        # confirm against the real file.
        name: str | None = None,
        image: str | None = None,
        # NOTE(review): the list defaults below are mutable objects shared
        # across calls; the visible lines only read them (iteration / right-hand
        # side of `+=` on freshly created lists).
        environment_extra: list[str] = [],
        volumes_extra: list[str] = [],
        depends_on: list[str] = [],
        memory: str | None = None,
        options: list[str] = [],
        persist_blob_url: str | None = None,
        default_size: int = Size.DEFAULT_SIZE,
        environment_id: str | None = None,
        propagate_crashes: bool = True,
        # For both `external_*` flags: True selects the default host name,
        # a string is used as the address itself.
        external_cockroach: str | bool = False,
        external_minio: str | bool = False,
        unsafe_mode: bool = True,
        restart: str | None = None,
        use_default_volumes: bool = True,
        ports: list[str] | None = None,
        system_parameter_defaults: dict[str, str] | None = None,
        additional_system_parameter_defaults: dict[str, str] | None = None,
        # NOTE(review): `soft_assertions` is not referenced in the visible lines.
        soft_assertions: bool = True,
        sanity_restart: bool = True,
        platform: str | None = None,
        healthcheck: list[str] | None = None,
        deploy_generation: int | None = None,
    ) -> None:
        # The service name doubles as the container hostname further down.
        if name is None:
            name = "materialized"

        # Default healthcheck: curl the readyz endpoint on port 6878.
        if healthcheck is None:
            healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]

        # Every explicitly requested dependency only has to be started, not
        # healthy; minio/cockroach entries added later require "service_healthy".
        depends_graph: dict[str, ServiceDependency] = {
            s: {"condition": "service_started"} for s in depends_on

        # NOTE(review): the entries of this list are not visible in this listing.
        environment = [
            # The following settings can not be baked in the default image, as they
            # are enabled for testing purposes only
            # Always use the persist catalog if the version has multiple implementations.
            # Please think twice before forwarding additional environment
            # variables from the host, as it's easy to write tests that are
            # then accidentally dependent on the state of the host machine.
            # To dynamically change the environment during a workflow run,
            # use Composition.override.

        if system_parameter_defaults is None:
            # Has to be copied so we later don't modify the
            # DEFAULT_SYSTEM_PARAMETERS dictionary
            system_parameter_defaults = copy(DEFAULT_SYSTEM_PARAMETERS)

        # NOTE(review): the body of this branch is not visible in this listing.
        if additional_system_parameter_defaults is not None:

        # The system parameters are serialized as "key=value" pairs joined by
        # ";" and appended to the environment (the variable-name prefix is not
        # visible here).
        if len(system_parameter_defaults) > 0:
            environment += [
                + ";".join(
                    f"{key}={value}" for key, value in system_parameter_defaults.items()

        # Command-line flags for the container, built up incrementally.
        command = []

        if unsafe_mode:
            command += ["--unsafe-mode"]

        # Fall back to the shared default environment id when none is given.
        if not environment_id:
            environment_id = DEFAULT_MZ_ENVIRONMENT_ID
        command += [f"--environment-id={environment_id}"]

        # External minio: wait for the "minio" service to be healthy and
        # override any caller-supplied blob URL with the minio one.
        if external_minio:
            depends_graph["minio"] = {"condition": "service_healthy"}
            address = "minio" if external_minio == True else external_minio
            persist_blob_url = minio_blob_uri(address)

        # NOTE(review): the body of this branch is not visible in this listing.
        if persist_blob_url:

        if propagate_crashes:
            command += ["--orchestrator-process-propagate-crashes"]

        # `is not None` so that generation 0 is still passed through.
        if deploy_generation is not None:
            command += [f"--deploy-generation={deploy_generation}"]

        # Storage size: the first alternative (its value is not visible here)
        # applies only when `image` has a tag that is not latest/devel/unstable
        # and parses to a version below v0.41.0; otherwise "1" for size 1 and
        # "<default_size>-1" for anything else.
        self.default_storage_size = (
            if image
            and "latest" not in image
            and "devel" not in image
            and "unstable" not in image
            and MzVersion.parse_mz(image.split(":")[1]) < MzVersion.parse_mz("v0.41.0")
            else "1" if default_size == 1 else f"{default_size}-1"

        # Replica size: "1" for size 1, otherwise "<default_size>-<default_size>".
        self.default_replica_size = (
            "1" if default_size == 1 else f"{default_size}-{default_size}"
        command += [
            # Issue #15858 prevents the habitual use of large introspection
            # clusters, so we leave the builtin cluster replica size as is.
            # f"--bootstrap-builtin-cluster-replica-size={self.default_replica_size}",

        # External cockroach: wait for the "cockroach" service to be healthy.
        # The flags and environment entries derived from `address` are not
        # visible in this listing.
        if external_cockroach:
            address = "cockroach" if external_cockroach == True else external_cockroach
            depends_graph["cockroach"] = {"condition": "service_healthy"}
            command += [
            environment += [
                # Set the adapter stash URL for older environments that need it (versions before
                # v0.92.0).

        command += [

        # Caller-supplied flags go last.
        command += options

        config: ServiceConfig = {
            # Use the service name as the hostname so that it is stable across
            # container recreation. (The default hostname is the container ID,
            # which changes when the container is recreated.) This is important
            # when using `external_cockroach=False`, as the consensus/blob URLs
            # refer to the container's hostname, and we don't want those URLs to
            # change when the container is recreated.
            "hostname": name,

        # NOTE(review): as listed, both keys are written when `image` is set;
        # an `else:` before the "mzbuild" line may have been lost in
        # extraction -- confirm.
        if image:
            config["image"] = image
            config["mzbuild"] = "materialized"

        if restart:
            config["restart"] = restart

        # Depending on the Docker Compose version, this may either work or be
        # ignored with a warning. Unfortunately no portable way of setting the
        # memory limit is known.
        if memory:
            config["deploy"] = {"resources": {"limits": {"memory": memory}}}

        if sanity_restart:
            # Workaround for
            config["labels"] = {"sanity_restart": True}

        if platform:
            config["platform"] = platform

        # Default volumes first (unless disabled), then the caller's extras.
        volumes = []
        if use_default_volumes:
            volumes += DEFAULT_MZ_VOLUMES
        volumes += volumes_extra

        # NOTE(review): the statement that opens this mapping is not visible;
        # presumably these keys are merged into `config` -- confirm.
                "depends_on": depends_graph,
                "command": command,
                "ports": [6875, 6876, 6877, 6878, 6880, 6881, 26257],
                "environment": environment,
                "volumes": volumes,
                "tmpfs": ["/tmp"],
                "healthcheck": {
                    "test": healthcheck,
                    "interval": "1s",
                    # A fully loaded Materialize can take a long time to start.
                    "start_period": "600s",

        # Caller-supplied ports are paired with "allow_host_ports": True; the
        # enclosing statement is not visible in this listing.
        if ports:
                    "allow_host_ports": True,
                    "ports": ports,

        super().__init__(name=name, config=config)


Class variables

var Size