misc.python.materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the user-facing documentation.

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import subprocess
from collections.abc import Iterable
from typing import Any, Literal, TypeVar

import psycopg

from materialize import spawn, ui
from materialize.mz_version import MzVersion
from materialize.ui import UIError

T = TypeVar("T")
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.7.0"

DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]

# Parameters which disable systems that periodically/unpredictably impact performance
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # The default of 128 MB increases memory usage considerably in exchange
    # for a small performance gain in benchmarks; see for example the
    # FastPathLimit scenario: 55% more memory, 5% faster.
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # This would increase the memory usage of many tests, making it harder to
    # spot small memory regressions.
    "persist_blob_cache_scale_with_threads": "false",
}
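# Illustrative usage (a sketch, not part of this module): a benchmark harness
# might overlay these parameters on top of the defaults returned by
# get_default_system_parameters() below, e.g.:
#
#     params = get_default_system_parameters()
#     params.update(ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS)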


def get_default_system_parameters(
    version: MzVersion | None = None,
    zero_downtime: bool = False,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher)
    """

    if not version:
        version = MzVersion.parse_cargo()

    return {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        "persist_next_listen_batch_retryer_clamp": "16s",
        "persist_next_listen_batch_retryer_initial_backoff": "100ms",
        "persist_next_listen_batch_retryer_fixed_sleep": "1200ms",
        # -----
        # Persist internals changes: advance coverage
        "persist_enable_arrow_lgalloc_noncc_sizes": "true",
        "persist_enable_s3_lgalloc_noncc_sizes": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "cluster_always_use_disk": "true",
        "compute_dataflow_max_inflight_bytes": "134217728",  # 128 MiB
        "compute_hydration_concurrency": "2",
        "compute_replica_expiration_offset": "3d",
        "compute_apply_column_demands": "true",
        "disk_cluster_replicas_default": "true",
        "enable_0dt_deployment": "true" if zero_downtime else "false",
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": (
            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
        ),
        "enable_alter_swap": "true",
        "enable_columnation_lgalloc": "true",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_builtins": (
            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
        ),
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_create_table_from_source": "true",
        "enable_disk_cluster_replicas": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_statement_lifecycle_logging": "true",
        "unsafe_enable_table_keys": "true",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "kafka_default_metadata_fetch_interval": "1s",
        "mysql_offset_known_interval": "1s",
        "force_source_table_syntax": "true" if force_source_table_syntax else "false",
        "ore_overflowing_behavior": "panic",
        "persist_batch_columnar_format": (
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2"
        ),
        "persist_batch_delete_enabled": "true",
        "persist_batch_structured_order": "true",
        "persist_batch_builder_structured": "true",
        "persist_batch_structured_key_lower_len": "256",
        "persist_batch_max_run_len": "4",
        "persist_catalog_force_compaction_fuel": "1024",
        "persist_catalog_force_compaction_wait": "1s",
        "persist_encoding_enable_dictionary": "true",
        "persist_fast_path_limit": "1000",
        "persist_fast_path_order": "true",
        "persist_inline_writes_single_max_bytes": "4096",
        "persist_inline_writes_total_max_bytes": "1048576",
        "persist_pubsub_client_enabled": "true",
        "persist_pubsub_push_diff_enabled": "true",
        "persist_record_compactions": "true",
        "persist_record_schema_id": (
            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        "persist_blob_target_size": "16777216",
        "persist_stats_audit_percent": "100",
        "persist_use_critical_since_catalog": "true",
        "persist_use_critical_since_snapshot": "false" if zero_downtime else "true",
        "persist_use_critical_since_source": "false" if zero_downtime else "true",
        "persist_part_decode_format": "arrow",
        "persist_blob_cache_scale_with_threads": "true",
        "pg_offset_known_interval": "1s",
        "statement_logging_default_sample_rate": "0.01",
        "statement_logging_max_sample_rate": "0.01",
        "storage_reclock_to_latest": "true",
        "storage_source_decode_fuel": "100000",
        "storage_statistics_collection_interval": "1000",
        "storage_statistics_interval": "2000",
        "storage_use_continual_feedback_upsert": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }

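# Illustrative usage (version values assumed for the sketch): version-gated
# flags flip based on the version under test, e.g.:
#
#     params = get_default_system_parameters(
#         version=MzVersion.parse_mz("v0.130.0"), zero_downtime=True
#     )
#     assert params["enable_0dt_deployment"] == "true"
#     assert params["enable_0dt_deployment_sources"] == "false"  # < v0.132.0-dev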
173DEFAULT_CRDB_ENVIRONMENT = [
174    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
175    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
176]
177
178
179# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
180DEFAULT_CLOUD_PROVIDER = "mzcompose"
181DEFAULT_CLOUD_REGION = "us-east-1"
182DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
183DEFAULT_ORDINAL = "0"
184DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
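# With the defaults above, DEFAULT_MZ_ENVIRONMENT_ID expands to
# "mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0".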


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    cmd.extend(
        [
            "timeout",
            str(timeout_secs),
            "bash",
            "-c",
            f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
        ]
    )
    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        ui.log_in_automation(
            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
                kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
            )
        )
        raise
    return cmd

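# Illustrative usage (the container name and endpoint are assumptions for the
# sketch): callers pass a command prefix that reaches into the environment,
# such as a `docker exec`, and the helper appends a bash /dev/tcp polling loop
# before running the combined command:
#
#     _check_tcp(["docker", "exec", "my-container"], "materialized", 6875, 30)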
# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    sslmode: str = "disable",
) -> None:
    """Wait for a pg-compatible database (includes materialized)"""
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        try:
            conn = psycopg.connect(
                dbname=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                connect_timeout=1,
                sslmode=sslmode,
            )
            # The default (autocommit = false) wraps everything in a transaction.
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute(query.encode())
                if expected == "any" and cur.rowcount == -1:
                    ui.progress(" success!", finish=True)
                    return
                result = list(cur.fetchall())
                if expected == "any" or result == expected:
                    if print_result:
                        say(f"query result: {result}")
                    else:
                        ui.progress(" success!", finish=True)
                    return
                else:
                    say(
                        f"host={host} port={port} did not return rows matching {expected} got: {result}"
                    )
        except Exception as e:
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")

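# Illustrative usage (the connection details are the conventional local
# defaults, assumed here): block until materialized answers a query with the
# expected rows, or raise a UIError after the timeout:
#
#     _wait_for_pg(
#         timeout_secs=60,
#         query="SELECT 1",
#         dbname="materialize",
#         port=6875,
#         host="localhost",
#         user="materialize",
#         password=None,
#         expected=[(1,)],
#     )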

def bootstrap_cluster_replica_size() -> str:
    return "bootstrap"


def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    def replica_size(
        workers: int,
        scale: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str | None = None,
    ) -> dict[str, Any]:
        return {
            "cpu_exclusive": False,
            "cpu_limit": None,
            "credits_per_hour": f"{workers * scale}",
            "disabled": disabled,
            "disk_limit": None,
            "is_cc": is_cc,
            "memory_limit": memory_limit or "4Gi",
            "scale": scale,
            "workers": workers,
            # "selectors": {},
        }

    replica_sizes = {
        bootstrap_cluster_replica_size(): replica_size(1, 1),
        "2-4": replica_size(4, 2),
        "free": replica_size(0, 0, disabled=True),
        "1cc": replica_size(1, 1),
        "1C": replica_size(1, 1),
        "1-no-disk": replica_size(1, 1, is_cc=False),
        "2-no-disk": replica_size(2, 1, is_cc=False),
    }

    for i in range(0, 6):
        workers = 1 << i
        replica_sizes[f"{workers}"] = replica_size(workers, 1)
        for mem in [4, 8, 16, 32]:
            replica_sizes[f"{workers}-{mem}G"] = replica_size(
                workers, 1, memory_limit=f"{mem} GiB"
            )

        replica_sizes[f"{workers}-1"] = replica_size(1, workers)
        replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers)
        replica_sizes[f"mem-{workers}"] = replica_size(
            workers, 1, memory_limit=f"{workers} GiB"
        )

    return replica_sizes
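
The generated map follows a simple naming scheme: plain power-of-two names ("1" through "32") vary the worker count, an "-NG" suffix pins the memory limit, and "N-M" names combine worker count and scale. A quick illustrative sketch of a few lookups (keys taken from the map above):

    sizes = cluster_replica_size_map()
    sizes["4"]       # 4 workers, scale 1, default "4Gi" memory limit
    sizes["4-16G"]   # 4 workers, scale 1, "16 GiB" memory limit
    sizes["4-4"]     # 4 workers per replica process, scale 4
    sizes["4-1"]     # 1 worker, scale 4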