misc.python.materialize.mzcompose
The implementation of the mzcompose system for Docker compositions.
For an overview of what mzcompose is and why it exists, see the user-facing documentation.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import os
import random
import subprocess
import sys
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, Literal, TypeVar

import psycopg

from materialize import spawn, ui
from materialize.mz_version import MzVersion
from materialize.ui import UIError

T = TypeVar("T")
# Prefixed speaker used for all console chatter emitted by this module.
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.4"

# Volumes mounted into Materialize containers by default.
DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
# We try to keep this empty, so that we benchmark Materialize as we ship it. If
# a new feature causes benchmarks to become flaky, consider that this can also
# impact customers' experience and try to find a solution other than disabling
# the feature here!
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {}


def get_minimal_system_parameters(
    version: MzVersion,
) -> dict[str, str]:
    """Settings we need in order to have tests run at all, but otherwise stay
    with the defaults: not changing performance or increasing coverage.

    Args:
        version: the Materialize version under test; gates flags that only
            exist in certain version ranges.

    Returns:
        Mapping of system parameter name to its (string) value.
    """

    config = {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
        "enable_compute_peek_response_stash": "true",
        "enable_0dt_deployment_panic_after_timeout": "true",
        # Only supported from v0.132.0-dev onward.
        "enable_0dt_deployment_sources": (
            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
        ),
        "enable_alter_swap": "true",
        "enable_cast_elimination": "true",
        "enable_columnar_lgalloc": "false",
        "enable_columnation_lgalloc": "false",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_copy_from_remote": "true",
        "enable_create_table_from_source": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_iceberg_sink": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_lgalloc": "false",
        "enable_load_generator_counter": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_replacement_materialized_views": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_sql_server_source": "true",
        "enable_s3_tables_region_check": "false",
        "enable_statement_lifecycle_logging": "true",
        "enable_compute_temporal_bucketing": "true",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "grpc_client_http2_keep_alive_timeout": "5s",
        "ore_overflowing_behavior": "panic",
        "unsafe_enable_table_keys": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }

    # Flag was removed from main in v0.163; only set it for older versions.
    if version < MzVersion.parse_mz("v0.163.0-dev"):
        config["enable_compute_active_dataflow_cancelation"] = "true"

    return config


@dataclass
class VariableSystemParameter:
    """A system parameter together with its test default and the candidate
    values randomized CI runs may pick from."""

    key: str  # system parameter name
    default: str  # value used when CI_SYSTEM_PARAMETERS is unset
    values: list[str]  # candidate values for "System Parameters: Random" runs


# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
    These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.
    """

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name),
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "compute_subscribe_snapshot_optimization",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_cast_elimination",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_peek_sequencing",
            "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "default_timestamp_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        # NOTE: this entry used to appear twice in the list; the duplicate has
        # been removed so each parameter is randomized exactly once.
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        # End of list (ordered by name)
    ]


def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher)

    Honors CI_SYSTEM_PARAMETERS: "" uses each variable parameter's default,
    "random" samples from its candidate values (seeded for reproducibility),
    "minimal" applies only the minimal set; anything else raises ValueError.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # NOTE(review): the int fallback 1 (not "1") is kept as-is; it only
        # matters when neither env var is set, and changing it would change
        # the random sequence.
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        # Log the seed so a flaky run can be reproduced exactly.
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
Set it 456# in get_minimal_system_parameters if it's required for tests to succeed at 457# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above 458# apply. 459UNINTERESTING_SYSTEM_PARAMETERS = [ 460 "enable_mz_join_core", 461 "linear_join_yielding", 462 "enable_lgalloc_eager_reclamation", 463 "lgalloc_background_interval", 464 "lgalloc_file_growth_dampener", 465 "lgalloc_local_buffer_bytes", 466 "lgalloc_slow_clear_bytes", 467 "memory_limiter_interval", 468 "memory_limiter_usage_bias", 469 "memory_limiter_burst_factor", 470 "compute_server_maintenance_interval", 471 "compute_dataflow_max_inflight_bytes_cc", 472 "compute_flat_map_fuel", 473 "compute_temporal_bucketing_summary", 474 "consolidating_vec_growth_dampener", 475 "copy_to_s3_parquet_row_group_file_ratio", 476 "copy_to_s3_arrow_builder_buffer_ratio", 477 "copy_to_s3_multipart_part_size_bytes", 478 "enable_replica_targeted_materialized_views", 479 "enable_compute_replica_expiration", 480 "enable_compute_render_fueled_as_specific_collection", 481 "compute_logical_backpressure_max_retained_capabilities", 482 "compute_logical_backpressure_inflight_slack", 483 "persist_fetch_semaphore_cost_adjustment", 484 "persist_fetch_semaphore_permit_adjustment", 485 "persist_optimize_ignored_data_fetch", 486 "persist_pubsub_same_process_delegate_enabled", 487 "persist_pubsub_connect_attempt_timeout", 488 "persist_pubsub_request_timeout", 489 "persist_pubsub_connect_max_backoff", 490 "persist_pubsub_client_sender_channel_size", 491 "persist_pubsub_client_receiver_channel_size", 492 "persist_pubsub_server_connection_channel_size", 493 "persist_pubsub_state_cache_shard_ref_channel_size", 494 "persist_pubsub_reconnect_backoff", 495 "persist_encoding_compression_format", 496 "persist_batch_max_runs", 497 "persist_write_combine_inline_writes", 498 "persist_reader_lease_duration", 499 "persist_consensus_connection_pool_max_size", 500 "persist_consensus_connection_pool_max_wait", 501 
"persist_consensus_connection_pool_ttl", 502 "persist_consensus_connection_pool_ttl_stagger", 503 "crdb_connect_timeout", 504 "crdb_tcp_user_timeout", 505 "crdb_keepalives_idle", 506 "crdb_keepalives_interval", 507 "crdb_keepalives_retries", 508 "persist_use_critical_since_txn", 509 "use_global_txn_cache_source", 510 "persist_batch_builder_max_outstanding_parts", 511 "persist_compaction_heuristic_min_inputs", 512 "persist_compaction_heuristic_min_parts", 513 "persist_compaction_heuristic_min_updates", 514 "persist_gc_blob_delete_concurrency_limit", 515 "persist_state_versions_recent_live_diffs_limit", 516 "persist_usage_state_fetch_concurrency_limit", 517 "persist_blob_operation_timeout", 518 "persist_blob_operation_attempt_timeout", 519 "persist_blob_connect_timeout", 520 "persist_blob_read_timeout", 521 "persist_stats_collection_enabled", 522 "persist_stats_filter_enabled", 523 "persist_stats_budget_bytes", 524 "persist_stats_untrimmable_columns_equals", 525 "persist_stats_untrimmable_columns_prefix", 526 "persist_stats_untrimmable_columns_suffix", 527 "persist_expression_cache_force_compaction_fuel", 528 "persist_expression_cache_force_compaction_wait", 529 "persist_blob_cache_mem_limit_bytes", 530 "persist_blob_cache_scale_factor_bytes", 531 "persist_claim_unclaimed_compactions", 532 "persist_claim_compaction_percent", 533 "persist_claim_compaction_min_version", 534 "persist_next_listen_batch_retryer_multiplier", 535 "persist_rollup_threshold", 536 "persist_rollup_fallback_threshold_ms", 537 "persist_gc_fallback_threshold_ms", 538 "persist_compaction_minimum_timeout", 539 "persist_compaction_check_process_flag", 540 "balancerd_sigterm_connection_wait", 541 "balancerd_sigterm_listen_wait", 542 "balancerd_inject_proxy_protocol_header_http", 543 "balancerd_log_filter", 544 "balancerd_opentelemetry_filter", 545 "balancerd_log_filter_defaults", 546 "balancerd_opentelemetry_filter_defaults", 547 "balancerd_sentry_filters", 548 "persist_enable_s3_lgalloc_cc_sizes", 
549 "persist_enable_arrow_lgalloc_cc_sizes", 550 "controller_past_generation_replica_cleanup_retry_interval", 551 "wallclock_lag_recording_interval", 552 "wallclock_lag_histogram_period_interval", 553 "enable_timely_zero_copy", 554 "enable_timely_zero_copy_lgalloc", 555 "timely_zero_copy_limit", 556 "arrangement_exert_proportionality", 557 "txn_wal_apply_ensure_schema_match", 558 "persist_txns_data_shard_retryer_initial_backoff", 559 "persist_txns_data_shard_retryer_multiplier", 560 "persist_txns_data_shard_retryer_clamp", 561 "storage_cluster_shutdown_grace_period", 562 "storage_dataflow_delay_sources_past_rehydration", 563 "storage_dataflow_suspendable_sources", 564 "storage_downgrade_since_during_finalization", 565 "replica_metrics_history_retention_interval", 566 "wallclock_lag_history_retention_interval", 567 "wallclock_global_lag_histogram_retention_interval", 568 "kafka_client_id_enrichment_rules", 569 "kafka_poll_max_wait", 570 "kafka_default_aws_privatelink_endpoint_identification_algorithm", 571 "kafka_buffered_event_resize_threshold_elements", 572 "mysql_replication_heartbeat_interval", 573 "postgres_fetch_slot_resume_lsn_interval", 574 "pg_schema_validation_interval", 575 "pg_source_validate_timeline", 576 "sql_server_source_validate_restore_history", 577 "storage_enforce_external_addresses", 578 "storage_upsert_prevent_snapshot_buffering", 579 "storage_rocksdb_use_merge_operator", 580 "storage_upsert_max_snapshot_batch_buffering", 581 "storage_rocksdb_cleanup_tries", 582 "storage_suspend_and_restart_delay", 583 "storage_server_maintenance_interval", 584 "storage_sink_progress_search", 585 "storage_sink_ensure_topic_config", 586 "sql_server_max_lsn_wait", 587 "sql_server_snapshot_progress_report_interval", 588 "sql_server_cdc_cleanup_change_table", 589 "sql_server_cdc_cleanup_change_table_max_deletes", 590 "allow_user_sessions", 591 "with_0dt_deployment_ddl_check_interval", 592 "enable_0dt_caught_up_check", 593 "with_0dt_caught_up_check_allowed_lag", 
594 "with_0dt_caught_up_check_cutoff", 595 "enable_0dt_caught_up_replica_status_check", 596 "plan_insights_notice_fast_path_clusters_optimize_duration", 597 "enable_continual_task_builtins", 598 "enable_expression_cache", 599 "mz_metrics_lgalloc_map_refresh_interval", 600 "mz_metrics_lgalloc_refresh_interval", 601 "mz_metrics_rusage_refresh_interval", 602 "compute_peek_response_stash_batch_max_runs", 603 "compute_peek_response_stash_read_batch_size_bytes", 604 "compute_peek_response_stash_read_memory_budget_bytes", 605 "compute_peek_stash_num_batches", 606 "compute_peek_stash_batch_size", 607 "storage_statistics_retention_duration", 608 "enable_paused_cluster_readhold_downgrade", 609 "kafka_retry_backoff", 610 "kafka_retry_backoff_max", 611 "kafka_reconnect_backoff", 612 "kafka_reconnect_backoff_max", 613 "oidc_issuer", 614 "oidc_audience", 615 "oidc_authentication_claim", 616 "enable_mcp_agents", 617 "enable_mcp_observatory", 618 "user_id_pool_batch_size", 619] 620 621 622DEFAULT_CRDB_ENVIRONMENT = [ 623 "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s", 624 "COCKROACH_LOG_MAX_SYNC_DURATION=120s", 625] 626 627 628# TODO(benesch): change to `docker-mzcompose` once v0.39 ships. 629DEFAULT_CLOUD_PROVIDER = "mzcompose" 630DEFAULT_CLOUD_REGION = "us-east-1" 631DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000" 632DEFAULT_ORDINAL = "0" 633DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}" 634 635 636# TODO(benesch): replace with Docker health checks. 
637def _check_tcp( 638 cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = "" 639) -> list[str]: 640 cmd.extend( 641 [ 642 "timeout", 643 str(timeout_secs), 644 "bash", 645 "-c", 646 f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done", 647 ] 648 ) 649 try: 650 spawn.capture(cmd, stderr=subprocess.STDOUT) 651 except subprocess.CalledProcessError as e: 652 ui.log_in_automation( 653 "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format( 654 kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr 655 ) 656 ) 657 raise 658 return cmd 659 660 661# TODO(benesch): replace with Docker health checks. 662def _wait_for_pg( 663 timeout_secs: int, 664 query: str, 665 dbname: str, 666 port: int, 667 host: str, 668 user: str, 669 password: str | None, 670 expected: Iterable[Any] | Literal["any"], 671 print_result: bool = False, 672 sslmode: str = "disable", 673) -> None: 674 """Wait for a pg-compatible database (includes materialized)""" 675 obfuscated_password = password[0:1] if password is not None else "" 676 args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'" 677 ui.progress(f"waiting for {args} to handle {query!r}", "C") 678 error = None 679 for remaining in ui.timeout_loop(timeout_secs, tick=0.5): 680 try: 681 conn = psycopg.connect( 682 dbname=dbname, 683 host=host, 684 port=port, 685 user=user, 686 password=password, 687 connect_timeout=1, 688 sslmode=sslmode, 689 ) 690 # The default (autocommit = false) wraps everything in a transaction. 
691 conn.autocommit = True 692 with conn.cursor() as cur: 693 cur.execute(query.encode()) 694 if expected == "any" and cur.rowcount == -1: 695 ui.progress(" success!", finish=True) 696 return 697 result = list(cur.fetchall()) 698 if expected == "any" or result == expected: 699 if print_result: 700 say(f"query result: {result}") 701 else: 702 ui.progress(" success!", finish=True) 703 return 704 else: 705 say( 706 f"host={host} port={port} did not return rows matching {expected} got: {result}" 707 ) 708 except Exception as e: 709 ui.progress(f"{e if print_result else ''} {int(remaining)}") 710 error = e 711 ui.progress(finish=True) 712 raise UIError(f"never got correct result for {args}: {error}") 713 714 715def bootstrap_cluster_replica_size() -> str: 716 return "bootstrap" 717 718 719def cluster_replica_size_map() -> dict[str, dict[str, Any]]: 720 """scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]""" 721 722 def replica_size( 723 scale: int, 724 workers: int, 725 disabled: bool = False, 726 is_cc: bool = True, 727 memory_limit: str = "4 GiB", 728 ) -> dict[str, Any]: 729 return { 730 "cpu_exclusive": False, 731 "cpu_limit": None, 732 "credits_per_hour": f"{workers * scale}", 733 "disabled": disabled, 734 "disk_limit": None, 735 "is_cc": is_cc, 736 "memory_limit": memory_limit, 737 "scale": scale, 738 "workers": workers, 739 # "selectors": {}, 740 } 741 742 replica_sizes = { 743 bootstrap_cluster_replica_size(): replica_size(1, 1), 744 "scale=2,workers=4": replica_size(2, 4), 745 "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False), 746 "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False), 747 # Intentionally not following the naming scheme 748 "free": replica_size(1, 1, disabled=True), 749 } 750 751 for i in range(0, 6): 752 workers = 1 << i 753 replica_sizes[f"scale=1,workers={workers}"] = replica_size(1, workers) 754 for mem in [4, 8, 16, 32]: 755 replica_sizes[f"scale=1,workers={workers},mem={mem}GiB"] = replica_size( 756 1, workers, 
memory_limit=f"{mem} GiB" 757 ) 758 759 replica_sizes[f"scale={workers},workers=1"] = replica_size(workers, 1) 760 replica_sizes[f"scale={workers},workers={workers}"] = replica_size( 761 workers, workers 762 ) 763 replica_sizes[f"scale=1,workers={workers},mem={workers}GiB"] = replica_size( 764 1, workers, memory_limit=f"{workers} GiB" 765 ) 766 767 return replica_sizes
def
say(msg: str) -> None:
DEFAULT_CONFLUENT_PLATFORM_VERSION =
'7.9.4'
DEFAULT_MZ_VOLUMES =
['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS =
{}
def
get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
55def get_minimal_system_parameters( 56 version: MzVersion, 57) -> dict[str, str]: 58 """Settings we need in order to have tests run at all, but otherwise stay 59 with the defaults: not changing performance or increasing coverage.""" 60 61 config = { 62 # ----- 63 # Unsafe functions 64 "unsafe_enable_unsafe_functions": "true", 65 # ----- 66 # Others (ordered by name) 67 "allow_real_time_recency": "true", 68 "constraint_based_timestamp_selection": "verify", # removed from main, keeping it here for old versions 69 "enable_compute_peek_response_stash": "true", 70 "enable_0dt_deployment_panic_after_timeout": "true", 71 "enable_0dt_deployment_sources": ( 72 "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false" 73 ), 74 "enable_alter_swap": "true", 75 "enable_cast_elimination": "true", 76 "enable_columnar_lgalloc": "false", 77 "enable_columnation_lgalloc": "false", 78 "enable_compute_correction_v2": "true", 79 "enable_compute_logical_backpressure": "true", 80 "enable_connection_validation_syntax": "true", 81 "enable_continual_task_create": "true", 82 "enable_continual_task_retain": "true", 83 "enable_continual_task_transform": "true", 84 "enable_copy_to_expr": "true", 85 "enable_copy_from_remote": "true", 86 "enable_create_table_from_source": "true", 87 "enable_eager_delta_joins": "true", 88 "enable_envelope_debezium_in_subscribe": "true", 89 "enable_expressions_in_limit_syntax": "true", 90 "enable_iceberg_sink": "true", 91 "enable_introspection_subscribes": "true", 92 "enable_kafka_sink_partition_by": "true", 93 "enable_lgalloc": "false", 94 "enable_load_generator_counter": "true", 95 "enable_logical_compaction_window": "true", 96 "enable_multi_worker_storage_persist_sink": "true", 97 "enable_multi_replica_sources": "true", 98 "enable_rbac_checks": "true", 99 "enable_reduce_mfp_fusion": "true", 100 "enable_refresh_every_mvs": "true", 101 "enable_replacement_materialized_views": "true", 102 "enable_cluster_schedule_refresh": "true", 103 
"enable_sql_server_source": "true", 104 "enable_s3_tables_region_check": "false", 105 "enable_statement_lifecycle_logging": "true", 106 "enable_compute_temporal_bucketing": "true", 107 "enable_variadic_left_join_lowering": "true", 108 "enable_worker_core_affinity": "true", 109 "grpc_client_http2_keep_alive_timeout": "5s", 110 "ore_overflowing_behavior": "panic", 111 "unsafe_enable_table_keys": "true", 112 "with_0dt_deployment_max_wait": "1800s", 113 # End of list (ordered by name) 114 } 115 116 if version < MzVersion.parse_mz("v0.163.0-dev"): 117 config["enable_compute_active_dataflow_cancelation"] = "true" 118 119 return config
Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.
@dataclass
class VariableSystemParameter:
    """A system parameter that CI may vary between runs.

    Fields reconstructed from usage: instances are built positionally as
    ``VariableSystemParameter(key, default, values)`` and consumers read
    ``.key``, ``.default``, and ``.values``.
    """

    # Name of the system parameter, as understood by environmentd.
    key: str
    # Value applied when randomization is not requested.
    default: str
    # Candidate values that a randomized run may choose from.
    values: list[str]
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters that CI may vary between runs.

    Note: Only the default is tested unless we explicitly select "System
    Parameters: Random" in trigger-ci. These defaults are applied _after_
    applying the settings from `get_minimal_system_parameters`.

    Fix: the previous version listed `persist_encoding_enable_dictionary`
    twice; the duplicate entry has been removed.

    Args:
        version: the Materialize version under test; gates values that only
            exist or only work from a certain release onward.
        force_source_table_syntax: whether the new source table syntax is
            forced; restricts the candidate values accordingly.

    Returns:
        Each parameter with its default and the alternative values a
        randomized run may pick from.
    """

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name)
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "compute_subscribe_snapshot_optimization",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_cast_elimination",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_peek_sequencing",
            "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "default_timestamp_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        # End of list (ordered by name)
    ]
Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Return the full set of system parameters to launch Materialize with.

    Combines the minimal parameters with the variable ones, resolved
    according to the CI_SYSTEM_PARAMETERS environment variable:

    - "" (unset): use each variable parameter's default value.
    - "random": pick each value pseudo-randomly, seeded by
      CI_SYSTEM_PARAMETERS_SEED (falling back to BUILDKITE_JOB_ID, then "1")
      so runs are reproducible.
    - "minimal": leave the variable parameters unset entirely.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher).

    Args:
        version: the Materialize version under test; defaults to the version
            parsed from the cargo workspace.
        force_source_table_syntax: force the new source table syntax.

    Raises:
        ValueError: if CI_SYSTEM_PARAMETERS has an unrecognized value.
    """

    # Explicit None check: don't silently replace a falsy caller-supplied
    # version object.
    if version is None:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(
        version, force_source_table_syntax
    )

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The fallback default is a string (was the int 1) so that `seed`
        # has a consistent type regardless of which source supplies it.
        seed = os.getenv(
            "CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", "1")
        )
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        # Log the seed so a failing randomized run can be reproduced.
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher)
# System parameters we deliberately never vary in CI: internal tuning knobs,
# timeouts, pool sizes, and similar settings whose values are not expected to
# change test behavior in interesting ways.
# NOTE(review): presumably consumed by tooling that checks parameter coverage
# elsewhere — confirm against callers.
UNINTERESTING_SYSTEM_PARAMETERS = [
    "enable_mz_join_core", "linear_join_yielding",
    "enable_lgalloc_eager_reclamation", "lgalloc_background_interval",
    "lgalloc_file_growth_dampener", "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes", "memory_limiter_interval",
    "memory_limiter_usage_bias", "memory_limiter_burst_factor",
    "compute_server_maintenance_interval", "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel", "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener", "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio", "copy_to_s3_multipart_part_size_bytes",
    "enable_replica_targeted_materialized_views", "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout", "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff", "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff", "persist_encoding_compression_format",
    "persist_batch_max_runs", "persist_write_combine_inline_writes",
    "persist_reader_lease_duration", "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout", "crdb_tcp_user_timeout", "crdb_keepalives_idle",
    "crdb_keepalives_interval", "crdb_keepalives_retries",
    "persist_use_critical_since_txn", "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout", "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout", "persist_blob_read_timeout",
    "persist_stats_collection_enabled", "persist_stats_filter_enabled",
    "persist_stats_budget_bytes", "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes", "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions", "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold", "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms", "persist_compaction_minimum_timeout",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait", "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http", "balancerd_log_filter",
    "balancerd_opentelemetry_filter", "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults", "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes", "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval", "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy", "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit", "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules", "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval", "pg_source_validate_timeline",
    "sql_server_source_validate_restore_history",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries", "storage_suspend_and_restart_delay",
    "storage_server_maintenance_interval", "storage_sink_progress_search",
    "storage_sink_ensure_topic_config", "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "allow_user_sessions", "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check", "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff", "enable_0dt_caught_up_replica_status_check",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_continual_task_builtins", "enable_expression_cache",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval", "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches", "compute_peek_stash_batch_size",
    "storage_statistics_retention_duration",
    "enable_paused_cluster_readhold_downgrade",
    "kafka_retry_backoff", "kafka_retry_backoff_max",
    "kafka_reconnect_backoff", "kafka_reconnect_backoff_max",
    "oidc_issuer", "oidc_audience", "oidc_authentication_claim",
    "enable_mcp_agents", "enable_mcp_observatory", "user_id_pool_batch_size",
]
# Environment for the CockroachDB container.
# NOTE(review): the raised sync-duration limits presumably keep CockroachDB
# from aborting on slow CI disks — confirm.
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]

# Fake cloud identity used by compositions; the four values below are the
# components of DEFAULT_MZ_ENVIRONMENT_ID.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# "<provider>-<region>-<org id>-<ordinal>"
DEFAULT_MZ_ENVIRONMENT_ID = "mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0"
def
bootstrap_cluster_replica_size() -> str:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]

    Build the replica-size definitions that Materialize is launched with.
    """

    def make_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # One replica-size entry in the shape environmentd expects.
        return {
            "cpu_exclusive": False,
            "cpu_limit": None,
            "credits_per_hour": f"{workers * scale}",
            "disabled": disabled,
            "disk_limit": None,
            "is_cc": is_cc,
            "memory_limit": memory_limit,
            "scale": scale,
            "workers": workers,
            # "selectors": {},
        }

    # Fixed entries that don't follow the generated power-of-two pattern.
    sizes: dict[str, dict[str, Any]] = {
        bootstrap_cluster_replica_size(): make_size(1, 1),
        "scale=2,workers=4": make_size(2, 4),
        "scale=1,workers=1,legacy": make_size(1, 1, is_cc=False),
        "scale=1,workers=2,legacy": make_size(1, 2, is_cc=False),
        # Intentionally not following the naming scheme
        "free": make_size(1, 1, disabled=True),
    }

    # Generated entries for counts 1, 2, 4, ..., 32.
    for exponent in range(6):
        count = 2**exponent
        sizes[f"scale=1,workers={count}"] = make_size(1, count)
        for mem in (4, 8, 16, 32):
            sizes[f"scale=1,workers={count},mem={mem}GiB"] = make_size(
                1, count, memory_limit=f"{mem} GiB"
            )
        sizes[f"scale={count},workers=1"] = make_size(count, 1)
        sizes[f"scale={count},workers={count}"] = make_size(count, count)
        sizes[f"scale=1,workers={count},mem={count}GiB"] = make_size(
            1, count, memory_limit=f"{count} GiB"
        )

    return sizes
scale=