misc.python.materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the user-facing documentation.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""The implementation of the mzcompose system for Docker compositions.
 11
 12For an overview of what mzcompose is and why it exists, see the [user-facing
 13documentation][user-docs].
 14
 15[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
 16"""
 17
 18import os
 19import random
 20import subprocess
 21import sys
 22from collections.abc import Iterable
 23from dataclasses import dataclass
 24from typing import Any, Literal, TypeVar
 25
 26import psycopg
 27
 28from materialize import spawn, ui
 29from materialize.mz_version import MzVersion
 30from materialize.ui import UIError
 31
# Generic type variable; no use of it appears in the part of the file reviewed here.
T = TypeVar("T")
# Module-wide message printer obtained from `ui.speaker` with the "C> " prefix.
say = ui.speaker("C> ")


# Confluent Platform version used by default.
# NOTE(review): presumably the image tag for the Confluent services; confirm at the use site.
DEFAULT_CONFLUENT_PLATFORM_VERSION = "8.2.0"

# Default volume specifications for Materialize services, written as
# `<volume name>:<mount path>`.
# NOTE(review): assumed to be Docker volume mounts; confirm at the use site.
DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
# We try to keep this empty, so that we benchmark Materialize as we ship it. If
# a new feature causes benchmarks to become flaky, consider that this can also
# impact customers' experience and try to find a solution other than disabling
# the feature here!
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS: dict[str, str] = {}
 52
 53
 54def get_minimal_system_parameters(
 55    version: MzVersion,
 56) -> dict[str, str]:
 57    """Settings we need in order to have tests run at all, but otherwise stay
 58    with the defaults: not changing performance or increasing coverage."""
 59
 60    config = {
 61        # -----
 62        # Unsafe functions
 63        "unsafe_enable_unsafe_functions": "true",
 64        # -----
 65        # Others (ordered by name)
 66        "allow_real_time_recency": "true",
 67        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
 68        "enable_compute_peek_response_stash": "true",
 69        "enable_0dt_deployment_panic_after_timeout": "true",
 70        "enable_0dt_deployment_sources": (
 71            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 72        ),
 73        "enable_alter_swap": "true",
 74        "enable_arrangement_dictionary_compression_alpha": "false",
 75        "enable_case_literal_transform": "true",
 76        "enable_cast_elimination": "true",
 77        "enable_coalesce_case_transform": "true",
 78        "enable_columnar_lgalloc": "false",
 79        "enable_columnation_lgalloc": "false",
 80        "enable_compute_correction_v2": "true",
 81        "enable_compute_logical_backpressure": "true",
 82        "enable_connection_validation_syntax": "true",
 83        "enable_create_table_from_source": "true",
 84        "enable_eager_delta_joins": "true",
 85        "enable_envelope_debezium_in_subscribe": "true",
 86        "enable_expressions_in_limit_syntax": "true",
 87        "enable_introspection_subscribes": "true",
 88        "enable_kafka_sink_partition_by": "true",
 89        "enable_lgalloc": "false",
 90        "enable_load_generator_counter": "true",
 91        "enable_logical_compaction_window": "true",
 92        "enable_multi_worker_storage_persist_sink": "true",
 93        "enable_multi_replica_sources": "true",
 94        "enable_rbac_checks": "true",
 95        "enable_reduce_mfp_fusion": "true",
 96        "enable_refresh_every_mvs": "true",
 97        "enable_replacement_materialized_views": "true",
 98        "enable_cluster_schedule_refresh": "true",
 99        "enable_s3_tables_region_check": "false",
100        "enable_statement_lifecycle_logging": "true",
101        "enable_storage_introspection_logs": "true",
102        "enable_compute_temporal_bucketing": "true",
103        "enable_variadic_left_join_lowering": "true",
104        "enable_worker_core_affinity": "true",
105        "grpc_client_http2_keep_alive_timeout": "5s",
106        "ore_overflowing_behavior": "panic",
107        "unsafe_enable_table_keys": "true",
108        "with_0dt_deployment_max_wait": "1800s",
109        # End of list (ordered by name)
110    }
111
112    if version < MzVersion.parse_mz("v0.163.0-dev"):
113        config["enable_compute_active_dataflow_cancelation"] = "true"
114
115    return config
116
117
@dataclass
class VariableSystemParameter:
    """A system parameter together with the values tests may assign to it."""

    # Name of the system parameter.
    key: str
    # Value applied when no randomization is requested.
    default: str
    # Candidate values to pick from when parameters are randomized.
    values: list[str]
123
124
125# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied between runs.

    Note: Only the default is tested unless we explicitly select
    "System Parameters: Random" in trigger-ci. These defaults are applied
    _after_ applying the settings from `get_minimal_system_parameters`.

    Args:
        version: Materialize version; gates defaults and allowed values of
            parameters that depend on the version.
        force_source_table_syntax: Selects the default of the
            `force_source_table_syntax` parameter and whether it may vary.

    Returns:
        One `VariableSystemParameter` per parameter, in a fixed order.
    """

    param = VariableSystemParameter

    def toggle(key: str, default: str = "true") -> VariableSystemParameter:
        # Boolean parameter that may take either value; a fresh list per entry.
        return VariableSystemParameter(key, default, ["true", "false"])

    def gated_toggle(key: str, default_on: bool, variable: bool) -> VariableSystemParameter:
        # Boolean parameter whose default and variability depend on conditions;
        # when it must not vary, "false" is the only allowed value.
        return VariableSystemParameter(
            key,
            "true" if default_on else "false",
            ["true", "false"] if variable else ["false"],
        )

    # Version gates, evaluated once.
    after_v0_127 = version > MzVersion.parse_mz("v0.127.0-dev")
    after_v0_135 = version > MzVersion.parse_mz("v0.135.0-dev")
    after_v0_143 = version > MzVersion.parse_mz("v0.143.0-dev")
    since_v0_161 = version >= MzVersion.parse_mz("v0.161.0-dev")
    since_v26_9 = version >= MzVersion.parse_mz("v26.9.0-dev")
    since_v26_18 = version >= MzVersion.parse_mz("v26.18.0-dev")

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        param(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        param(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        param(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        toggle("persist_enable_arrow_lgalloc_noncc_sizes"),
        toggle("persist_enable_s3_lgalloc_noncc_sizes"),
        # -----
        # Others (ordered by name),
        param("compute_correction_v2_chain_proportionality", "3", ["2", "3"]),
        param(
            "compute_correction_v2_chunk_size",
            "8192",
            ["8192", "65536", "1048576"],
        ),
        # Default is 128 MiB.
        param(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),
        param("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        param("compute_replica_expiration_offset", "3d", ["3d", "10d"]),
        toggle("compute_apply_column_demands"),
        # Default is 1 MiB, an in-between value; the candidates cover
        # force-enabled, the in-between, and the production value.
        param(
            "compute_peek_response_stash_threshold_bytes",
            "1048576",
            ["0", "1048576", "314572800", "67108864"],
        ),
        toggle("compute_subscribe_snapshot_optimization"),
        toggle("enable_case_literal_transform", "false"),
        toggle("enable_coalesce_case_transform"),
        toggle("enable_cast_elimination"),
        toggle("enable_compute_sync_mv_sink"),
        toggle("enable_password_auth"),
        gated_toggle("enable_frontend_peek_sequencing", since_v26_9, True),
        gated_toggle("enable_frontend_subscribes", since_v26_18, True),
        toggle("enable_upsert_v2", "false"),
        param("default_timestamp_interval", "1s", ["100ms", "1s"]),
        gated_toggle(
            "force_source_table_syntax",
            force_source_table_syntax,
            force_source_table_syntax,
        ),
        param(
            "persist_batch_columnar_format",
            "structured" if after_v0_135 else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        toggle("persist_batch_delete_enabled"),
        toggle("persist_batch_structured_order"),
        toggle("persist_batch_builder_structured"),
        param(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        param("persist_batch_max_run_len", "4", ["2", "3", "4", "16"]),
        param(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        param(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        param("persist_stats_audit_percent", "100", ["0", "1", "2", "10", "100"]),
        toggle("persist_stats_audit_panic"),
        toggle("persist_encoding_enable_dictionary"),
        param("persist_fast_path_limit", "1000", ["100", "1000", "10000"]),
        toggle("persist_fast_path_order"),
        gated_toggle("persist_gc_use_active_gc", after_v0_143, after_v0_127),
        param("persist_gc_min_versions", "16", ["16", "256", "1024"]),
        param("persist_gc_max_versions", "128000", ["256", "128000"]),
        param(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        param(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        toggle("persist_pubsub_client_enabled"),
        toggle("persist_pubsub_push_diff_enabled"),
        toggle("persist_record_compactions"),
        gated_toggle("persist_record_schema_id", after_v0_127, after_v0_127),
        gated_toggle("persist_rollup_use_active_rollup", after_v0_143, after_v0_127),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        param(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        param(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        gated_toggle("persist_enable_incremental_compaction", since_v0_161, since_v0_161),
        toggle("persist_use_critical_since_catalog"),
        # Always false, because we always have zero-downtime enabled.
        param("persist_use_critical_since_snapshot", "false", ["false"]),
        # Always false, because we always have zero-downtime enabled.
        param("persist_use_critical_since_source", "false", ["false"]),
        param("persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]),
        toggle("persist_blob_cache_scale_with_threads"),
        param("persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]),
        param(
            "arrangement_size_history_collection_interval",
            "1h",
            ["1s", "10s", "1h"],
        ),
        param(
            "arrangement_size_history_retention_period",
            "7d",
            ["1min", "1h", "7d"],
        ),
        toggle("persist_validate_part_bounds_on_read", "false"),
        toggle("persist_validate_part_bounds_on_write", "false"),
        param(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        param(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        param(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        param(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        toggle("storage_reclock_to_latest"),
        param(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        param(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        param("storage_statistics_interval", "2000", ["100", "1000", "10000"]),
        toggle("storage_use_continual_feedback_upsert"),
        # End of list (ordered by name)
    ]
449
450
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Return the system parameters to apply for the given version.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher).

    The `CI_SYSTEM_PARAMETERS` environment variable selects the mode:

    * unset or empty: minimal parameters plus the default of every variable
      parameter;
    * `random`: minimal parameters plus a randomly chosen value for every
      variable parameter, seeded from `CI_SYSTEM_PARAMETERS_SEED`, falling
      back to `BUILDKITE_JOB_ID`, falling back to `"1"`;
    * `minimal`: only the minimal parameters.

    Args:
        version: Version to generate parameters for; when not given,
            `MzVersion.parse_cargo()` is used.
        force_source_table_syntax: Forwarded to
            `get_variable_system_parameters`.

    Returns:
        A mapping from system parameter name to its value as a string.

    Raises:
        ValueError: If `CI_SYSTEM_PARAMETERS` holds an unknown value.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The fallback must be the *string* "1": seeds read from the
        # environment are always strings, and random.Random seeds str and int
        # differently. With an int fallback the seed printed below could not
        # be replayed through CI_SYSTEM_PARAMETERS_SEED.
        seed = os.getenv(
            "CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", "1")
        )
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        # Nothing to add on top of the minimal parameters.
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
487
488
# If you are adding a new config flag in Materialize, consider setting values
# for it in get_variable_system_parameters if it can be varied in tests. Set it
# in get_minimal_system_parameters if it's required for tests to succeed at
# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
# apply.
#
# The entries are names of system parameters that this file deliberately
# leaves untouched.
UNINTERESTING_SYSTEM_PARAMETERS: list[str] = [
    "enable_compute_half_join2",
    "enable_mz_join_core",
    "linear_join_yielding",
    "enable_column_paged_batcher",
    "enable_column_paged_batcher_spill",
    "column_paged_batcher_budget_fraction",
    "column_paged_batcher_lz4",
    "column_paged_batcher_swap_pageout",
    "enable_upsert_paged_spill",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "memory_limiter_interval",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "catalog_info_metrics_reconcile_interval",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_replica_targeted_materialized_views",
    "compute_mv_sink_advance_persist_frontiers",
    "compute_prometheus_introspection_scrape_interval",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "crdb_keepalives_idle",
    "crdb_keepalives_interval",
    "crdb_keepalives_retries",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "kafka_low_watermark_check",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "pg_source_validate_timeline",
    "sql_server_source_validate_restore_history",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "enable_0dt_caught_up_replica_status_check",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_expression_cache",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "storage_statistics_retention_duration",
    "enable_paused_cluster_readhold_downgrade",
    "kafka_retry_backoff",
    "kafka_retry_backoff_max",
    "kafka_reconnect_backoff",
    "kafka_reconnect_backoff_max",
    "kafka_sink_message_max_bytes",
    "kafka_sink_batch_size",
    "kafka_sink_batch_num_messages",
    "oidc_issuer",
    "oidc_audience",
    "oidc_authentication_claim",
    "oidc_group_role_sync_enabled",
    "oidc_group_claim",
    "oidc_group_role_sync_strict",
    "console_oidc_client_id",
    "console_oidc_scopes",
    "enable_public_metrics_endpoint",
    "enable_mcp_agent",
    "enable_mcp_agent_query_tool",
    "enable_mcp_developer",
    "enable_mcp_developer_query_tool",
    "mcp_max_response_size",
    "user_id_pool_batch_size",
]
677
678
# Environment variable assignments in `NAME=value` form.
# NOTE(review): presumably passed to the CockroachDB service; confirm at the use site.
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# Components of the default environment ID assembled below.
# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# Joined as `<cloud provider>-<cloud region>-<org id>-<ordinal>`.
DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
691
692
693# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Run a bash polling loop that waits for `host:port` to accept TCP connections.

    The loop is wrapped in `timeout <timeout_secs>` and appended to `cmd`,
    which is modified in place and then executed via `spawn.capture`.

    Args:
        cmd: Command prefix to run the probe with; extended in place.
        host: Host to probe.
        port: TCP port to probe.
        timeout_secs: Value passed to the `timeout` command.
        kind: Optional label placed in front of the host in the error log.

    Returns:
        The same `cmd` list, now including the probe invocation.

    Raises:
        subprocess.CalledProcessError: Re-raised after logging when the
            command fails.
    """
    probe_script = (
        f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
    )
    probe_invocation = ["timeout", str(timeout_secs), "bash", "-c", probe_script]
    # Callers receive the full command back, so extend their list in place.
    cmd.extend(probe_invocation)

    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        quoted_cmd = ui.shell_quote(cmd)
        ui.log_in_automation(
            f"wait-for-tcp ({kind}{host}:{port}): error running {quoted_cmd}: {e}, "
            f"stdout:\n{e.stdout}\nstderr:\n{e.stderr}"
        )
        raise
    return cmd
716
717
718# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    sslmode: str = "disable",
) -> None:
    """Wait for a pg-compatible database (includes materialized).

    Repeatedly connects and runs `query` until the result is acceptable or
    `timeout_secs` elapses. Each attempt uses its own connection, which is
    closed again before the next attempt.

    Args:
        timeout_secs: Overall time budget handed to `ui.timeout_loop`.
        query: SQL to execute on every attempt.
        dbname: Database name to connect to.
        port: Port to connect to.
        host: Host to connect to.
        user: User to connect as.
        password: Password to connect with, or `None`.
        expected: Rows the query must return, or the literal `"any"` to
            accept whatever the query returns (including no result set).
        print_result: When true, print the matching result, and include
            connection/query errors in the progress output.
        sslmode: SSL mode passed to the connection.

    Raises:
        UIError: If no attempt produced an acceptable result in time.
    """
    # Only the first character of the password is shown in progress output.
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        try:
            # The connection is used as a context manager so that it is closed
            # at the end of every attempt; otherwise each retry would leave
            # one more open connection behind.
            with psycopg.connect(
                dbname=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                connect_timeout=1,
                sslmode=sslmode,
            ) as conn:
                # The default (autocommit = false) wraps everything in a transaction.
                conn.autocommit = True
                with conn.cursor() as cur:
                    cur.execute(query.encode())
                    # A rowcount of -1 means there is nothing to fetch; that
                    # is good enough when any outcome is accepted.
                    if expected == "any" and cur.rowcount == -1:
                        ui.progress(" success!", finish=True)
                        return
                    result = list(cur.fetchall())
                    if expected == "any" or result == expected:
                        if print_result:
                            say(f"query result: {result}")
                        else:
                            ui.progress(" success!", finish=True)
                        return
                    else:
                        say(
                            f"host={host} port={port} did not return rows matching {expected} got: {result}"
                        )
        except Exception as e:
            # Best-effort polling: remember the last error and keep retrying
            # until the timeout loop is exhausted.
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")
770
771
def bootstrap_cluster_replica_size() -> str:
    """Return the name of the replica size used for bootstrapping."""
    size_name = "bootstrap"
    return size_name
774
775
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the table of cluster replica sizes, keyed by size name.

    Size names follow the scheme `scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]`,
    apart from the bootstrap size and the intentionally irregular "free".

    Returns:
        A mapping from size name to that size's settings.
    """

    def make_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # Settings for one replica size; credits are workers times scale.
        # A "selectors" entry is deliberately not emitted.
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=str(workers * scale),
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
        )

    sizes: dict[str, dict[str, Any]] = {}
    sizes[bootstrap_cluster_replica_size()] = make_size(1, 1)
    sizes["scale=2,workers=4"] = make_size(2, 4)
    sizes["scale=1,workers=1,legacy"] = make_size(1, 1, is_cc=False)
    sizes["scale=1,workers=2,legacy"] = make_size(1, 2, is_cc=False)
    # Intentionally not following the naming scheme
    sizes["free"] = make_size(1, 1, disabled=True)

    # Powers of two from 1 through 32.
    for exponent in range(6):
        n = 2**exponent
        sizes[f"scale=1,workers={n}"] = make_size(1, n)
        sizes.update(
            {
                f"scale=1,workers={n},mem={gib}GiB": make_size(
                    1, n, memory_limit=f"{gib} GiB"
                )
                for gib in (4, 8, 16, 32)
            }
        )

        sizes[f"scale={n},workers=1"] = make_size(n, 1)
        sizes[f"scale={n},workers={n}"] = make_size(n, n)
        # Memory limit matching the worker count.
        sizes[f"scale=1,workers={n},mem={n}GiB"] = make_size(
            1, n, memory_limit=f"{n} GiB"
        )

    return sizes
def say(msg: str) -> None:
55    def say(msg: str) -> None:
56        if not Verbosity.quiet:
57            print(f"{prefix}{msg}", file=sys.stderr)
DEFAULT_CONFLUENT_PLATFORM_VERSION = '8.2.0'
DEFAULT_MZ_VOLUMES = ['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {}
def get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
 55def get_minimal_system_parameters(
 56    version: MzVersion,
 57) -> dict[str, str]:
 58    """Settings we need in order to have tests run at all, but otherwise stay
 59    with the defaults: not changing performance or increasing coverage."""
 60
 61    config = {
 62        # -----
 63        # Unsafe functions
 64        "unsafe_enable_unsafe_functions": "true",
 65        # -----
 66        # Others (ordered by name)
 67        "allow_real_time_recency": "true",
 68        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
 69        "enable_compute_peek_response_stash": "true",
 70        "enable_0dt_deployment_panic_after_timeout": "true",
 71        "enable_0dt_deployment_sources": (
 72            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 73        ),
 74        "enable_alter_swap": "true",
 75        "enable_arrangement_dictionary_compression_alpha": "false",
 76        "enable_case_literal_transform": "true",
 77        "enable_cast_elimination": "true",
 78        "enable_coalesce_case_transform": "true",
 79        "enable_columnar_lgalloc": "false",
 80        "enable_columnation_lgalloc": "false",
 81        "enable_compute_correction_v2": "true",
 82        "enable_compute_logical_backpressure": "true",
 83        "enable_connection_validation_syntax": "true",
 84        "enable_create_table_from_source": "true",
 85        "enable_eager_delta_joins": "true",
 86        "enable_envelope_debezium_in_subscribe": "true",
 87        "enable_expressions_in_limit_syntax": "true",
 88        "enable_introspection_subscribes": "true",
 89        "enable_kafka_sink_partition_by": "true",
 90        "enable_lgalloc": "false",
 91        "enable_load_generator_counter": "true",
 92        "enable_logical_compaction_window": "true",
 93        "enable_multi_worker_storage_persist_sink": "true",
 94        "enable_multi_replica_sources": "true",
 95        "enable_rbac_checks": "true",
 96        "enable_reduce_mfp_fusion": "true",
 97        "enable_refresh_every_mvs": "true",
 98        "enable_replacement_materialized_views": "true",
 99        "enable_cluster_schedule_refresh": "true",
100        "enable_s3_tables_region_check": "false",
101        "enable_statement_lifecycle_logging": "true",
102        "enable_storage_introspection_logs": "true",
103        "enable_compute_temporal_bucketing": "true",
104        "enable_variadic_left_join_lowering": "true",
105        "enable_worker_core_affinity": "true",
106        "grpc_client_http2_keep_alive_timeout": "5s",
107        "ore_overflowing_behavior": "panic",
108        "unsafe_enable_table_keys": "true",
109        "with_0dt_deployment_max_wait": "1800s",
110        # End of list (ordered by name)
111    }
112
113    if version < MzVersion.parse_mz("v0.163.0-dev"):
114        config["enable_compute_active_dataflow_cancelation"] = "true"
115
116    return config

Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.

@dataclass
class VariableSystemParameter:
@dataclass
class VariableSystemParameter:
    """A system parameter together with its default and its candidate values."""

    # Name of the system parameter.
    key: str
    # Value applied by `get_default_system_parameters` when
    # CI_SYSTEM_PARAMETERS is unset or empty.
    default: str
    # Values `get_default_system_parameters` picks from when
    # CI_SYSTEM_PARAMETERS is "random".
    values: list[str]
VariableSystemParameter(key: str, default: str, values: list[str])
key: str
default: str
values: list[str]
def get_variable_system_parameters( version: materialize.mz_version.MzVersion, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters that have a default plus alternative values.

    Note: Only the default is tested unless we explicitly select "System
    Parameters: Random" in trigger-ci. These defaults are applied _after_
    applying the settings from `get_minimal_system_parameters`.
    """

    def flag(enabled: bool) -> str:
        # Render a Python truth value as a system parameter boolean.
        return "true" if enabled else "false"

    def param(key: str, default: str, *values: str) -> VariableSystemParameter:
        # Every entry gets its own fresh list of values.
        return VariableSystemParameter(key, default, list(values))

    def toggle(key: str, default: str = "true") -> VariableSystemParameter:
        # A boolean parameter that may take either value.
        return param(key, default, "true", "false")

    def gated_toggle(
        key: str, *, on_by_default: bool, variable: bool
    ) -> VariableSystemParameter:
        # A boolean parameter that is pinned to "false" unless `variable` holds.
        values = ("true", "false") if variable else ("false",)
        return param(key, flag(on_by_default), *values)

    # Version gates used by the entries below.
    newer_than_0_127 = version > MzVersion.parse_mz("v0.127.0-dev")
    newer_than_0_135 = version > MzVersion.parse_mz("v0.135.0-dev")
    newer_than_0_143 = version > MzVersion.parse_mz("v0.143.0-dev")
    at_least_0_161 = version >= MzVersion.parse_mz("v0.161.0-dev")
    at_least_26_9 = version >= MzVersion.parse_mz("v26.9.0-dev")
    at_least_26_18 = version >= MzVersion.parse_mz("v26.18.0-dev")

    sample_rates = ("0", "0.01", "0.5", "0.99", "1.0")

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        param(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            "100ms",
            "1s",
            "10s",
            "100s",
        ),
        param(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            "10ms",
            "100ms",
            "1s",
            "10s",
        ),
        param(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            "100ms",
            "1s",
            "10s",
        ),
        # -----
        # Persist internals changes, advance coverage
        toggle("persist_enable_arrow_lgalloc_noncc_sizes"),
        toggle("persist_enable_s3_lgalloc_noncc_sizes"),
        # -----
        # Others (ordered by name),
        param("compute_correction_v2_chain_proportionality", "3", "2", "3"),
        param(
            "compute_correction_v2_chunk_size", "8192", "8192", "65536", "1048576"
        ),
        # Default is 128 MiB
        param(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            "1048576",
            "4194304",
            "16777216",
            "67108864",
        ),
        param("compute_hydration_concurrency", "2", "1", "2", "4"),
        param("compute_replica_expiration_offset", "3d", "3d", "10d"),
        toggle("compute_apply_column_demands"),
        param(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            "0",
            "1048576",
            "314572800",
            "67108864",
        ),
        toggle("compute_subscribe_snapshot_optimization"),
        toggle("enable_case_literal_transform", "false"),
        toggle("enable_coalesce_case_transform"),
        toggle("enable_cast_elimination"),
        toggle("enable_compute_sync_mv_sink"),
        toggle("enable_password_auth"),
        toggle("enable_frontend_peek_sequencing", flag(at_least_26_9)),
        toggle("enable_frontend_subscribes", flag(at_least_26_18)),
        toggle("enable_upsert_v2", "false"),
        param("default_timestamp_interval", "1s", "100ms", "1s"),
        gated_toggle(
            "force_source_table_syntax",
            on_by_default=force_source_table_syntax,
            variable=force_source_table_syntax,
        ),
        param(
            "persist_batch_columnar_format",
            "structured" if newer_than_0_135 else "both_v2",
            "row",
            "both_v2",
            "both",
            "structured",
        ),
        toggle("persist_batch_delete_enabled"),
        toggle("persist_batch_structured_order"),
        toggle("persist_batch_builder_structured"),
        param(
            "persist_batch_structured_key_lower_len", "256", "0", "1", "512", "1000"
        ),
        param("persist_batch_max_run_len", "4", "2", "3", "4", "16"),
        param(
            "persist_catalog_force_compaction_fuel", "1024", "256", "1024", "4096"
        ),
        param("persist_catalog_force_compaction_wait", "1s", "100ms", "1s", "10s"),
        param("persist_stats_audit_percent", "100", "0", "1", "2", "10", "100"),
        toggle("persist_stats_audit_panic"),
        toggle("persist_encoding_enable_dictionary"),
        param("persist_fast_path_limit", "1000", "100", "1000", "10000"),
        toggle("persist_fast_path_order"),
        gated_toggle(
            "persist_gc_use_active_gc",
            on_by_default=newer_than_0_143,
            variable=newer_than_0_127,
        ),
        param("persist_gc_min_versions", "16", "16", "256", "1024"),
        param("persist_gc_max_versions", "128000", "256", "128000"),
        param(
            "persist_inline_writes_single_max_bytes",
            "4096",
            "256",
            "1024",
            "4096",
            "16384",
        ),
        param(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            "65536",
            "262144",
            "1048576",
            "4194304",
        ),
        toggle("persist_pubsub_client_enabled"),
        toggle("persist_pubsub_push_diff_enabled"),
        toggle("persist_record_compactions"),
        gated_toggle(
            "persist_record_schema_id",
            on_by_default=newer_than_0_127,
            variable=newer_than_0_127,
        ),
        gated_toggle(
            "persist_rollup_use_active_rollup",
            on_by_default=newer_than_0_143,
            variable=newer_than_0_127,
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        param(
            "persist_blob_target_size",
            "16777216",
            "4096",
            "1048576",
            "16777216",
            "134217728",
        ),
        # 5 times the default part size - 4 is the bare minimum.
        param(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            "67108864",
            "134217728",
            "536870912",
            "1073741824",
        ),
        gated_toggle(
            "persist_enable_incremental_compaction",
            on_by_default=at_least_0_161,
            variable=at_least_0_161,
        ),
        toggle("persist_use_critical_since_catalog"),
        # always false, because we always have zero-downtime enabled
        param("persist_use_critical_since_snapshot", "false", "false"),
        # always false, because we always have zero-downtime enabled
        param("persist_use_critical_since_source", "false", "false"),
        param("persist_part_decode_format", "arrow", "arrow", "row_with_validate"),
        toggle("persist_blob_cache_scale_with_threads"),
        param("persist_state_update_lease_timeout", "1s", "0s", "1s", "10s"),
        param(
            "arrangement_size_history_collection_interval", "1h", "1s", "10s", "1h"
        ),
        param("arrangement_size_history_retention_period", "7d", "1min", "1h", "7d"),
        toggle("persist_validate_part_bounds_on_read", "false"),
        toggle("persist_validate_part_bounds_on_write", "false"),
        param("statement_logging_default_sample_rate", "1.0", *sample_rates),
        param(
            "statement_logging_max_data_credit",
            "",
            "",
            "0",
            "1024",
            "1048576",
            "1073741824",
        ),
        param("statement_logging_max_sample_rate", "1.0", *sample_rates),
        param(
            "statement_logging_target_data_rate",
            "",
            "",
            "0",
            "1",
            "1000",
            "2071",
            "1000000",
        ),
        toggle("storage_reclock_to_latest"),
        param("storage_source_decode_fuel", "100000", "10000", "100000", "1000000"),
        param(
            "storage_statistics_collection_interval", "1000", "100", "1000", "10000"
        ),
        param("storage_statistics_interval", "2000", "100", "1000", "10000"),
        toggle("storage_use_continual_feedback_upsert"),
        # End of list (ordered by name)
    ]

Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci. These defaults are applied _after_ applying the settings from get_minimal_system_parameters.

def get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, force_source_table_syntax: bool = False) -> dict[str, str]:
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Assemble the system parameters selected by `CI_SYSTEM_PARAMETERS`.

    The result always starts from `get_minimal_system_parameters`. The
    variable parameters are then layered on top depending on the value of the
    `CI_SYSTEM_PARAMETERS` environment variable:

    * unset or empty: every variable parameter gets its default,
    * `random`: every variable parameter gets a value drawn from its list of
      values, seeded by `CI_SYSTEM_PARAMETERS_SEED`, falling back to
      `BUILDKITE_JOB_ID` and then to 1,
    * `minimal`: nothing is added.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher), which is
    what `version` is for; without it, `MzVersion.parse_cargo()` is used.

    Raises:
        ValueError: if `CI_SYSTEM_PARAMETERS` has any other value.
    """

    version = version or MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    mode = os.getenv("CI_SYSTEM_PARAMETERS", "")
    candidates = get_variable_system_parameters(version, force_source_table_syntax)

    if mode not in ("", "random", "minimal"):
        raise ValueError(f"Unknown value for CI_SYSTEM_PARAMETERS: {mode}")

    if mode == "":
        params.update({candidate.key: candidate.default for candidate in candidates})
    elif mode == "random":
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        # One draw per parameter, in list order, so a seed reproduces a run.
        params.update(
            {candidate.key: rng.choice(candidate.values) for candidate in candidates}
        )
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )

    return params

For upgrade tests, we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher).

UNINTERESTING_SYSTEM_PARAMETERS = ['enable_compute_half_join2', 'enable_mz_join_core', 'linear_join_yielding', 'enable_column_paged_batcher', 'enable_column_paged_batcher_spill', 'column_paged_batcher_budget_fraction', 'column_paged_batcher_lz4', 'column_paged_batcher_swap_pageout', 'enable_upsert_paged_spill', 'enable_lgalloc_eager_reclamation', 'lgalloc_background_interval', 'lgalloc_file_growth_dampener', 'lgalloc_local_buffer_bytes', 'lgalloc_slow_clear_bytes', 'memory_limiter_interval', 'memory_limiter_usage_bias', 'memory_limiter_burst_factor', 'catalog_info_metrics_reconcile_interval', 'compute_server_maintenance_interval', 'compute_dataflow_max_inflight_bytes_cc', 'compute_flat_map_fuel', 'compute_temporal_bucketing_summary', 'consolidating_vec_growth_dampener', 'copy_to_s3_parquet_row_group_file_ratio', 'copy_to_s3_arrow_builder_buffer_ratio', 'copy_to_s3_multipart_part_size_bytes', 'enable_replica_targeted_materialized_views', 'compute_mv_sink_advance_persist_frontiers', 'compute_prometheus_introspection_scrape_interval', 'enable_compute_replica_expiration', 'enable_compute_render_fueled_as_specific_collection', 'compute_logical_backpressure_max_retained_capabilities', 'compute_logical_backpressure_inflight_slack', 'persist_fetch_semaphore_cost_adjustment', 'persist_fetch_semaphore_permit_adjustment', 'persist_optimize_ignored_data_fetch', 'persist_pubsub_same_process_delegate_enabled', 'persist_pubsub_connect_attempt_timeout', 'persist_pubsub_request_timeout', 'persist_pubsub_connect_max_backoff', 'persist_pubsub_client_sender_channel_size', 'persist_pubsub_client_receiver_channel_size', 'persist_pubsub_server_connection_channel_size', 'persist_pubsub_state_cache_shard_ref_channel_size', 'persist_pubsub_reconnect_backoff', 'persist_encoding_compression_format', 'persist_batch_max_runs', 'persist_write_combine_inline_writes', 'persist_reader_lease_duration', 'persist_consensus_connection_pool_max_size', 'persist_consensus_connection_pool_max_wait', 
'persist_consensus_connection_pool_ttl', 'persist_consensus_connection_pool_ttl_stagger', 'crdb_connect_timeout', 'crdb_tcp_user_timeout', 'crdb_keepalives_idle', 'crdb_keepalives_interval', 'crdb_keepalives_retries', 'persist_use_critical_since_txn', 'use_global_txn_cache_source', 'persist_batch_builder_max_outstanding_parts', 'persist_compaction_heuristic_min_inputs', 'persist_compaction_heuristic_min_parts', 'persist_compaction_heuristic_min_updates', 'persist_gc_blob_delete_concurrency_limit', 'persist_state_versions_recent_live_diffs_limit', 'persist_usage_state_fetch_concurrency_limit', 'persist_blob_operation_timeout', 'persist_blob_operation_attempt_timeout', 'persist_blob_connect_timeout', 'persist_blob_read_timeout', 'persist_stats_collection_enabled', 'persist_stats_filter_enabled', 'persist_stats_budget_bytes', 'persist_stats_untrimmable_columns_equals', 'persist_stats_untrimmable_columns_prefix', 'persist_stats_untrimmable_columns_suffix', 'persist_expression_cache_force_compaction_fuel', 'persist_expression_cache_force_compaction_wait', 'persist_blob_cache_mem_limit_bytes', 'persist_blob_cache_scale_factor_bytes', 'persist_claim_unclaimed_compactions', 'persist_claim_compaction_percent', 'persist_claim_compaction_min_version', 'persist_next_listen_batch_retryer_multiplier', 'persist_rollup_threshold', 'persist_rollup_fallback_threshold_ms', 'persist_gc_fallback_threshold_ms', 'persist_compaction_minimum_timeout', 'persist_compaction_check_process_flag', 'balancerd_sigterm_connection_wait', 'balancerd_sigterm_listen_wait', 'balancerd_inject_proxy_protocol_header_http', 'balancerd_log_filter', 'balancerd_opentelemetry_filter', 'balancerd_log_filter_defaults', 'balancerd_opentelemetry_filter_defaults', 'balancerd_sentry_filters', 'persist_enable_s3_lgalloc_cc_sizes', 'persist_enable_arrow_lgalloc_cc_sizes', 'controller_past_generation_replica_cleanup_retry_interval', 'wallclock_lag_recording_interval', 'wallclock_lag_histogram_period_interval', 
'enable_timely_zero_copy', 'enable_timely_zero_copy_lgalloc', 'timely_zero_copy_limit', 'arrangement_exert_proportionality', 'txn_wal_apply_ensure_schema_match', 'persist_txns_data_shard_retryer_initial_backoff', 'persist_txns_data_shard_retryer_multiplier', 'persist_txns_data_shard_retryer_clamp', 'storage_cluster_shutdown_grace_period', 'storage_dataflow_delay_sources_past_rehydration', 'storage_dataflow_suspendable_sources', 'storage_downgrade_since_during_finalization', 'replica_metrics_history_retention_interval', 'wallclock_lag_history_retention_interval', 'wallclock_global_lag_histogram_retention_interval', 'kafka_client_id_enrichment_rules', 'kafka_poll_max_wait', 'kafka_default_aws_privatelink_endpoint_identification_algorithm', 'kafka_buffered_event_resize_threshold_elements', 'kafka_low_watermark_check', 'mysql_replication_heartbeat_interval', 'postgres_fetch_slot_resume_lsn_interval', 'pg_schema_validation_interval', 'pg_source_validate_timeline', 'sql_server_source_validate_restore_history', 'storage_enforce_external_addresses', 'storage_upsert_prevent_snapshot_buffering', 'storage_rocksdb_use_merge_operator', 'storage_upsert_max_snapshot_batch_buffering', 'storage_rocksdb_cleanup_tries', 'storage_suspend_and_restart_delay', 'storage_server_maintenance_interval', 'storage_sink_progress_search', 'storage_sink_ensure_topic_config', 'sql_server_max_lsn_wait', 'sql_server_snapshot_progress_report_interval', 'sql_server_cdc_cleanup_change_table', 'sql_server_cdc_cleanup_change_table_max_deletes', 'allow_user_sessions', 'with_0dt_deployment_ddl_check_interval', 'enable_0dt_caught_up_check', 'with_0dt_caught_up_check_allowed_lag', 'with_0dt_caught_up_check_cutoff', 'enable_0dt_caught_up_replica_status_check', 'plan_insights_notice_fast_path_clusters_optimize_duration', 'enable_expression_cache', 'mz_metrics_lgalloc_map_refresh_interval', 'mz_metrics_lgalloc_refresh_interval', 'mz_metrics_rusage_refresh_interval', 'compute_peek_response_stash_batch_max_runs', 
'compute_peek_response_stash_read_batch_size_bytes', 'compute_peek_response_stash_read_memory_budget_bytes', 'compute_peek_stash_num_batches', 'compute_peek_stash_batch_size', 'storage_statistics_retention_duration', 'enable_paused_cluster_readhold_downgrade', 'kafka_retry_backoff', 'kafka_retry_backoff_max', 'kafka_reconnect_backoff', 'kafka_reconnect_backoff_max', 'kafka_sink_message_max_bytes', 'kafka_sink_batch_size', 'kafka_sink_batch_num_messages', 'oidc_issuer', 'oidc_audience', 'oidc_authentication_claim', 'oidc_group_role_sync_enabled', 'oidc_group_claim', 'oidc_group_role_sync_strict', 'console_oidc_client_id', 'console_oidc_scopes', 'enable_public_metrics_endpoint', 'enable_mcp_agent', 'enable_mcp_agent_query_tool', 'enable_mcp_developer', 'enable_mcp_developer_query_tool', 'mcp_max_response_size', 'user_id_pool_batch_size']
DEFAULT_CRDB_ENVIRONMENT = ['COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s', 'COCKROACH_LOG_MAX_SYNC_DURATION=120s']
DEFAULT_CLOUD_PROVIDER = 'mzcompose'
DEFAULT_CLOUD_REGION = 'us-east-1'
DEFAULT_ORG_ID = '00000000-0000-0000-0000-000000000000'
DEFAULT_ORDINAL = '0'
DEFAULT_MZ_ENVIRONMENT_ID = 'mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0'
def bootstrap_cluster_replica_size() -> str:
def bootstrap_cluster_replica_size() -> str:
    """Return the replica size name ``"bootstrap"``.

    `cluster_replica_size_map` registers this name as a size with scale 1 and
    a single worker.
    """
    size_name = "bootstrap"
    return size_name
def cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the table of cluster replica sizes, keyed by size name.

    Size names follow the scheme ``scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]``;
    the bootstrap size and ``free`` are the exceptions.
    """

    def replica_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # The CPU and disk limits are always left unset; credits are the
        # product of workers and scale, rendered as a string.
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=str(workers * scale),
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
            # "selectors": {},
        )

    # Hand-picked sizes that the generated grid below does not produce.
    sizes: dict[str, dict[str, Any]] = {
        bootstrap_cluster_replica_size(): replica_size(1, 1),
        "scale=2,workers=4": replica_size(2, 4),
        "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False),
        "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False),
        # Intentionally not following the naming scheme
        "free": replica_size(1, 1, disabled=True),
    }

    # Powers of two from 1 to 32, used both as worker counts and as scales.
    for n in (1, 2, 4, 8, 16, 32):
        sizes[f"scale=1,workers={n}"] = replica_size(1, n)
        sizes.update(
            {
                f"scale=1,workers={n},mem={gib}GiB": replica_size(
                    1, n, memory_limit=f"{gib} GiB"
                )
                for gib in (4, 8, 16, 32)
            }
        )

        sizes[f"scale={n},workers=1"] = replica_size(n, 1)
        sizes[f"scale={n},workers={n}"] = replica_size(n, n)
        # One GiB of memory per worker.
        sizes[f"scale=1,workers={n},mem={n}GiB"] = replica_size(
            1, n, memory_limit=f"{n} GiB"
        )

    return sizes

scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]