misc.python.materialize.mzcompose
The implementation of the mzcompose system for Docker compositions.
For an overview of what mzcompose is and why it exists, see the user-facing documentation: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import subprocess
from collections.abc import Iterable
from typing import Any, Literal, TypeVar

import psycopg

from materialize import spawn, ui
from materialize.mz_version import MzVersion
from materialize.ui import UIError

T = TypeVar("T")
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.7.0"

DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # Default of 128 MB increases memory usage by a lot for some small
    # performance in benchmarks, see for example FastPathLimit scenario: 55%
    # more memory, 5% faster
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # This would increase the memory usage of many tests, making it harder to
    # tell small memory increase regressions
    "persist_blob_cache_scale_with_threads": "false",
}


def get_default_system_parameters(
    version: MzVersion | None = None,
    zero_downtime: bool = False,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher)
    """

    if not version:
        version = MzVersion.parse_cargo()

    return {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        "persist_next_listen_batch_retryer_clamp": "16s",
        "persist_next_listen_batch_retryer_initial_backoff": "100ms",
        "persist_next_listen_batch_retryer_fixed_sleep": "1200ms",
        # -----
        # Persist internals changes: advance coverage
        "persist_enable_arrow_lgalloc_noncc_sizes": "true",
        "persist_enable_s3_lgalloc_noncc_sizes": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "cluster_always_use_disk": "true",
        "compute_dataflow_max_inflight_bytes": "134217728",  # 128 MiB
        "compute_hydration_concurrency": "2",
        "compute_replica_expiration_offset": "3d",
        "compute_apply_column_demands": "true",
        "disk_cluster_replicas_default": "true",
        "enable_0dt_deployment": "true" if zero_downtime else "false",
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": (
            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
        ),
        "enable_alter_swap": "true",
        "enable_columnation_lgalloc": "true",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_builtins": (
            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
        ),
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_create_table_from_source": "true",
        "enable_disk_cluster_replicas": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_statement_lifecycle_logging": "true",
        "unsafe_enable_table_keys": "true",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "kafka_default_metadata_fetch_interval": "1s",
        "mysql_offset_known_interval": "1s",
        "force_source_table_syntax": "true" if force_source_table_syntax else "false",
        "ore_overflowing_behavior": "panic",
        "persist_batch_columnar_format": (
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2"
        ),
        "persist_batch_delete_enabled": "true",
        "persist_batch_structured_order": "true",
        "persist_batch_builder_structured": "true",
        "persist_batch_structured_key_lower_len": "256",
        "persist_batch_max_run_len": "4",
        "persist_catalog_force_compaction_fuel": "1024",
        "persist_catalog_force_compaction_wait": "1s",
        "persist_encoding_enable_dictionary": "true",
        "persist_fast_path_limit": "1000",
        "persist_fast_path_order": "true",
        "persist_inline_writes_single_max_bytes": "4096",
        "persist_inline_writes_total_max_bytes": "1048576",
        "persist_pubsub_client_enabled": "true",
        "persist_pubsub_push_diff_enabled": "true",
        "persist_record_compactions": "true",
        "persist_record_schema_id": (
            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        "persist_blob_target_size": "16777216",
        "persist_stats_audit_percent": "100",
        "persist_use_critical_since_catalog": "true",
        "persist_use_critical_since_snapshot": "false" if zero_downtime else "true",
        "persist_use_critical_since_source": "false" if zero_downtime else "true",
        "persist_part_decode_format": "arrow",
        "persist_blob_cache_scale_with_threads": "true",
        "pg_offset_known_interval": "1s",
        "statement_logging_default_sample_rate": "0.01",
        "statement_logging_max_sample_rate": "0.01",
        "storage_reclock_to_latest": "true",
        "storage_source_decode_fuel": "100000",
        "storage_statistics_collection_interval": "1000",
        "storage_statistics_interval": "2000",
        "storage_use_continual_feedback_upsert": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }


DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    cmd.extend(
        [
            "timeout",
            str(timeout_secs),
            "bash",
            "-c",
            f"until [ cat < /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
            if False
            else f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
        ]
    )
    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        ui.log_in_automation(
            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
                kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
            )
        )
        raise
    return cmd


# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    sslmode: str = "disable",
) -> None:
    """Wait for a pg-compatible database (includes materialized)"""
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        try:
            conn = psycopg.connect(
                dbname=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                connect_timeout=1,
                sslmode=sslmode,
            )
            # The default (autocommit = false) wraps everything in a transaction.
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute(query.encode())
                if expected == "any" and cur.rowcount == -1:
                    ui.progress(" success!", finish=True)
                    return
                result = list(cur.fetchall())
                if expected == "any" or result == expected:
                    if print_result:
                        say(f"query result: {result}")
                    else:
                        ui.progress(" success!", finish=True)
                    return
                else:
                    say(
                        f"host={host} port={port} did not return rows matching {expected} got: {result}"
                    )
        except Exception as e:
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")


def bootstrap_cluster_replica_size() -> str:
    return "bootstrap"


def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    def replica_size(
        workers: int,
        scale: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str | None = None,
    ) -> dict[str, Any]:
        return {
            "cpu_exclusive": False,
            "cpu_limit": None,
            "credits_per_hour": f"{workers * scale}",
            "disabled": disabled,
            "disk_limit": None,
            "is_cc": is_cc,
            "memory_limit": memory_limit or "4Gi",
            "scale": scale,
            "workers": workers,
            # "selectors": {},
        }

    replica_sizes = {
        bootstrap_cluster_replica_size(): replica_size(1, 1),
        "2-4": replica_size(4, 2),
        "free": replica_size(0, 0, disabled=True),
        "1cc": replica_size(1, 1),
        "1C": replica_size(1, 1),
        "1-no-disk": replica_size(1, 1, is_cc=False),
        "2-no-disk": replica_size(2, 1, is_cc=False),
    }

    for i in range(0, 6):
        workers = 1 << i
        replica_sizes[f"{workers}"] = replica_size(workers, 1)
        for mem in [4, 8, 16, 32]:
            replica_sizes[f"{workers}-{mem}G"] = replica_size(
                workers, 1, memory_limit=f"{mem} GiB"
            )

        replica_sizes[f"{workers}-1"] = replica_size(1, workers)
        replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers)
        replica_sizes[f"mem-{workers}"] = replica_size(
            workers, 1, memory_limit=f"{workers} GiB"
        )

    return replica_sizes
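To make the readiness helper above concrete, here is a minimal usage sketch. It drives the private _wait_for_pg helper directly; the port (6875) and user ("materialize") are standard Materialize defaults assumed for illustration, not values defined in this module.

# Hypothetical sketch only: block until a local materialized instance
# answers SELECT 1, or fail with a UIError after 60 seconds.
_wait_for_pg(
    timeout_secs=60,
    query="SELECT 1",
    dbname="materialize",
    port=6875,           # assumed default SQL port
    host="localhost",
    user="materialize",  # assumed default user
    password=None,
    expected=[(1,)],     # psycopg returns rows as tuples
)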
def say(msg: str) -> None:
DEFAULT_CONFLUENT_PLATFORM_VERSION = '7.7.0'

DEFAULT_MZ_VOLUMES = ['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']

ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {'enable_statement_lifecycle_logging': 'false', 'persist_catalog_force_compaction_fuel': '0', 'statement_logging_default_sample_rate': '0', 'statement_logging_max_sample_rate': '0', 'persist_blob_cache_mem_limit_bytes': '1048576', 'persist_blob_cache_scale_with_threads': 'false'}
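These overrides are meant to be layered on top of the defaults. A minimal sketch of that merge, as an assumed composition step rather than a call path defined in this module:

from materialize.mz_version import MzVersion

defaults = get_default_system_parameters(version=MzVersion.parse_mz("v0.135.0"))
benchmark_params = {**defaults, **ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS}
# The benchmarking overrides win wherever the two dicts collide.
assert benchmark_params["statement_logging_max_sample_rate"] == "0"
assert benchmark_params["enable_statement_lifecycle_logging"] == "false"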
def get_default_system_parameters(version: materialize.mz_version.MzVersion | None = None, zero_downtime: bool = False, force_source_table_syntax: bool = False) -> dict[str, str]:
For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher)
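A short sketch of that version gating, assuming MzVersion compares like a semantic version (both cutoffs appear in the source above):

from materialize.mz_version import MzVersion

old = get_default_system_parameters(version=MzVersion.parse_mz("v0.126.0"))
new = get_default_system_parameters(version=MzVersion.parse_mz("v0.136.0"))
# Gated on v0.135.0-dev: the columnar format flips once the fleet is new enough.
assert old["persist_batch_columnar_format"] == "both_v2"
assert new["persist_batch_columnar_format"] == "structured"
# Gated on v0.132.0-dev: 0dt deployment sources.
assert old["enable_0dt_deployment_sources"] == "false"
assert new["enable_0dt_deployment_sources"] == "true"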
DEFAULT_CRDB_ENVIRONMENT = ['COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s', 'COCKROACH_LOG_MAX_SYNC_DURATION=120s']

DEFAULT_CLOUD_PROVIDER = 'mzcompose'

DEFAULT_CLOUD_REGION = 'us-east-1'

DEFAULT_ORG_ID = '00000000-0000-0000-0000-000000000000'

DEFAULT_ORDINAL = '0'

DEFAULT_MZ_ENVIRONMENT_ID = 'mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0'
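The environment ID is simply the four defaults above joined with dashes, as the source shows:

assert DEFAULT_MZ_ENVIRONMENT_ID == (
    f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-"
    f"{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
)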
def bootstrap_cluster_replica_size() -> str:

def cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
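A few sample entries from the generated map, following the construction in the source above (worker counts are powers of two from 1 through 32, with scale, memory, and no-disk variants):

sizes = cluster_replica_size_map()
assert sizes["bootstrap"]["workers"] == 1           # bootstrap_cluster_replica_size()
assert sizes["16"]["workers"] == 16                 # scale 1, 16 workers
assert sizes["16-1"]["scale"] == 16                 # 16 processes, 1 worker each
assert sizes["16-16"]["credits_per_hour"] == "256"  # workers * scale
assert sizes["mem-4"]["memory_limit"] == "4 GiB"
assert sizes["free"]["disabled"] is True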