Module materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing documentation](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md).

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import subprocess
from collections.abc import Iterable
from ssl import SSLContext
from typing import (
    Any,
    Literal,
    TypeVar,
)

import pg8000

from materialize import spawn, ui
from materialize.ui import UIError

# Generic type variable. It is not referenced by the code visible in this
# module; presumably it is kept for importers (confirm before removing).
T = TypeVar("T")
# Speaker used for this module's status messages (e.g. the query-result and
# row-mismatch messages in `_wait_for_pg`); presumably each message is
# prefixed with "C> " by `ui.speaker` — confirm in materialize.ui.
say = ui.speaker("C> ")


# Default Confluent Platform version string. It is not consumed in this
# module; presumably used by the Kafka-related service definitions (confirm).
DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.5.0"

# Default volume mounts, each written as "<volume>:<container path>".
# Presumably mounted into Materialize containers (inferred from the name;
# the consumers live outside this module — confirm).
DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]

# Default Materialize system parameters, mapping parameter name -> value.
# All values are strings, including booleans ("true") and durations ("100ms").
# Presumably applied to Materialize instances started via mzcompose; the
# consumer is outside this module (confirm before changing semantics).
DEFAULT_SYSTEM_PARAMETERS = {
    "persist_sink_minimum_batch_updates": "128",
    "enable_multi_worker_storage_persist_sink": "true",
    "storage_persist_sink_minimum_batch_updates": "100",
    "persist_pubsub_push_diff_enabled": "true",
    "persist_pubsub_client_enabled": "true",
    "persist_stats_audit_percent": "100",
    "persist_batch_delete_enabled": "true",
    "enable_columnation_lgalloc": "true",
    "enable_rbac_checks": "true",
    "enable_try_parse_monotonic_iso8601_timestamp": "true",
    "enable_dangerous_functions": "true",
    "enable_disk_cluster_replicas": "true",
    "statement_logging_max_sample_rate": "1.0",
    "statement_logging_default_sample_rate": "1.0",
    # Following values are set based on Load Test environment to
    # reduce CRDB load as we are struggling with it in CI:
    "persist_next_listen_batch_retryer_clamp": "100ms",
    "persist_next_listen_batch_retryer_initial_backoff": "1200ms",
    # Advance coverage on some Persist internals changes
    "persist_streaming_compaction_enabled": "true",
    "persist_streaming_snapshot_and_fetch_enabled": "true",
    "storage_source_decode_fuel": "100000",
    # 128 MiB (128 * 1024 * 1024 bytes)
    "compute_dataflow_max_inflight_bytes": "134217728",
    "enable_unified_clusters": "true",
    "enable_jemalloc_profiling": "true",
    "enable_comment": "true",
    "enable_sink_doc_on_option": "true",
    "enable_assert_not_null": "true",
    "enable_specialized_arrangements": "true",
    # TODO(def-,bkirwi): Reenable before this is used in production, #22042
    # "persist_fast_path_limit": "1000",
    "enable_alter_swap": "true",
    "timestamp_oracle": "postgres",
    "default_idle_arrangement_merge_effort": "0",
    "default_arrangement_exert_proportionality": "16",
}

# Default environment for CockroachDB containers, as "NAME=value" strings.
# Both entries raise a sync-duration setting to 120s; presumably this
# tolerates slow disks in CI (confirm before lowering).
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# The default environment ID is its four components joined by dashes:
# <provider>-<region>-<org id>-<ordinal>.
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    (
        DEFAULT_CLOUD_PROVIDER,
        DEFAULT_CLOUD_REGION,
        DEFAULT_ORG_ID,
        DEFAULT_ORDINAL,
    )
)


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Wait until ``host:port`` accepts TCP connections by running a probe.

    A ``timeout``-bounded ``bash`` loop is appended *in place* to ``cmd``
    (pass a copy if the caller's list must stay untouched) and executed via
    ``spawn.capture`` with stderr folded into stdout.

    Args:
        cmd: Command prefix the probe runs under; extended in place.
        host: Host to probe.
        port: TCP port to probe.
        timeout_secs: Value handed to ``timeout`` to bound the probe loop.
        kind: Optional label prepended to ``host`` in the failure log line.

    Returns:
        The same ``cmd`` list object, now ending with the probe command.

    Raises:
        subprocess.CalledProcessError: Re-raised from ``spawn.capture``
            (presumably on a non-zero exit, e.g. ``timeout`` expiring) after
            the failure has been logged.
    """
    # bash opens a TCP connection for the /dev/tcp/<host>/<port> redirection;
    # if that open fails, the redirected command is not run and the loop
    # condition fails, so the loop retries every 0.1s until the port answers.
    probe_script = (
        f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
    )
    cmd += ["timeout", str(timeout_secs), "bash", "-c", probe_script]
    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        quoted_cmd = ui.shell_quote(cmd)
        ui.log_in_automation(
            f"wait-for-tcp ({kind}{host}:{port}): error running {quoted_cmd}: {err}, "
            f"stdout:\n{err.stdout}\nstderr:\n{err.stderr}"
        )
        raise
    return cmd


# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    ssl_context: SSLContext | None = None,
) -> None:
    """Wait for a pg-compatible database (includes materialized).

    Repeatedly connects with pg8000 and runs ``query`` (in autocommit mode)
    until it yields the expected result or the ``timeout_secs`` budget of the
    ``ui.timeout_loop`` runs out. Every connection opened by an attempt is
    closed before the next attempt (or before returning), so retries do not
    pile up open sessions on the server.

    Args:
        timeout_secs: Overall time budget for the retry loop.
        query: SQL statement executed on each attempt.
        dbname: Database name to connect to.
        port: Server port.
        host: Server host.
        user: User name to connect as.
        password: Password, or ``None``. Only its first character is shown
            in progress output.
        expected: Either the exact rows the query must return (compared with
            ``==`` against the fetched rows collected into a list), or
            ``"any"`` to accept any successful execution.
        print_result: On success, print the query result via ``say`` instead
            of the " success!" marker; on failed attempts, include the error
            text in the progress output.
        ssl_context: Optional TLS context passed through to ``pg8000.connect``.

    Raises:
        UIError: If the expected result was never observed before the timeout;
            the message includes the last error seen, if any.
    """
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    accept_any = expected == "any"
    # Materialize the expected rows once, up front: a one-shot iterable (such
    # as a generator) would otherwise never compare equal to the fetched list
    # and would print as an opaque object in the mismatch message.
    expected_rows: list[Any] = [] if accept_any else list(expected)
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        try:
            conn = pg8000.connect(
                database=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                timeout=1,
                ssl_context=ssl_context,
            )
            try:
                # The default (autocommit = false) wraps everything in a transaction.
                conn.autocommit = True
                with conn.cursor() as cur:
                    cur.execute(query)
                    # With "any", don't try to fetch when the driver reports
                    # rowcount == -1 (DB-API: row count not determinable).
                    if accept_any and cur.rowcount == -1:
                        ui.progress(" success!", finish=True)
                        return
                    result = list(cur.fetchall())
                    if accept_any or result == expected_rows:
                        if print_result:
                            say(f"query result: {result}")
                        else:
                            ui.progress(" success!", finish=True)
                        return
                    say(
                        f"host={host} port={port} did not return rows matching {expected_rows} got: {result}"
                    )
            finally:
                # Always release the connection. Closing is best-effort: an
                # error here (e.g. the server already dropped the socket) must
                # neither mask the attempt's own exception nor turn a
                # successful attempt into another retry.
                try:
                    conn.close()
                except Exception:
                    pass
        except Exception as e:
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")

Sub-modules

materialize.mzcompose.composition

The implementation of the mzcompose system for Docker compositions …

materialize.mzcompose.loader
materialize.mzcompose.service

The implementation of the mzcompose system for Docker compositions …

materialize.mzcompose.services