misc.python.materialize.mzcompose
The implementation of the mzcompose system for Docker compositions.
For an overview of what mzcompose is and why it exists, see the user-facing documentation.
1# Copyright Materialize, Inc. and contributors. All rights reserved. 2# 3# Use of this software is governed by the Business Source License 4# included in the LICENSE file at the root of this repository. 5# 6# As of the Change Date specified in that file, in accordance with 7# the Business Source License, use of this software will be governed 8# by the Apache License, Version 2.0. 9 10"""The implementation of the mzcompose system for Docker compositions. 11 12For an overview of what mzcompose is and why it exists, see the [user-facing 13documentation][user-docs]. 14 15[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md 16""" 17 18import os 19import random 20import subprocess 21import sys 22from collections.abc import Iterable 23from dataclasses import dataclass 24from typing import Any, Literal, TypeVar 25 26import psycopg 27 28from materialize import spawn, ui 29from materialize.mz_version import MzVersion 30from materialize.ui import UIError 31 32T = TypeVar("T") 33say = ui.speaker("C> ") 34 35 36DEFAULT_CONFLUENT_PLATFORM_VERSION = "8.2.0" 37 38DEFAULT_MZ_VOLUMES = [ 39 "mzdata:/mzdata", 40 "mydata:/var/lib/mysql-files", 41 "tmp:/share/tmp", 42 "scratch:/scratch", 43] 44 45 46# Parameters which disable systems that periodically/unpredictably impact performance 47# We try to keep this empty, so that we benchmark Materialize as we ship it. If 48# a new feature causes benchmarks to become flaky, consider that this can also 49# impact customers' experience and try to find a solution other than disabling 50# the feature here! 
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS: dict[str, str] = {}


def get_minimal_system_parameters(
    version: MzVersion,
) -> dict[str, str]:
    """Settings we need in order to have tests run at all, but otherwise stay
    with the defaults: not changing performance or increasing coverage.

    Args:
        version: Materialize version the parameters are for. Two entries
            depend on it: `enable_0dt_deployment_sources` is "true" only for
            v0.132.0-dev and later, and
            `enable_compute_active_dataflow_cancelation` is only added for
            versions before v0.163.0-dev.

    Returns:
        A newly built dict mapping system parameter names to string values.
        Callers may mutate it.
    """

    config = {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # Others (ordered by name)
        # NOTE(review): a few entries below are not actually in alphabetical
        # order (e.g. enable_compute_peek_response_stash,
        # enable_cluster_schedule_refresh, enable_compute_temporal_bucketing).
        "allow_real_time_recency": "true",
        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
        "enable_compute_peek_response_stash": "true",
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": (
            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
        ),
        "enable_alter_swap": "true",
        "enable_arrangement_dictionary_compression_alpha": "false",
        "enable_case_literal_transform": "true",
        "enable_cast_elimination": "true",
        "enable_coalesce_case_transform": "true",
        "enable_columnar_lgalloc": "false",
        "enable_columnation_lgalloc": "false",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_create_table_from_source": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_lgalloc": "false",
        "enable_load_generator_counter": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_replacement_materialized_views": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_s3_tables_region_check": "false",
        "enable_statement_lifecycle_logging": "true",
        "enable_storage_introspection_logs": "true",
        "enable_compute_temporal_bucketing": "true",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "grpc_client_http2_keep_alive_timeout": "5s",
        "ore_overflowing_behavior": "panic",
        "unsafe_enable_table_keys": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }

    # Only set for versions that predate v0.163.0-dev.
    if version < MzVersion.parse_mz("v0.163.0-dev"):
        config["enable_compute_active_dataflow_cancelation"] = "true"

    return config


@dataclass
class VariableSystemParameter:
    """A system parameter whose value CI may vary between runs.

    `get_default_system_parameters` uses `default` or draws from `values`
    depending on the CI_SYSTEM_PARAMETERS environment variable.
    """

    # Name of the system parameter.
    key: str
    # Value used when CI_SYSTEM_PARAMETERS is unset or empty. It does not have
    # to appear in `values` (e.g. persist_compaction_memory_bound_bytes).
    default: str
    # Candidates; one is picked via `random.Random.choice` when
    # CI_SYSTEM_PARAMETERS=random.
    values: list[str]


# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
    These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.

    Because they are applied afterwards, an entry here overrides a same-named
    minimal setting. For example, `enable_case_literal_transform` is "true" in
    the minimal set but defaults to "false" here.

    Args:
        version: Materialize version. Several defaults and candidate lists
            depend on it (e.g. `enable_frontend_peek_sequencing`,
            `persist_gc_use_active_gc`, `persist_enable_incremental_compaction`).
        force_source_table_syntax: When true, `force_source_table_syntax`
            defaults to "true" and may be varied. Otherwise it is pinned to
            "false".

    Returns:
        The list of variable parameters, in the order they are applied.
    """

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name),
        VariableSystemParameter(
            "compute_correction_v2_chain_proportionality",
            "3",
            ["2", "3"],
        ),
        VariableSystemParameter(
            "compute_correction_v2_chunk_size",
            "8192",
            ["8192", "65536", "1048576"],
        ),
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            # NOTE(review): the comment names three values but four are listed
            # ("67108864" = 64 MiB is not explained) — confirm which is intended.
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "compute_subscribe_snapshot_optimization",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_case_literal_transform",
            "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_coalesce_case_transform",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_cast_elimination",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_compute_sync_mv_sink",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_peek_sequencing",
            "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_subscribes",
            "true" if version >= MzVersion.parse_mz("v26.18.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_upsert_v2",
            "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "default_timestamp_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "arrangement_size_history_collection_interval", "1h", ["1s", "10s", "1h"]
        ),
        VariableSystemParameter(
            "arrangement_size_history_retention_period", "7d", ["1min", "1h", "7d"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        # End of list (ordered by name)
    ]


def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher)

    Starts from `get_minimal_system_parameters(version)`, then applies the
    variable parameters according to the CI_SYSTEM_PARAMETERS environment
    variable:

    * unset or "": each variable parameter is set to its `default`.
    * "random": each variable parameter is set to a value chosen from its
      `values` by a `random.Random` seeded from CI_SYSTEM_PARAMETERS_SEED,
      else BUILDKITE_JOB_ID, else 1. The seed and the resulting parameters
      are printed to stderr.
    * "minimal": only the minimal parameters are returned.

    Args:
        version: Version to compute parameters for. When falsy it defaults to
            `MzVersion.parse_cargo()` (NOTE(review): presumably the version of
            the current checkout — confirm).
        force_source_table_syntax: Forwarded to
            `get_variable_system_parameters`.

    Returns:
        Mapping of system parameter names to string values.

    Raises:
        ValueError: If CI_SYSTEM_PARAMETERS has any other value.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # Env values are strings; the final fallback is the int 1.
        # random.Random accepts either as a seed.
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params


# If you are adding a new config flag in Materialize, consider setting values
# for it in get_variable_system_parameters if it can be varied in tests. Set it
# in get_minimal_system_parameters if it's required for tests to succeed at
# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
# apply.
UNINTERESTING_SYSTEM_PARAMETERS: list[str] = [
    "enable_compute_half_join2",
    "enable_mz_join_core",
    "linear_join_yielding",
    "enable_column_paged_batcher",
    "enable_column_paged_batcher_spill",
    "column_paged_batcher_budget_fraction",
    "column_paged_batcher_lz4",
    "column_paged_batcher_swap_pageout",
    "enable_upsert_paged_spill",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "memory_limiter_interval",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "catalog_info_metrics_reconcile_interval",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_replica_targeted_materialized_views",
    "compute_mv_sink_advance_persist_frontiers",
    "compute_prometheus_introspection_scrape_interval",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "crdb_keepalives_idle",
    "crdb_keepalives_interval",
    "crdb_keepalives_retries",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "kafka_low_watermark_check",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "pg_source_validate_timeline",
    "sql_server_source_validate_restore_history",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "enable_0dt_caught_up_replica_status_check",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_expression_cache",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "storage_statistics_retention_duration",
    "enable_paused_cluster_readhold_downgrade",
    "kafka_retry_backoff",
    "kafka_retry_backoff_max",
    "kafka_reconnect_backoff",
    "kafka_reconnect_backoff_max",
    "kafka_sink_message_max_bytes",
    "kafka_sink_batch_size",
    "kafka_sink_batch_num_messages",
    "oidc_issuer",
    "oidc_audience",
    "oidc_authentication_claim",
    "oidc_group_role_sync_enabled",
    "oidc_group_claim",
    "oidc_group_role_sync_strict",
    "console_oidc_client_id",
    "console_oidc_scopes",
    "enable_public_metrics_endpoint",
    "enable_mcp_agent",
    "enable_mcp_agent_query_tool",
    "enable_mcp_developer",
    "enable_mcp_developer_query_tool",
    "mcp_max_response_size",
    "user_id_pool_batch_size",
]


# Environment variables for CockroachDB containers that raise the max sync
# duration settings to 120s (NOTE(review): presumably to tolerate slow CI
# disks — confirm).
DEFAULT_CRDB_ENVIRONMENT: list[str] = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# Expands to "mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0".
DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Block until `host:port` accepts TCP connections, or fail on timeout.

    Appends a `timeout <timeout_secs> bash -c "until ...; done"` probe to the
    caller-supplied command prefix and runs it with `spawn.capture`.
    NOTE(review): `cmd` is presumably something like a container exec prefix,
    so the probe runs inside a container — confirm at call sites.

    Args:
        cmd: Command prefix. It is mutated in place: the probe arguments are
            appended to it.
        host: Host to probe.
        port: Port to probe.
        timeout_secs: Passed to `timeout`. The probe is killed after this many
            seconds.
        kind: Optional label prepended to `host` in the failure log line.

    Returns:
        The same `cmd` list object, now including the probe arguments.

    Raises:
        subprocess.CalledProcessError: If the probe exits non-zero (e.g. the
            timeout expired). The failure is logged before re-raising.
    """
    cmd.extend(
        [
            "timeout",
            str(timeout_secs),
            "bash",
            "-c",
            # `[ cat ]` on its own is always true; what matters is the
            # redirection. Bash opens /dev/tcp/<host>/<port> as a TCP
            # connection, and if that fails the command does not run and the
            # loop sleeps 0.1s and retries.
            f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
        ]
    )
    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # stderr was merged into stdout above, so e.stderr is presumably
        # None/empty here and the output appears under "stdout".
        ui.log_in_automation(
            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
                kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
            )
        )
        raise
    return cmd


# TODO(benesch): replace with Docker health checks.
719def _wait_for_pg( 720 timeout_secs: int, 721 query: str, 722 dbname: str, 723 port: int, 724 host: str, 725 user: str, 726 password: str | None, 727 expected: Iterable[Any] | Literal["any"], 728 print_result: bool = False, 729 sslmode: str = "disable", 730) -> None: 731 """Wait for a pg-compatible database (includes materialized)""" 732 obfuscated_password = password[0:1] if password is not None else "" 733 args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'" 734 ui.progress(f"waiting for {args} to handle {query!r}", "C") 735 error = None 736 for remaining in ui.timeout_loop(timeout_secs, tick=0.5): 737 try: 738 conn = psycopg.connect( 739 dbname=dbname, 740 host=host, 741 port=port, 742 user=user, 743 password=password, 744 connect_timeout=1, 745 sslmode=sslmode, 746 ) 747 # The default (autocommit = false) wraps everything in a transaction. 748 conn.autocommit = True 749 with conn.cursor() as cur: 750 cur.execute(query.encode()) 751 if expected == "any" and cur.rowcount == -1: 752 ui.progress(" success!", finish=True) 753 return 754 result = list(cur.fetchall()) 755 if expected == "any" or result == expected: 756 if print_result: 757 say(f"query result: {result}") 758 else: 759 ui.progress(" success!", finish=True) 760 return 761 else: 762 say( 763 f"host={host} port={port} did not return rows matching {expected} got: {result}" 764 ) 765 except Exception as e: 766 ui.progress(f"{e if print_result else ''} {int(remaining)}") 767 error = e 768 ui.progress(finish=True) 769 raise UIError(f"never got correct result for {args}: {error}") 770 771 772def bootstrap_cluster_replica_size() -> str: 773 return "bootstrap" 774 775 776def cluster_replica_size_map() -> dict[str, dict[str, Any]]: 777 """scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]""" 778 779 def replica_size( 780 scale: int, 781 workers: int, 782 disabled: bool = False, 783 is_cc: bool = True, 784 memory_limit: str = "4 GiB", 785 ) -> dict[str, Any]: 786 return { 787 
"cpu_exclusive": False, 788 "cpu_limit": None, 789 "credits_per_hour": f"{workers * scale}", 790 "disabled": disabled, 791 "disk_limit": None, 792 "is_cc": is_cc, 793 "memory_limit": memory_limit, 794 "scale": scale, 795 "workers": workers, 796 # "selectors": {}, 797 } 798 799 replica_sizes = { 800 bootstrap_cluster_replica_size(): replica_size(1, 1), 801 "scale=2,workers=4": replica_size(2, 4), 802 "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False), 803 "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False), 804 # Intentionally not following the naming scheme 805 "free": replica_size(1, 1, disabled=True), 806 } 807 808 for i in range(0, 6): 809 workers = 1 << i 810 replica_sizes[f"scale=1,workers={workers}"] = replica_size(1, workers) 811 for mem in [4, 8, 16, 32]: 812 replica_sizes[f"scale=1,workers={workers},mem={mem}GiB"] = replica_size( 813 1, workers, memory_limit=f"{mem} GiB" 814 ) 815 816 replica_sizes[f"scale={workers},workers=1"] = replica_size(workers, 1) 817 replica_sizes[f"scale={workers},workers={workers}"] = replica_size( 818 workers, workers 819 ) 820 replica_sizes[f"scale=1,workers={workers},mem={workers}GiB"] = replica_size( 821 1, workers, memory_limit=f"{workers} GiB" 822 ) 823 824 return replica_sizes
def
say(msg: str) -> None:
DEFAULT_CONFLUENT_PLATFORM_VERSION =
'8.2.0'
DEFAULT_MZ_VOLUMES =
['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS =
{}
def
get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
def get_minimal_system_parameters(
    version: MzVersion,
) -> dict[str, str]:
    """Return the baseline system parameters every test run relies on.

    These are settings we need in order to have tests run at all; beyond that
    they stay with the defaults, neither changing performance nor increasing
    coverage.

    Args:
        version: Materialize version being configured. It decides whether
            `enable_0dt_deployment_sources` is "true" (v0.132.0-dev and later)
            and whether `enable_compute_active_dataflow_cancelation` is added
            (only for versions before v0.163.0-dev).

    Returns:
        A newly built dict mapping parameter name to string value.
    """
    zero_dt_sources = (
        "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
    )
    wants_legacy_cancelation = version < MzVersion.parse_mz("v0.163.0-dev")

    # Ordered (name, value) pairs; dict() below preserves this order.
    entries: list[tuple[str, str]] = [
        # Unsafe functions
        ("unsafe_enable_unsafe_functions", "true"),
        # Others (ordered by name)
        ("allow_real_time_recency", "true"),
        # removed from main, keeping it here for old versions
        ("constraint_based_timestamp_selection", "verify"),
        ("enable_compute_peek_response_stash", "true"),
        ("enable_0dt_deployment_panic_after_timeout", "true"),
        ("enable_0dt_deployment_sources", zero_dt_sources),
        ("enable_alter_swap", "true"),
        ("enable_arrangement_dictionary_compression_alpha", "false"),
        ("enable_case_literal_transform", "true"),
        ("enable_cast_elimination", "true"),
        ("enable_coalesce_case_transform", "true"),
        ("enable_columnar_lgalloc", "false"),
        ("enable_columnation_lgalloc", "false"),
        ("enable_compute_correction_v2", "true"),
        ("enable_compute_logical_backpressure", "true"),
        ("enable_connection_validation_syntax", "true"),
        ("enable_create_table_from_source", "true"),
        ("enable_eager_delta_joins", "true"),
        ("enable_envelope_debezium_in_subscribe", "true"),
        ("enable_expressions_in_limit_syntax", "true"),
        ("enable_introspection_subscribes", "true"),
        ("enable_kafka_sink_partition_by", "true"),
        ("enable_lgalloc", "false"),
        ("enable_load_generator_counter", "true"),
        ("enable_logical_compaction_window", "true"),
        ("enable_multi_worker_storage_persist_sink", "true"),
        ("enable_multi_replica_sources", "true"),
        ("enable_rbac_checks", "true"),
        ("enable_reduce_mfp_fusion", "true"),
        ("enable_refresh_every_mvs", "true"),
        ("enable_replacement_materialized_views", "true"),
        ("enable_cluster_schedule_refresh", "true"),
        ("enable_s3_tables_region_check", "false"),
        ("enable_statement_lifecycle_logging", "true"),
        ("enable_storage_introspection_logs", "true"),
        ("enable_compute_temporal_bucketing", "true"),
        ("enable_variadic_left_join_lowering", "true"),
        ("enable_worker_core_affinity", "true"),
        ("grpc_client_http2_keep_alive_timeout", "5s"),
        ("ore_overflowing_behavior", "panic"),
        ("unsafe_enable_table_keys", "true"),
        ("with_0dt_deployment_max_wait", "1800s"),
        # End of list (ordered by name)
    ]

    if wants_legacy_cancelation:
        entries.append(("enable_compute_active_dataflow_cancelation", "true"))

    return dict(entries)
Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.
@dataclass
class
VariableSystemParameter:
def
get_variable_system_parameters( version: materialize.mz_version.MzVersion, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters CI may vary, each with its default.

    Note: Only the default is tested unless we explicitly select "System
    Parameters: Random" in trigger-ci. These defaults are applied _after_
    applying the settings from `get_minimal_system_parameters`.

    Each entry is ``VariableSystemParameter(name, default, candidates)``.
    Keep both the order of the entries and the order of each entry's
    candidates stable. In random mode, `get_default_system_parameters` draws
    one candidate per entry, in list order, from a seeded RNG, so reordering
    changes what a given seed selects.
    """
    # Version gates, evaluated once and reused below. Some compare with ">"
    # and some with ">="; both are kept exactly as they were.
    above_v0_127 = version > MzVersion.parse_mz("v0.127.0-dev")
    above_v0_135 = version > MzVersion.parse_mz("v0.135.0-dev")
    above_v0_143 = version > MzVersion.parse_mz("v0.143.0-dev")
    from_v0_161 = version >= MzVersion.parse_mz("v0.161.0-dev")
    from_v26_9 = version >= MzVersion.parse_mz("v26.9.0-dev")
    from_v26_18 = version >= MzVersion.parse_mz("v26.18.0-dev")

    def toggle(name: str, default: str) -> VariableSystemParameter:
        """Boolean parameter that may take either "true" or "false"."""
        return VariableSystemParameter(name, default, ["true", "false"])

    def gated(name: str, on_by_default, may_vary) -> VariableSystemParameter:
        """Boolean parameter whose default and candidates depend on flags.

        When `may_vary` is falsy, "false" is the only candidate.
        """
        return VariableSystemParameter(
            name,
            "true" if on_by_default else "false",
            ["true", "false"] if may_vary else ["false"],
        )

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values
        # based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        toggle("persist_enable_arrow_lgalloc_noncc_sizes", "true"),
        toggle("persist_enable_s3_lgalloc_noncc_sizes", "true"),
        # -----
        # Others (roughly grouped by prefix; order must stay as-is, see above)
        VariableSystemParameter(
            "compute_correction_v2_chain_proportionality", "3", ["2", "3"]
        ),
        VariableSystemParameter(
            "compute_correction_v2_chunk_size",
            "8192",
            ["8192", "65536", "1048576"],
        ),
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",  # 128 MiB
            ["1048576", "4194304", "16777216", "67108864"],
        ),
        VariableSystemParameter(
            "compute_hydration_concurrency", "2", ["1", "2", "4"]
        ),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        toggle("compute_apply_column_demands", "true"),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            "1048576",  # 1 MiB, an in-between value
            # Force-enabled (0), the in-between value, and larger settings
            # (300 MiB and 64 MiB).
            ["0", "1048576", "314572800", "67108864"],
        ),
        toggle("compute_subscribe_snapshot_optimization", "true"),
        toggle("enable_case_literal_transform", "false"),
        toggle("enable_coalesce_case_transform", "true"),
        toggle("enable_cast_elimination", "true"),
        toggle("enable_compute_sync_mv_sink", "true"),
        toggle("enable_password_auth", "true"),
        gated("enable_frontend_peek_sequencing", from_v26_9, True),
        gated("enable_frontend_subscribes", from_v26_18, True),
        toggle("enable_upsert_v2", "false"),
        VariableSystemParameter(
            "default_timestamp_interval", "1s", ["100ms", "1s"]
        ),
        gated(
            "force_source_table_syntax",
            force_source_table_syntax,
            force_source_table_syntax,
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if above_v0_135 else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        toggle("persist_batch_delete_enabled", "true"),
        toggle("persist_batch_structured_order", "true"),
        toggle("persist_batch_builder_structured", "true"),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            ["0", "1", "2", "10", "100"],
        ),
        toggle("persist_stats_audit_panic", "true"),
        toggle("persist_encoding_enable_dictionary", "true"),
        VariableSystemParameter(
            "persist_fast_path_limit", "1000", ["100", "1000", "10000"]
        ),
        toggle("persist_fast_path_order", "true"),
        gated("persist_gc_use_active_gc", above_v0_143, above_v0_127),
        VariableSystemParameter(
            "persist_gc_min_versions", "16", ["16", "256", "1024"]
        ),
        VariableSystemParameter(
            "persist_gc_max_versions", "128000", ["256", "128000"]
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        toggle("persist_pubsub_client_enabled", "true"),
        toggle("persist_pubsub_push_diff_enabled", "true"),
        toggle("persist_record_compactions", "true"),
        gated("persist_record_schema_id", above_v0_127, above_v0_127),
        gated("persist_rollup_use_active_rollup", above_v0_143, above_v0_127),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get
        # more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        gated("persist_enable_incremental_compaction", from_v0_161, from_v0_161),
        toggle("persist_use_critical_since_catalog", "true"),
        # The next two are always false, because we always have zero-downtime
        # enabled.
        VariableSystemParameter(
            "persist_use_critical_since_snapshot", "false", ["false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source", "false", ["false"]
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        toggle("persist_blob_cache_scale_with_threads", "true"),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "arrangement_size_history_collection_interval",
            "1h",
            ["1s", "10s", "1h"],
        ),
        VariableSystemParameter(
            "arrangement_size_history_retention_period",
            "7d",
            ["1min", "1h", "7d"],
        ),
        toggle("persist_validate_part_bounds_on_read", "false"),
        toggle("persist_validate_part_bounds_on_write", "false"),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        toggle("storage_reclock_to_latest", "true"),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        toggle("storage_use_continual_feedback_upsert", "true"),
        # End of list
    ]
Note: Only the defaults are tested unless "System Parameters: Random" is explicitly selected in trigger-ci.
These defaults are applied _after_ the settings from get_minimal_system_parameters, so they take precedence over them.
def
get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, force_source_table_syntax: bool = False) -> dict[str, str]:
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Build the system parameters to apply for `version`.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher), which is
    why the version can be passed explicitly. When omitted, it defaults to
    ``MzVersion.parse_cargo()``.

    The result starts from `get_minimal_system_parameters`. The variable
    parameters from `get_variable_system_parameters` are then layered on top,
    depending on the ``CI_SYSTEM_PARAMETERS`` environment variable:

    * unset or ``""``: every variable parameter takes its default value;
    * ``"random"``: every variable parameter takes a value chosen by an RNG
      seeded from ``CI_SYSTEM_PARAMETERS_SEED``, falling back to
      ``BUILDKITE_JOB_ID`` and then ``"1"``. The seed and the resulting
      parameters are printed to stderr;
    * ``"minimal"``: only the minimal parameters are returned.

    Args:
        version: Version whose parameters to produce.
        force_source_table_syntax: Forwarded to
            `get_variable_system_parameters`.

    Returns:
        Mapping from parameter name to string value.

    Raises:
        ValueError: If ``CI_SYSTEM_PARAMETERS`` has any other value.
    """
    if version is None:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The fallback must be a string like the environment values.
        # random.Random(1) and random.Random("1") are seeded differently, so an
        # int fallback would print a CI_SYSTEM_PARAMETERS_SEED that does not
        # reproduce this run when exported.
        seed = os.getenv(
            "CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", "1")
        )
        rng = random.Random(seed)
        # One draw per parameter, in list order, so the seed alone determines
        # the outcome.
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
For upgrade tests, we only want parameters to be set once all environmentd / clusterd processes have reached a specific version (or higher).
# System parameters classed as "uninteresting".
# NOTE(review): presumably these are excluded from checks that expect every
# system parameter to be covered or varied in CI; confirm against the usages.
UNINTERESTING_SYSTEM_PARAMETERS = [
    "enable_compute_half_join2",
    "enable_mz_join_core",
    "linear_join_yielding",
    "enable_column_paged_batcher",
    "enable_column_paged_batcher_spill",
    "column_paged_batcher_budget_fraction",
    "column_paged_batcher_lz4",
    "column_paged_batcher_swap_pageout",
    "enable_upsert_paged_spill",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "memory_limiter_interval",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "catalog_info_metrics_reconcile_interval",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_replica_targeted_materialized_views",
    "compute_mv_sink_advance_persist_frontiers",
    "compute_prometheus_introspection_scrape_interval",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "crdb_keepalives_idle",
    "crdb_keepalives_interval",
    "crdb_keepalives_retries",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "kafka_low_watermark_check",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "pg_source_validate_timeline",
    "sql_server_source_validate_restore_history",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "enable_0dt_caught_up_replica_status_check",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_expression_cache",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "storage_statistics_retention_duration",
    "enable_paused_cluster_readhold_downgrade",
    "kafka_retry_backoff",
    "kafka_retry_backoff_max",
    "kafka_reconnect_backoff",
    "kafka_reconnect_backoff_max",
    "kafka_sink_message_max_bytes",
    "kafka_sink_batch_size",
    "kafka_sink_batch_num_messages",
    "oidc_issuer",
    "oidc_audience",
    "oidc_authentication_claim",
    "oidc_group_role_sync_enabled",
    "oidc_group_claim",
    "oidc_group_role_sync_strict",
    "console_oidc_client_id",
    "console_oidc_scopes",
    "enable_public_metrics_endpoint",
    "enable_mcp_agent",
    "enable_mcp_agent_query_tool",
    "enable_mcp_developer",
    "enable_mcp_developer_query_tool",
    "mcp_max_response_size",
    "user_id_pool_batch_size",
]
# Extra environment for CockroachDB: both settings raise a maximum sync
# duration to 120s.
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]

# Components of the default Materialize environment ID.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"

# "<provider>-<region>-<org id>-<ordinal>", i.e.
# "mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0".
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    (DEFAULT_CLOUD_PROVIDER, DEFAULT_CLOUD_REGION, DEFAULT_ORG_ID, DEFAULT_ORDINAL)
)
def
bootstrap_cluster_replica_size() -> str:
def
cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Return the cluster replica sizes known to mzcompose, keyed by name.

    Names follow ``scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]``; ``free``
    deliberately does not.
    """

    def replica_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # Keyword order mirrors the key order of the resulting dict.
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=str(workers * scale),
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
            # selectors={},
        )

    def sizes_for(workers: int):
        """Yield (name, size) pairs for one worker count, in insertion order.

        Some names may repeat an earlier one (with an equal value); as with
        plain assignment, `dict.update` then keeps the original position.
        """
        yield f"scale=1,workers={workers}", replica_size(1, workers)
        for mem in (4, 8, 16, 32):
            yield (
                f"scale=1,workers={workers},mem={mem}GiB",
                replica_size(1, workers, memory_limit=f"{mem} GiB"),
            )
        yield f"scale={workers},workers=1", replica_size(workers, 1)
        yield f"scale={workers},workers={workers}", replica_size(workers, workers)
        yield (
            f"scale=1,workers={workers},mem={workers}GiB",
            replica_size(1, workers, memory_limit=f"{workers} GiB"),
        )

    sizes: dict[str, dict[str, Any]] = {
        bootstrap_cluster_replica_size(): replica_size(1, 1),
        "scale=2,workers=4": replica_size(2, 4),
        "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False),
        "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False),
        # Intentionally not following the naming scheme
        "free": replica_size(1, 1, disabled=True),
    }

    # Worker counts 1, 2, 4, 8, 16, 32.
    for exponent in range(6):
        sizes.update(sizes_for(2**exponent))

    return sizes
scale=