misc.python.materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the user-facing documentation.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""The implementation of the mzcompose system for Docker compositions.
 11
 12For an overview of what mzcompose is and why it exists, see the [user-facing
 13documentation][user-docs].
 14
 15[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
 16"""
 17
 18import os
 19import random
 20import subprocess
 21import sys
 22from collections.abc import Iterable
 23from dataclasses import dataclass
 24from typing import Any, Literal, TypeVar
 25
 26import psycopg
 27
 28from materialize import spawn, ui
 29from materialize.mz_version import MzVersion
 30from materialize.ui import UIError
 31
 32T = TypeVar("T")
 33say = ui.speaker("C> ")
 34
 35
 36DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.4"
 37
 38DEFAULT_MZ_VOLUMES = [
 39    "mzdata:/mzdata",
 40    "mydata:/var/lib/mysql-files",
 41    "tmp:/share/tmp",
 42    "scratch:/scratch",
 43]
 44
 45
 46# Parameters which disable systems that periodically/unpredictably impact performance
 47# We try to keep this empty, so that we benchmark Materialize as we ship it. If
 48# a new feature causes benchmarks to become flaky, consider that this can also
 49# impact customers' experience and try to find a solution other than disabling
 50# the feature here!
 51ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {}
 52
 53
 54def get_minimal_system_parameters(
 55    version: MzVersion,
 56) -> dict[str, str]:
 57    """Settings we need in order to have tests run at all, but otherwise stay
 58    with the defaults: not changing performance or increasing coverage."""
 59
 60    config = {
 61        # -----
 62        # Unsafe functions
 63        "unsafe_enable_unsafe_functions": "true",
 64        # -----
 65        # Others (ordered by name)
 66        "allow_real_time_recency": "true",
 67        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
 68        "enable_compute_peek_response_stash": "true",
 69        "enable_0dt_deployment_panic_after_timeout": "true",
 70        "enable_0dt_deployment_sources": (
 71            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 72        ),
 73        "enable_alter_swap": "true",
 74        "enable_cast_elimination": "true",
 75        "enable_columnar_lgalloc": "false",
 76        "enable_columnation_lgalloc": "false",
 77        "enable_compute_correction_v2": "true",
 78        "enable_compute_logical_backpressure": "true",
 79        "enable_connection_validation_syntax": "true",
 80        "enable_continual_task_create": "true",
 81        "enable_continual_task_retain": "true",
 82        "enable_continual_task_transform": "true",
 83        "enable_copy_to_expr": "true",
 84        "enable_copy_from_remote": "true",
 85        "enable_create_table_from_source": "true",
 86        "enable_eager_delta_joins": "true",
 87        "enable_envelope_debezium_in_subscribe": "true",
 88        "enable_expressions_in_limit_syntax": "true",
 89        "enable_iceberg_sink": "true",
 90        "enable_introspection_subscribes": "true",
 91        "enable_kafka_sink_partition_by": "true",
 92        "enable_lgalloc": "false",
 93        "enable_load_generator_counter": "true",
 94        "enable_logical_compaction_window": "true",
 95        "enable_multi_worker_storage_persist_sink": "true",
 96        "enable_multi_replica_sources": "true",
 97        "enable_rbac_checks": "true",
 98        "enable_reduce_mfp_fusion": "true",
 99        "enable_refresh_every_mvs": "true",
100        "enable_replacement_materialized_views": "true",
101        "enable_cluster_schedule_refresh": "true",
102        "enable_sql_server_source": "true",
103        "enable_s3_tables_region_check": "false",
104        "enable_statement_lifecycle_logging": "true",
105        "enable_compute_temporal_bucketing": "true",
106        "enable_variadic_left_join_lowering": "true",
107        "enable_worker_core_affinity": "true",
108        "grpc_client_http2_keep_alive_timeout": "5s",
109        "ore_overflowing_behavior": "panic",
110        "unsafe_enable_table_keys": "true",
111        "with_0dt_deployment_max_wait": "1800s",
112        # End of list (ordered by name)
113    }
114
115    if version < MzVersion.parse_mz("v0.163.0-dev"):
116        config["enable_compute_active_dataflow_cancelation"] = "true"
117
118    return config
119
120
@dataclass
class VariableSystemParameter:
    """A system parameter together with the values it may take in tests."""

    # Name of the system parameter.
    key: str
    # Value assigned when the defaults are applied.
    default: str
    # Candidate values to choose from when a value is picked at random.
    values: list[str]
126
127
128# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied in tests.

    Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
    These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.

    Each parameter must be listed exactly once: a duplicate entry makes the
    random mode draw twice for the same key and silently discard the first
    draw.

    Args:
        version: The Materialize version the parameters are generated for;
            some defaults and candidate values depend on it.
        force_source_table_syntax: When true, `force_source_table_syntax`
            defaults to "true" and may be varied; otherwise it is pinned to
            "false".

    Returns:
        A new list of `VariableSystemParameter` entries.
    """

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name)
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled (0), the in-between (1 MiB), plus 300 MiB and 64 MiB.
            # NOTE(review): the original comment named only three values
            # ("force-enabled, the in-between, and the production value");
            # confirm which of the two larger values is the production one.
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "compute_subscribe_snapshot_optimization",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_cast_elimination",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_peek_sequencing",
            "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "default_timestamp_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        # End of list (ordered by name)
    ]
414
415
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Build the system parameters to apply for the given version.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher).

    The `CI_SYSTEM_PARAMETERS` environment variable selects the mode:

    * unset or empty: minimal parameters plus the default of every variable
      parameter,
    * "random": minimal parameters plus a seeded random pick for every
      variable parameter (seed taken from `CI_SYSTEM_PARAMETERS_SEED`, then
      `BUILDKITE_JOB_ID`, then 1); the result is printed to stderr,
    * "minimal": only the minimal parameters.

    Args:
        version: Version to generate parameters for; when falsy, the result of
            `MzVersion.parse_cargo()` is used.
        force_source_table_syntax: Passed through to
            `get_variable_system_parameters`.

    Raises:
        ValueError: If `CI_SYSTEM_PARAMETERS` holds any other value.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    candidates = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "minimal":
        return params

    if system_param_setting == "":
        params.update({entry.key: entry.default for entry in candidates})
        return params

    if system_param_setting != "random":
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    fallback_seed = os.getenv("BUILDKITE_JOB_ID", 1)
    seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", fallback_seed)
    rng = random.Random(seed)
    # Draw in list order so that a given seed always yields the same values.
    for entry in candidates:
        params[entry.key] = rng.choice(entry.values)
    print(
        f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
        file=sys.stderr,
    )
    return params
452
453
454# If you are adding a new config flag in Materialize, consider setting values
455# for it in get_variable_system_parameters if it can be varied in tests. Set it
456# in get_minimal_system_parameters if it's required for tests to succeed at
457# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
458# apply.
459UNINTERESTING_SYSTEM_PARAMETERS = [
460    "enable_mz_join_core",
461    "linear_join_yielding",
462    "enable_lgalloc_eager_reclamation",
463    "lgalloc_background_interval",
464    "lgalloc_file_growth_dampener",
465    "lgalloc_local_buffer_bytes",
466    "lgalloc_slow_clear_bytes",
467    "memory_limiter_interval",
468    "memory_limiter_usage_bias",
469    "memory_limiter_burst_factor",
470    "compute_server_maintenance_interval",
471    "compute_dataflow_max_inflight_bytes_cc",
472    "compute_flat_map_fuel",
473    "compute_temporal_bucketing_summary",
474    "consolidating_vec_growth_dampener",
475    "copy_to_s3_parquet_row_group_file_ratio",
476    "copy_to_s3_arrow_builder_buffer_ratio",
477    "copy_to_s3_multipart_part_size_bytes",
478    "enable_replica_targeted_materialized_views",
479    "enable_compute_replica_expiration",
480    "enable_compute_render_fueled_as_specific_collection",
481    "compute_logical_backpressure_max_retained_capabilities",
482    "compute_logical_backpressure_inflight_slack",
483    "persist_fetch_semaphore_cost_adjustment",
484    "persist_fetch_semaphore_permit_adjustment",
485    "persist_optimize_ignored_data_fetch",
486    "persist_pubsub_same_process_delegate_enabled",
487    "persist_pubsub_connect_attempt_timeout",
488    "persist_pubsub_request_timeout",
489    "persist_pubsub_connect_max_backoff",
490    "persist_pubsub_client_sender_channel_size",
491    "persist_pubsub_client_receiver_channel_size",
492    "persist_pubsub_server_connection_channel_size",
493    "persist_pubsub_state_cache_shard_ref_channel_size",
494    "persist_pubsub_reconnect_backoff",
495    "persist_encoding_compression_format",
496    "persist_batch_max_runs",
497    "persist_write_combine_inline_writes",
498    "persist_reader_lease_duration",
499    "persist_consensus_connection_pool_max_size",
500    "persist_consensus_connection_pool_max_wait",
501    "persist_consensus_connection_pool_ttl",
502    "persist_consensus_connection_pool_ttl_stagger",
503    "crdb_connect_timeout",
504    "crdb_tcp_user_timeout",
505    "crdb_keepalives_idle",
506    "crdb_keepalives_interval",
507    "crdb_keepalives_retries",
508    "persist_use_critical_since_txn",
509    "use_global_txn_cache_source",
510    "persist_batch_builder_max_outstanding_parts",
511    "persist_compaction_heuristic_min_inputs",
512    "persist_compaction_heuristic_min_parts",
513    "persist_compaction_heuristic_min_updates",
514    "persist_gc_blob_delete_concurrency_limit",
515    "persist_state_versions_recent_live_diffs_limit",
516    "persist_usage_state_fetch_concurrency_limit",
517    "persist_blob_operation_timeout",
518    "persist_blob_operation_attempt_timeout",
519    "persist_blob_connect_timeout",
520    "persist_blob_read_timeout",
521    "persist_stats_collection_enabled",
522    "persist_stats_filter_enabled",
523    "persist_stats_budget_bytes",
524    "persist_stats_untrimmable_columns_equals",
525    "persist_stats_untrimmable_columns_prefix",
526    "persist_stats_untrimmable_columns_suffix",
527    "persist_expression_cache_force_compaction_fuel",
528    "persist_expression_cache_force_compaction_wait",
529    "persist_blob_cache_mem_limit_bytes",
530    "persist_blob_cache_scale_factor_bytes",
531    "persist_claim_unclaimed_compactions",
532    "persist_claim_compaction_percent",
533    "persist_claim_compaction_min_version",
534    "persist_next_listen_batch_retryer_multiplier",
535    "persist_rollup_threshold",
536    "persist_rollup_fallback_threshold_ms",
537    "persist_gc_fallback_threshold_ms",
538    "persist_compaction_minimum_timeout",
539    "persist_compaction_check_process_flag",
540    "balancerd_sigterm_connection_wait",
541    "balancerd_sigterm_listen_wait",
542    "balancerd_inject_proxy_protocol_header_http",
543    "balancerd_log_filter",
544    "balancerd_opentelemetry_filter",
545    "balancerd_log_filter_defaults",
546    "balancerd_opentelemetry_filter_defaults",
547    "balancerd_sentry_filters",
548    "persist_enable_s3_lgalloc_cc_sizes",
549    "persist_enable_arrow_lgalloc_cc_sizes",
550    "controller_past_generation_replica_cleanup_retry_interval",
551    "wallclock_lag_recording_interval",
552    "wallclock_lag_histogram_period_interval",
553    "enable_timely_zero_copy",
554    "enable_timely_zero_copy_lgalloc",
555    "timely_zero_copy_limit",
556    "arrangement_exert_proportionality",
557    "txn_wal_apply_ensure_schema_match",
558    "persist_txns_data_shard_retryer_initial_backoff",
559    "persist_txns_data_shard_retryer_multiplier",
560    "persist_txns_data_shard_retryer_clamp",
561    "storage_cluster_shutdown_grace_period",
562    "storage_dataflow_delay_sources_past_rehydration",
563    "storage_dataflow_suspendable_sources",
564    "storage_downgrade_since_during_finalization",
565    "replica_metrics_history_retention_interval",
566    "wallclock_lag_history_retention_interval",
567    "wallclock_global_lag_histogram_retention_interval",
568    "kafka_client_id_enrichment_rules",
569    "kafka_poll_max_wait",
570    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
571    "kafka_buffered_event_resize_threshold_elements",
572    "mysql_replication_heartbeat_interval",
573    "postgres_fetch_slot_resume_lsn_interval",
574    "pg_schema_validation_interval",
575    "pg_source_validate_timeline",
576    "sql_server_source_validate_restore_history",
577    "storage_enforce_external_addresses",
578    "storage_upsert_prevent_snapshot_buffering",
579    "storage_rocksdb_use_merge_operator",
580    "storage_upsert_max_snapshot_batch_buffering",
581    "storage_rocksdb_cleanup_tries",
582    "storage_suspend_and_restart_delay",
583    "storage_server_maintenance_interval",
584    "storage_sink_progress_search",
585    "storage_sink_ensure_topic_config",
586    "sql_server_max_lsn_wait",
587    "sql_server_snapshot_progress_report_interval",
588    "sql_server_cdc_cleanup_change_table",
589    "sql_server_cdc_cleanup_change_table_max_deletes",
590    "allow_user_sessions",
591    "with_0dt_deployment_ddl_check_interval",
592    "enable_0dt_caught_up_check",
593    "with_0dt_caught_up_check_allowed_lag",
594    "with_0dt_caught_up_check_cutoff",
595    "enable_0dt_caught_up_replica_status_check",
596    "plan_insights_notice_fast_path_clusters_optimize_duration",
597    "enable_continual_task_builtins",
598    "enable_expression_cache",
599    "mz_metrics_lgalloc_map_refresh_interval",
600    "mz_metrics_lgalloc_refresh_interval",
601    "mz_metrics_rusage_refresh_interval",
602    "compute_peek_response_stash_batch_max_runs",
603    "compute_peek_response_stash_read_batch_size_bytes",
604    "compute_peek_response_stash_read_memory_budget_bytes",
605    "compute_peek_stash_num_batches",
606    "compute_peek_stash_batch_size",
607    "storage_statistics_retention_duration",
608    "enable_paused_cluster_readhold_downgrade",
609    "kafka_retry_backoff",
610    "kafka_retry_backoff_max",
611    "kafka_reconnect_backoff",
612    "kafka_reconnect_backoff_max",
613    "oidc_issuer",
614    "oidc_audience",
615    "oidc_authentication_claim",
616    "enable_mcp_agents",
617    "enable_mcp_observatory",
618    "user_id_pool_batch_size",
619]
620
621
622DEFAULT_CRDB_ENVIRONMENT = [
623    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
624    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
625]
626
627
628# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
# Components of the default environment ID, in the order they appear in it.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# "<provider>-<region>-<org id>-<ordinal>"
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    (
        DEFAULT_CLOUD_PROVIDER,
        DEFAULT_CLOUD_REGION,
        DEFAULT_ORG_ID,
        DEFAULT_ORDINAL,
    )
)
634
635
636# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Append a TCP wait loop to `cmd`, run it, and return the extended command.

    The appended command is `timeout <timeout_secs> bash -c <loop>`, where the
    loop retries every 0.1s until redirecting to `/dev/tcp/<host>/<port>`
    succeeds. Note that `cmd` is extended in place.

    Args:
        cmd: Command prefix to extend; mutated by this call.
        host: Host to probe.
        port: TCP port to probe.
        timeout_secs: Value passed to `timeout`.
        kind: Optional label that is prepended to the host in the log message.

    Raises:
        subprocess.CalledProcessError: Re-raised after logging when running
            the command fails.
    """
    probe_loop = (
        f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
    )
    cmd.extend(["timeout", str(timeout_secs), "bash", "-c", probe_loop])

    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        failure = "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
            kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
        )
        ui.log_in_automation(failure)
        raise

    return cmd
659
660
661# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    sslmode: str = "disable",
) -> None:
    """Wait for a pg-compatible database (includes materialized)

    Repeatedly opens a fresh connection and runs `query` until it yields the
    expected result or the timeout loop is exhausted. Every connection is
    closed again before the next attempt or before returning.

    Args:
        timeout_secs: Duration handed to `ui.timeout_loop`.
        query: SQL to execute on every attempt.
        dbname: Database name to connect to.
        port: Port to connect to.
        host: Host to connect to.
        user: User to connect as.
        password: Password to connect with, if any. Only its first character
            is included in progress and error messages.
        expected: Rows the query must return (compared with `==` against the
            list of fetched rows), or "any" to accept any successful result.
        print_result: When true, print the matching result and include
            connection errors in the progress output.
        sslmode: SSL mode passed to `psycopg.connect`.

    Raises:
        UIError: If no attempt produced the expected result.
    """
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        try:
            conn = psycopg.connect(
                dbname=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                connect_timeout=1,
                sslmode=sslmode,
            )
            try:
                # The default (autocommit = false) wraps everything in a transaction.
                conn.autocommit = True
                with conn.cursor() as cur:
                    cur.execute(query.encode())
                    # A rowcount of -1 is treated as "statement produced no
                    # result set"; nothing to fetch, but the query succeeded.
                    if expected == "any" and cur.rowcount == -1:
                        ui.progress(" success!", finish=True)
                        return
                    result = list(cur.fetchall())
                    if expected == "any" or result == expected:
                        if print_result:
                            say(f"query result: {result}")
                        else:
                            ui.progress(" success!", finish=True)
                        return
                    else:
                        say(
                            f"host={host} port={port} did not return rows matching {expected} got: {result}"
                        )
            finally:
                # A new connection is opened on every tick; close it so that
                # retries (and the success path) do not leak connections.
                conn.close()
        except Exception as e:
            # Deliberately broad: any failure just means "not ready yet".
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")
713
714
def bootstrap_cluster_replica_size() -> str:
    """Return the name of the replica size used for bootstrapping."""
    size_name = "bootstrap"
    return size_name
717
718
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the replica size map, keyed by size name.

    Size names follow the scheme
    scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]
    apart from the bootstrap size and "free".
    """

    def make_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # One entry of the map; credits are simply workers times scale.
        credits = workers * scale
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=f"{credits}",
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
            # "selectors": {},
        )

    sizes: dict[str, dict[str, Any]] = {}
    sizes[bootstrap_cluster_replica_size()] = make_size(1, 1)
    sizes["scale=2,workers=4"] = make_size(2, 4)
    sizes["scale=1,workers=1,legacy"] = make_size(1, 1, is_cc=False)
    sizes["scale=1,workers=2,legacy"] = make_size(1, 2, is_cc=False)
    # Intentionally not following the naming scheme
    sizes["free"] = make_size(1, 1, disabled=True)

    # Powers of two from 1 to 32.
    for exponent in range(6):
        count = 2**exponent

        sizes[f"scale=1,workers={count}"] = make_size(1, count)
        for gib in (4, 8, 16, 32):
            sizes[f"scale=1,workers={count},mem={gib}GiB"] = make_size(
                1, count, memory_limit=f"{gib} GiB"
            )

        sizes[f"scale={count},workers=1"] = make_size(count, 1)
        sizes[f"scale={count},workers={count}"] = make_size(count, count)
        sizes[f"scale=1,workers={count},mem={count}GiB"] = make_size(
            1, count, memory_limit=f"{count} GiB"
        )

    return sizes
def say(msg: str) -> None:
55    def say(msg: str) -> None:
56        if not Verbosity.quiet:
57            print(f"{prefix}{msg}", file=sys.stderr)
DEFAULT_CONFLUENT_PLATFORM_VERSION = '7.9.4'
DEFAULT_MZ_VOLUMES = ['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {}
def get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
 55def get_minimal_system_parameters(
 56    version: MzVersion,
 57) -> dict[str, str]:
 58    """Settings we need in order to have tests run at all, but otherwise stay
 59    with the defaults: not changing performance or increasing coverage."""
 60
 61    config = {
 62        # -----
 63        # Unsafe functions
 64        "unsafe_enable_unsafe_functions": "true",
 65        # -----
 66        # Others (ordered by name)
 67        "allow_real_time_recency": "true",
 68        "constraint_based_timestamp_selection": "verify",  # removed from main, keeping it here for old versions
 69        "enable_compute_peek_response_stash": "true",
 70        "enable_0dt_deployment_panic_after_timeout": "true",
 71        "enable_0dt_deployment_sources": (
 72            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 73        ),
 74        "enable_alter_swap": "true",
 75        "enable_cast_elimination": "true",
 76        "enable_columnar_lgalloc": "false",
 77        "enable_columnation_lgalloc": "false",
 78        "enable_compute_correction_v2": "true",
 79        "enable_compute_logical_backpressure": "true",
 80        "enable_connection_validation_syntax": "true",
 81        "enable_continual_task_create": "true",
 82        "enable_continual_task_retain": "true",
 83        "enable_continual_task_transform": "true",
 84        "enable_copy_to_expr": "true",
 85        "enable_copy_from_remote": "true",
 86        "enable_create_table_from_source": "true",
 87        "enable_eager_delta_joins": "true",
 88        "enable_envelope_debezium_in_subscribe": "true",
 89        "enable_expressions_in_limit_syntax": "true",
 90        "enable_iceberg_sink": "true",
 91        "enable_introspection_subscribes": "true",
 92        "enable_kafka_sink_partition_by": "true",
 93        "enable_lgalloc": "false",
 94        "enable_load_generator_counter": "true",
 95        "enable_logical_compaction_window": "true",
 96        "enable_multi_worker_storage_persist_sink": "true",
 97        "enable_multi_replica_sources": "true",
 98        "enable_rbac_checks": "true",
 99        "enable_reduce_mfp_fusion": "true",
100        "enable_refresh_every_mvs": "true",
101        "enable_replacement_materialized_views": "true",
102        "enable_cluster_schedule_refresh": "true",
103        "enable_sql_server_source": "true",
104        "enable_s3_tables_region_check": "false",
105        "enable_statement_lifecycle_logging": "true",
106        "enable_compute_temporal_bucketing": "true",
107        "enable_variadic_left_join_lowering": "true",
108        "enable_worker_core_affinity": "true",
109        "grpc_client_http2_keep_alive_timeout": "5s",
110        "ore_overflowing_behavior": "panic",
111        "unsafe_enable_table_keys": "true",
112        "with_0dt_deployment_max_wait": "1800s",
113        # End of list (ordered by name)
114    }
115
116    if version < MzVersion.parse_mz("v0.163.0-dev"):
117        config["enable_compute_active_dataflow_cancelation"] = "true"
118
119    return config

Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.

@dataclass
class VariableSystemParameter:
@dataclass
class VariableSystemParameter:
    """A system parameter together with the values it may take in CI.

    `get_default_system_parameters` applies `default` when the
    `CI_SYSTEM_PARAMETERS` environment variable is unset or empty, and picks
    one entry of `values` at random when it is set to "random".
    """

    # Name of the system parameter.
    key: str
    # Value applied in the default (non-random) mode.
    default: str
    # Candidate values to choose from in random mode.
    values: list[str]
VariableSystemParameter(key: str, default: str, values: list[str])
key: str
default: str
values: list[str]
def get_variable_system_parameters( version: materialize.mz_version.MzVersion, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied in CI.

    Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci.
    These defaults are applied _after_ applying the settings from `get_minimal_system_parameters`.

    Each key appears exactly once, so that random selection draws one value
    per parameter.

    Args:
        version: The Materialize version the parameters are generated for;
            some defaults and candidate values depend on it.
        force_source_table_syntax: Selects the default for the
            `force_source_table_syntax` parameter and whether "true" is among
            its candidate values.

    Returns:
        One `VariableSystemParameter` per parameter, carrying its key, its
        default and the candidate values for random selection.
    """

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name)
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "compute_subscribe_snapshot_optimization",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_cast_elimination",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "enable_frontend_peek_sequencing",
            "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "default_timestamp_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "false", ["true", "false"]
        ),
        VariableSystemParameter(
            "statement_logging_default_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_max_data_credit",
            "",
            ["", "0", "1024", "1048576", "1073741824"],
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate",
            "1.0",
            ["0", "0.01", "0.5", "0.99", "1.0"],
        ),
        VariableSystemParameter(
            "statement_logging_target_data_rate",
            "",
            ["", "0", "1", "1000", "2071", "1000000"],
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        # End of list (ordered by name)
    ]

Note: Only the default is tested unless we explicitly select "System Parameters: Random" in trigger-ci. These defaults are applied _after_ applying the settings from get_minimal_system_parameters.

def get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, force_source_table_syntax: bool = False) -> dict[str, str]:
417def get_default_system_parameters(
418    version: MzVersion | None = None,
419    force_source_table_syntax: bool = False,
420) -> dict[str, str]:
421    """For upgrade tests we only want parameters set when all environmentd /
422    clusterd processes have reached a specific version (or higher)
423    """
424
425    if not version:
426        version = MzVersion.parse_cargo()
427
428    params = get_minimal_system_parameters(version)
429
430    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
431    variable_params = get_variable_system_parameters(version, force_source_table_syntax)
432
433    if system_param_setting == "":
434        for param in variable_params:
435            params[param.key] = param.default
436    elif system_param_setting == "random":
437        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
438        rng = random.Random(seed)
439        for param in variable_params:
440            params[param.key] = rng.choice(param.values)
441        print(
442            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
443            file=sys.stderr,
444        )
445    elif system_param_setting == "minimal":
446        pass
447    else:
448        raise ValueError(
449            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
450        )
451
452    return params

For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher)

# List of system parameter names.
# NOTE(review): judging by the name, these are presumably parameters that are
# deliberately not varied by get_variable_system_parameters - confirm against
# the code that consumes this list.
UNINTERESTING_SYSTEM_PARAMETERS = ['enable_mz_join_core', 'linear_join_yielding', 'enable_lgalloc_eager_reclamation', 'lgalloc_background_interval', 'lgalloc_file_growth_dampener', 'lgalloc_local_buffer_bytes', 'lgalloc_slow_clear_bytes', 'memory_limiter_interval', 'memory_limiter_usage_bias', 'memory_limiter_burst_factor', 'compute_server_maintenance_interval', 'compute_dataflow_max_inflight_bytes_cc', 'compute_flat_map_fuel', 'compute_temporal_bucketing_summary', 'consolidating_vec_growth_dampener', 'copy_to_s3_parquet_row_group_file_ratio', 'copy_to_s3_arrow_builder_buffer_ratio', 'copy_to_s3_multipart_part_size_bytes', 'enable_replica_targeted_materialized_views', 'enable_compute_replica_expiration', 'enable_compute_render_fueled_as_specific_collection', 'compute_logical_backpressure_max_retained_capabilities', 'compute_logical_backpressure_inflight_slack', 'persist_fetch_semaphore_cost_adjustment', 'persist_fetch_semaphore_permit_adjustment', 'persist_optimize_ignored_data_fetch', 'persist_pubsub_same_process_delegate_enabled', 'persist_pubsub_connect_attempt_timeout', 'persist_pubsub_request_timeout', 'persist_pubsub_connect_max_backoff', 'persist_pubsub_client_sender_channel_size', 'persist_pubsub_client_receiver_channel_size', 'persist_pubsub_server_connection_channel_size', 'persist_pubsub_state_cache_shard_ref_channel_size', 'persist_pubsub_reconnect_backoff', 'persist_encoding_compression_format', 'persist_batch_max_runs', 'persist_write_combine_inline_writes', 'persist_reader_lease_duration', 'persist_consensus_connection_pool_max_size', 'persist_consensus_connection_pool_max_wait', 'persist_consensus_connection_pool_ttl', 'persist_consensus_connection_pool_ttl_stagger', 'crdb_connect_timeout', 'crdb_tcp_user_timeout', 'crdb_keepalives_idle', 'crdb_keepalives_interval', 'crdb_keepalives_retries', 'persist_use_critical_since_txn', 'use_global_txn_cache_source', 'persist_batch_builder_max_outstanding_parts', 'persist_compaction_heuristic_min_inputs', 
'persist_compaction_heuristic_min_parts', 'persist_compaction_heuristic_min_updates', 'persist_gc_blob_delete_concurrency_limit', 'persist_state_versions_recent_live_diffs_limit', 'persist_usage_state_fetch_concurrency_limit', 'persist_blob_operation_timeout', 'persist_blob_operation_attempt_timeout', 'persist_blob_connect_timeout', 'persist_blob_read_timeout', 'persist_stats_collection_enabled', 'persist_stats_filter_enabled', 'persist_stats_budget_bytes', 'persist_stats_untrimmable_columns_equals', 'persist_stats_untrimmable_columns_prefix', 'persist_stats_untrimmable_columns_suffix', 'persist_expression_cache_force_compaction_fuel', 'persist_expression_cache_force_compaction_wait', 'persist_blob_cache_mem_limit_bytes', 'persist_blob_cache_scale_factor_bytes', 'persist_claim_unclaimed_compactions', 'persist_claim_compaction_percent', 'persist_claim_compaction_min_version', 'persist_next_listen_batch_retryer_multiplier', 'persist_rollup_threshold', 'persist_rollup_fallback_threshold_ms', 'persist_gc_fallback_threshold_ms', 'persist_compaction_minimum_timeout', 'persist_compaction_check_process_flag', 'balancerd_sigterm_connection_wait', 'balancerd_sigterm_listen_wait', 'balancerd_inject_proxy_protocol_header_http', 'balancerd_log_filter', 'balancerd_opentelemetry_filter', 'balancerd_log_filter_defaults', 'balancerd_opentelemetry_filter_defaults', 'balancerd_sentry_filters', 'persist_enable_s3_lgalloc_cc_sizes', 'persist_enable_arrow_lgalloc_cc_sizes', 'controller_past_generation_replica_cleanup_retry_interval', 'wallclock_lag_recording_interval', 'wallclock_lag_histogram_period_interval', 'enable_timely_zero_copy', 'enable_timely_zero_copy_lgalloc', 'timely_zero_copy_limit', 'arrangement_exert_proportionality', 'txn_wal_apply_ensure_schema_match', 'persist_txns_data_shard_retryer_initial_backoff', 'persist_txns_data_shard_retryer_multiplier', 'persist_txns_data_shard_retryer_clamp', 'storage_cluster_shutdown_grace_period', 
'storage_dataflow_delay_sources_past_rehydration', 'storage_dataflow_suspendable_sources', 'storage_downgrade_since_during_finalization', 'replica_metrics_history_retention_interval', 'wallclock_lag_history_retention_interval', 'wallclock_global_lag_histogram_retention_interval', 'kafka_client_id_enrichment_rules', 'kafka_poll_max_wait', 'kafka_default_aws_privatelink_endpoint_identification_algorithm', 'kafka_buffered_event_resize_threshold_elements', 'mysql_replication_heartbeat_interval', 'postgres_fetch_slot_resume_lsn_interval', 'pg_schema_validation_interval', 'pg_source_validate_timeline', 'sql_server_source_validate_restore_history', 'storage_enforce_external_addresses', 'storage_upsert_prevent_snapshot_buffering', 'storage_rocksdb_use_merge_operator', 'storage_upsert_max_snapshot_batch_buffering', 'storage_rocksdb_cleanup_tries', 'storage_suspend_and_restart_delay', 'storage_server_maintenance_interval', 'storage_sink_progress_search', 'storage_sink_ensure_topic_config', 'sql_server_max_lsn_wait', 'sql_server_snapshot_progress_report_interval', 'sql_server_cdc_cleanup_change_table', 'sql_server_cdc_cleanup_change_table_max_deletes', 'allow_user_sessions', 'with_0dt_deployment_ddl_check_interval', 'enable_0dt_caught_up_check', 'with_0dt_caught_up_check_allowed_lag', 'with_0dt_caught_up_check_cutoff', 'enable_0dt_caught_up_replica_status_check', 'plan_insights_notice_fast_path_clusters_optimize_duration', 'enable_continual_task_builtins', 'enable_expression_cache', 'mz_metrics_lgalloc_map_refresh_interval', 'mz_metrics_lgalloc_refresh_interval', 'mz_metrics_rusage_refresh_interval', 'compute_peek_response_stash_batch_max_runs', 'compute_peek_response_stash_read_batch_size_bytes', 'compute_peek_response_stash_read_memory_budget_bytes', 'compute_peek_stash_num_batches', 'compute_peek_stash_batch_size', 'storage_statistics_retention_duration', 'enable_paused_cluster_readhold_downgrade', 'kafka_retry_backoff', 'kafka_retry_backoff_max', 
'kafka_reconnect_backoff', 'kafka_reconnect_backoff_max', 'oidc_issuer', 'oidc_audience', 'oidc_authentication_claim', 'enable_mcp_agents', 'enable_mcp_observatory', 'user_id_pool_batch_size']
# Default environment entries for CockroachDB, each written as "NAME=value".
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]

# Components of the default environment id.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"

# Provider, region, organization id and ordinal joined with dashes.
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    [
        DEFAULT_CLOUD_PROVIDER,
        DEFAULT_CLOUD_REGION,
        DEFAULT_ORG_ID,
        DEFAULT_ORDINAL,
    ]
)
def bootstrap_cluster_replica_size() -> str:
def bootstrap_cluster_replica_size() -> str:
    """Return the name of the replica size used for bootstrapping."""
    size_name = "bootstrap"
    return size_name
def cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the mapping from replica size name to its definition.

    Size names follow the scheme `scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]`,
    except for the bootstrap size and "free".

    Returns:
        A dict keyed by size name; each value is the definition dict produced
        by the nested `replica_size` helper.
    """

    def replica_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # One size definition. `credits_per_hour` is the product of workers
        # and scale, rendered as a string. ("selectors" is intentionally not
        # emitted.)
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=str(workers * scale),
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
        )

    # Hand-written entries come first.
    sizes: dict[str, dict[str, Any]] = {}
    sizes[bootstrap_cluster_replica_size()] = replica_size(1, 1)
    sizes["scale=2,workers=4"] = replica_size(2, 4)
    sizes["scale=1,workers=1,legacy"] = replica_size(1, 1, is_cc=False)
    sizes["scale=1,workers=2,legacy"] = replica_size(1, 2, is_cc=False)
    # Intentionally not following the naming scheme
    sizes["free"] = replica_size(1, 1, disabled=True)

    # Generated entries for the powers of two 1, 2, 4, 8, 16 and 32.
    for count in (2**exponent for exponent in range(6)):
        sizes[f"scale=1,workers={count}"] = replica_size(1, count)
        for gib in (4, 8, 16, 32):
            sizes[f"scale=1,workers={count},mem={gib}GiB"] = replica_size(
                1, count, memory_limit=f"{gib} GiB"
            )

        sizes[f"scale={count},workers=1"] = replica_size(count, 1)
        sizes[f"scale={count},workers={count}"] = replica_size(count, count)
        # Memory limit equal to the worker count, in GiB.
        sizes[f"scale=1,workers={count},mem={count}GiB"] = replica_size(
            1, count, memory_limit=f"{count} GiB"
        )

    return sizes

scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]