misc.python.materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the user-facing documentation.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""The implementation of the mzcompose system for Docker compositions.
 11
 12For an overview of what mzcompose is and why it exists, see the [user-facing
 13documentation][user-docs].
 14
 15[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
 16"""
 17
 18import os
 19import random
 20import subprocess
 21import sys
 22from collections.abc import Iterable
 23from dataclasses import dataclass
 24from typing import Any, Literal, TypeVar
 25
 26import psycopg
 27
 28from materialize import spawn, ui
 29from materialize.mz_version import MzVersion
 30from materialize.ui import UIError
 31
 32T = TypeVar("T")
 33say = ui.speaker("C> ")
 34
 35
# Default Confluent Platform version.
# NOTE(review): presumably used when a composition does not pin one — confirm at the usage sites.
DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0"

# Default volume mappings, each written as "<volume name>:<mount path>".
DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # The default of 128 MB increases memory usage by a lot for only a small
    # performance gain in benchmarks, see for example the FastPathLimit
    # scenario: 55% more memory, 5% faster
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # This would increase the memory usage of many tests, making it harder to
    # tell small memory increase regressions
    "persist_blob_cache_scale_with_threads": "false",
    # The peek response stash kicks in when results get larger, and it
    # increases query latency, which in turn makes benchmarking more
    # unpredictable.
    "enable_compute_peek_response_stash": "false",
}
 64
 65
 66def get_minimal_system_parameters(
 67    version: MzVersion,
 68) -> dict[str, str]:
 69    """Settings we need in order to have tests run at all, but otherwise stay
 70    with the defaults: not changing performance or increasing coverage."""
 71
 72    config = {
 73        # -----
 74        # Unsafe functions
 75        "unsafe_enable_unsafe_functions": "true",
 76        # -----
 77        # Others (ordered by name)
 78        "allow_real_time_recency": "true",
 79        "constraint_based_timestamp_selection": "verify",
 80        "enable_compute_peek_response_stash": "true",
 81        "enable_0dt_deployment_panic_after_timeout": "true",
 82        "enable_0dt_deployment_sources": (
 83            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 84        ),
 85        "enable_alter_swap": "true",
 86        "enable_columnar_lgalloc": "false",
 87        "enable_columnation_lgalloc": "false",
 88        "enable_compute_correction_v2": "true",
 89        "enable_compute_logical_backpressure": "true",
 90        "enable_connection_validation_syntax": "true",
 91        "enable_continual_task_create": "true",
 92        "enable_continual_task_retain": "true",
 93        "enable_continual_task_transform": "true",
 94        "enable_copy_to_expr": "true",
 95        "enable_create_table_from_source": "true",
 96        "enable_eager_delta_joins": "true",
 97        "enable_envelope_debezium_in_subscribe": "true",
 98        "enable_expressions_in_limit_syntax": "true",
 99        "enable_introspection_subscribes": "true",
100        "enable_kafka_sink_partition_by": "true",
101        "enable_lgalloc": "false",
102        "enable_load_generator_counter": "true",
103        "enable_logical_compaction_window": "true",
104        "enable_multi_worker_storage_persist_sink": "true",
105        "enable_multi_replica_sources": "true",
106        "enable_rbac_checks": "true",
107        "enable_reduce_mfp_fusion": "true",
108        "enable_refresh_every_mvs": "true",
109        "enable_cluster_schedule_refresh": "true",
110        "enable_sql_server_source": "true",
111        "enable_statement_lifecycle_logging": "true",
112        "enable_compute_temporal_bucketing": "true",
113        "enable_variadic_left_join_lowering": "true",
114        "enable_worker_core_affinity": "true",
115        "grpc_client_http2_keep_alive_timeout": "5s",
116        "ore_overflowing_behavior": "panic",
117        "unsafe_enable_table_keys": "true",
118        "with_0dt_deployment_max_wait": "1800s",
119        # End of list (ordered by name)
120    }
121
122    if version < MzVersion.parse_mz("v0.163.0-dev"):
123        config["enable_compute_active_dataflow_cancelation"] = "true"
124
125    return config
126
127
@dataclass
class VariableSystemParameter:
    """A system parameter whose value may differ between test runs."""

    # Name of the system parameter.
    key: str
    # Value applied when CI_SYSTEM_PARAMETERS is unset or empty.
    default: str
    # Candidate values one of which is picked when CI_SYSTEM_PARAMETERS is "random".
    values: list[str]
133
134
135# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied in tests.

    Each entry carries the parameter name, the default value, and the list of
    candidate values. Every parameter name appears exactly once, so that a
    consumer assigning values by key never has one entry silently overwrite
    another.

    Args:
        version: Materialize version the parameters are generated for; the
            default and/or the candidate values of some parameters depend on it.
        force_source_table_syntax: Selects the default of the
            `force_source_table_syntax` parameter and whether "true" is among
            its candidate values.
    """
    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name),
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "kafka_default_metadata_fetch_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "true", ["true", "false"]
        ),
        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "sql_server_offset_known_interval", "1s", ["10ms", "100ms", "1s"]
        ),
        # End of list (ordered by name)
    ]
390
391
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Build the system parameters to apply for the given Materialize version.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher), which is
    why the version can be passed in; without one, `MzVersion.parse_cargo()`
    is used.

    The CI_SYSTEM_PARAMETERS environment variable selects what is layered on
    top of the minimal parameters:
      * unset or empty: the default of every variable parameter,
      * "random": a seeded random pick among each variable parameter's values,
      * "minimal": nothing.

    Raises:
        ValueError: if CI_SYSTEM_PARAMETERS holds any other value.
    """

    effective_version = version if version else MzVersion.parse_cargo()

    params = get_minimal_system_parameters(effective_version)

    mode = os.getenv("CI_SYSTEM_PARAMETERS", "")
    candidates = get_variable_system_parameters(
        effective_version, force_source_table_syntax
    )

    if mode == "minimal":
        return params

    if mode == "":
        params.update({candidate.key: candidate.default for candidate in candidates})
        return params

    if mode == "random":
        # The seed is logged below so that a run can be reproduced.
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        params.update(
            {candidate.key: rng.choice(candidate.values) for candidate in candidates}
        )
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
        return params

    raise ValueError(f"Unknown value for CI_SYSTEM_PARAMETERS: {mode}")
428
429
430# If you are adding a new config flag in Materialize, consider setting values
431# for it in get_variable_system_parameters if it can be varied in tests. Set it
432# in get_minimal_system_parameters if it's required for tests to succeed at
433# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
434# apply.
435UNINTERESTING_SYSTEM_PARAMETERS = [
436    "enable_mz_join_core",
437    "linear_join_yielding",
438    "enable_lgalloc_eager_reclamation",
439    "lgalloc_background_interval",
440    "lgalloc_file_growth_dampener",
441    "lgalloc_local_buffer_bytes",
442    "lgalloc_slow_clear_bytes",
443    "memory_limiter_interval",
444    "memory_limiter_usage_bias",
445    "memory_limiter_burst_factor",
446    "compute_server_maintenance_interval",
447    "compute_dataflow_max_inflight_bytes_cc",
448    "compute_flat_map_fuel",
449    "compute_temporal_bucketing_summary",
450    "consolidating_vec_growth_dampener",
451    "copy_to_s3_parquet_row_group_file_ratio",
452    "copy_to_s3_arrow_builder_buffer_ratio",
453    "copy_to_s3_multipart_part_size_bytes",
454    "enable_compute_replica_expiration",
455    "enable_compute_render_fueled_as_specific_collection",
456    "compute_logical_backpressure_max_retained_capabilities",
457    "compute_logical_backpressure_inflight_slack",
458    "persist_fetch_semaphore_cost_adjustment",
459    "persist_fetch_semaphore_permit_adjustment",
460    "persist_optimize_ignored_data_fetch",
461    "persist_pubsub_same_process_delegate_enabled",
462    "persist_pubsub_connect_attempt_timeout",
463    "persist_pubsub_request_timeout",
464    "persist_pubsub_connect_max_backoff",
465    "persist_pubsub_client_sender_channel_size",
466    "persist_pubsub_client_receiver_channel_size",
467    "persist_pubsub_server_connection_channel_size",
468    "persist_pubsub_state_cache_shard_ref_channel_size",
469    "persist_pubsub_reconnect_backoff",
470    "persist_encoding_compression_format",
471    "persist_batch_max_runs",
472    "persist_write_combine_inline_writes",
473    "persist_reader_lease_duration",
474    "persist_consensus_connection_pool_max_size",
475    "persist_consensus_connection_pool_max_wait",
476    "persist_consensus_connection_pool_ttl",
477    "persist_consensus_connection_pool_ttl_stagger",
478    "crdb_connect_timeout",
479    "crdb_tcp_user_timeout",
480    "persist_use_critical_since_txn",
481    "use_global_txn_cache_source",
482    "persist_batch_builder_max_outstanding_parts",
483    "persist_compaction_heuristic_min_inputs",
484    "persist_compaction_heuristic_min_parts",
485    "persist_compaction_heuristic_min_updates",
486    "persist_gc_blob_delete_concurrency_limit",
487    "persist_state_versions_recent_live_diffs_limit",
488    "persist_usage_state_fetch_concurrency_limit",
489    "persist_blob_operation_timeout",
490    "persist_blob_operation_attempt_timeout",
491    "persist_blob_connect_timeout",
492    "persist_blob_read_timeout",
493    "persist_stats_collection_enabled",
494    "persist_stats_filter_enabled",
495    "persist_stats_budget_bytes",
496    "persist_stats_untrimmable_columns_equals",
497    "persist_stats_untrimmable_columns_prefix",
498    "persist_stats_untrimmable_columns_suffix",
499    "persist_expression_cache_force_compaction_fuel",
500    "persist_expression_cache_force_compaction_wait",
501    "persist_blob_cache_mem_limit_bytes",
502    "persist_blob_cache_scale_factor_bytes",
503    "persist_claim_unclaimed_compactions",
504    "persist_claim_compaction_percent",
505    "persist_claim_compaction_min_version",
506    "persist_next_listen_batch_retryer_multiplier",
507    "persist_rollup_threshold",
508    "persist_rollup_fallback_threshold_ms",
509    "persist_gc_fallback_threshold_ms",
510    "persist_compaction_minimum_timeout",
511    "persist_compaction_use_most_recent_schema",
512    "persist_compaction_check_process_flag",
513    "balancerd_sigterm_connection_wait",
514    "balancerd_sigterm_listen_wait",
515    "balancerd_inject_proxy_protocol_header_http",
516    "balancerd_log_filter",
517    "balancerd_opentelemetry_filter",
518    "balancerd_log_filter_defaults",
519    "balancerd_opentelemetry_filter_defaults",
520    "balancerd_sentry_filters",
521    "persist_enable_s3_lgalloc_cc_sizes",
522    "persist_enable_arrow_lgalloc_cc_sizes",
523    "controller_past_generation_replica_cleanup_retry_interval",
524    "wallclock_lag_recording_interval",
525    "wallclock_lag_histogram_period_interval",
526    "enable_timely_zero_copy",
527    "enable_timely_zero_copy_lgalloc",
528    "timely_zero_copy_limit",
529    "arrangement_exert_proportionality",
530    "txn_wal_apply_ensure_schema_match",
531    "persist_txns_data_shard_retryer_initial_backoff",
532    "persist_txns_data_shard_retryer_multiplier",
533    "persist_txns_data_shard_retryer_clamp",
534    "storage_cluster_shutdown_grace_period",
535    "storage_dataflow_delay_sources_past_rehydration",
536    "storage_dataflow_suspendable_sources",
537    "storage_downgrade_since_during_finalization",
538    "replica_metrics_history_retention_interval",
539    "wallclock_lag_history_retention_interval",
540    "wallclock_global_lag_histogram_retention_interval",
541    "kafka_client_id_enrichment_rules",
542    "kafka_poll_max_wait",
543    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
544    "kafka_buffered_event_resize_threshold_elements",
545    "mysql_replication_heartbeat_interval",
546    "postgres_fetch_slot_resume_lsn_interval",
547    "pg_schema_validation_interval",
548    "storage_enforce_external_addresses",
549    "storage_upsert_prevent_snapshot_buffering",
550    "storage_rocksdb_use_merge_operator",
551    "storage_upsert_max_snapshot_batch_buffering",
552    "storage_rocksdb_cleanup_tries",
553    "storage_suspend_and_restart_delay",
554    "storage_server_maintenance_interval",
555    "storage_sink_progress_search",
556    "storage_sink_ensure_topic_config",
557    "sql_server_max_lsn_wait",
558    "sql_server_snapshot_progress_report_interval",
559    "sql_server_cdc_poll_interval",
560    "sql_server_cdc_cleanup_change_table",
561    "sql_server_cdc_cleanup_change_table_max_deletes",
562    "allow_user_sessions",
563    "with_0dt_deployment_ddl_check_interval",
564    "enable_0dt_caught_up_check",
565    "with_0dt_caught_up_check_allowed_lag",
566    "with_0dt_caught_up_check_cutoff",
567    "enable_0dt_caught_up_replica_status_check",
568    "plan_insights_notice_fast_path_clusters_optimize_duration",
569    "enable_continual_task_builtins",
570    "enable_expression_cache",
571    "mz_metrics_lgalloc_map_refresh_interval",
572    "mz_metrics_lgalloc_refresh_interval",
573    "mz_metrics_rusage_refresh_interval",
574    "compute_peek_response_stash_batch_max_runs",
575    "compute_peek_response_stash_read_batch_size_bytes",
576    "compute_peek_response_stash_read_memory_budget_bytes",
577    "compute_peek_stash_num_batches",
578    "compute_peek_stash_batch_size",
579    "storage_statistics_retention_duration",
580    "enable_paused_cluster_readhold_downgrade",
581    "enable_builtin_migration_schema_evolution",
582]
583
584
# Environment variable assignments ("NAME=value") for CockroachDB; both set a
# maximum sync duration of 120s.
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# Environment ID assembled as "<cloud provider>-<cloud region>-<org id>-<ordinal>".
DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
597
598
599# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Extend `cmd` with a wait-for-TCP loop, run it, and return the command.

    The appended part is `timeout <timeout_secs> bash -c <loop>`, where the
    loop retries opening /dev/tcp/<host>/<port> every 0.1 seconds until it
    succeeds. Note that `cmd` is extended in place.

    Args:
        cmd: Command prefix to which the wait loop is appended.
        host: Host to connect to.
        port: TCP port to connect to.
        timeout_secs: Value passed to `timeout`.
        kind: Label put in front of host:port in the failure log message.

    Raises:
        subprocess.CalledProcessError: re-raised after being logged when
            running the command fails.
    """
    wait_loop = (
        f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
    )
    cmd += ["timeout", str(timeout_secs), "bash", "-c", wait_loop]

    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        failure = "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
            kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
        )
        ui.log_in_automation(failure)
        raise

    return cmd
622
623
624# TODO(benesch): replace with Docker health checks.
def _wait_for_pg(
    timeout_secs: int,
    query: str,
    dbname: str,
    port: int,
    host: str,
    user: str,
    password: str | None,
    expected: Iterable[Any] | Literal["any"],
    print_result: bool = False,
    sslmode: str = "disable",
) -> None:
    """Wait for a pg-compatible database (includes materialized)

    Connects and runs `query` repeatedly until the result matches `expected`
    or the timeout is exhausted. Every attempt uses a fresh connection, which
    is closed again before the next attempt or before returning.

    Args:
        timeout_secs: Overall time budget for the retry loop.
        query: SQL to execute on each attempt.
        dbname: Database to connect to.
        port: Port to connect to.
        host: Host to connect to.
        user: User to connect as.
        password: Password to connect with, if any. Only its first character
            is shown in progress and error messages.
        expected: The rows the query has to return, or "any" to accept
            whatever the query returns.
        print_result: Print the query result on success, and include
            exception details in the progress output.
        sslmode: SSL mode passed to the connection.

    Raises:
        UIError: if no attempt produced the expected result in time.
    """
    obfuscated_password = password[0:1] if password is not None else ""
    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
    ui.progress(f"waiting for {args} to handle {query!r}", "C")
    error = None
    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
        conn = None
        try:
            conn = psycopg.connect(
                dbname=dbname,
                host=host,
                port=port,
                user=user,
                password=password,
                connect_timeout=1,
                sslmode=sslmode,
            )
            # The default (autocommit = false) wraps everything in a transaction.
            conn.autocommit = True
            with conn.cursor() as cur:
                cur.execute(query.encode())
                # With "any", a statement reporting no row count is accepted
                # without trying to fetch rows from it.
                if expected == "any" and cur.rowcount == -1:
                    ui.progress(" success!", finish=True)
                    return
                result = list(cur.fetchall())
                if expected == "any" or result == expected:
                    if print_result:
                        say(f"query result: {result}")
                    else:
                        ui.progress(" success!", finish=True)
                    return
                else:
                    say(
                        f"host={host} port={port} did not return rows matching {expected} got: {result}"
                    )
        except Exception as e:
            # Best effort: any failure (connect, execute, fetch) just means
            # "not ready yet"; remember it for the final error message.
            ui.progress(f"{e if print_result else ''} {int(remaining)}")
            error = e
        finally:
            # Close the per-attempt connection on every path (success, mismatch,
            # exception) so that retries do not pile up open connections.
            if conn is not None:
                conn.close()
    ui.progress(finish=True)
    raise UIError(f"never got correct result for {args}: {error}")
676
677
def bootstrap_cluster_replica_size() -> str:
    """Return the name of the replica size used for bootstrapping."""
    return "bootstrap"
680
681
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the mapping from replica size name to its definition.

    Size names follow the scheme scale=<n>,workers=<n>[,mem=<n>GiB][,legacy],
    except for the bootstrap size and "free".
    """

    def size_spec(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # One replica size definition; credits are workers times scale.
        spec: dict[str, Any] = dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=f"{workers * scale}",
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
        )
        return spec

    sizes = {
        bootstrap_cluster_replica_size(): size_spec(1, 1),
        "scale=2,workers=4": size_spec(2, 4),
        "scale=1,workers=1,legacy": size_spec(1, 1, is_cc=False),
        "scale=1,workers=2,legacy": size_spec(1, 2, is_cc=False),
        # Intentionally not following the naming scheme
        "free": size_spec(0, 0, disabled=True),
    }

    # Powers of two from 1 to 32, used both as worker counts and as scales.
    for count in (1 << shift for shift in range(6)):
        sizes[f"scale=1,workers={count}"] = size_spec(1, count)
        sizes.update(
            {
                f"scale=1,workers={count},mem={gib}GiB": size_spec(
                    1, count, memory_limit=f"{gib} GiB"
                )
                for gib in (4, 8, 16, 32)
            }
        )

        sizes[f"scale={count},workers=1"] = size_spec(count, 1)
        sizes[f"scale={count},workers={count}"] = size_spec(count, count)
        sizes[f"scale=1,workers={count},mem={count}GiB"] = size_spec(
            1, count, memory_limit=f"{count} GiB"
        )

    return sizes
def say(msg: str) -> None:
55    def say(msg: str) -> None:
56        if not Verbosity.quiet:
57            print(f"{prefix}{msg}", file=sys.stderr)

Print `msg` to stderr, preceded by the speaker's prefix, unless quiet verbosity is enabled.

DEFAULT_CONFLUENT_PLATFORM_VERSION = '7.9.0'
DEFAULT_MZ_VOLUMES = ['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {'enable_statement_lifecycle_logging': 'false', 'persist_catalog_force_compaction_fuel': '0', 'statement_logging_default_sample_rate': '0', 'statement_logging_max_sample_rate': '0', 'persist_blob_cache_mem_limit_bytes': '1048576', 'persist_blob_cache_scale_with_threads': 'false', 'enable_compute_peek_response_stash': 'false'}
def get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
 67def get_minimal_system_parameters(
 68    version: MzVersion,
 69) -> dict[str, str]:
 70    """Settings we need in order to have tests run at all, but otherwise stay
 71    with the defaults: not changing performance or increasing coverage."""
 72
 73    config = {
 74        # -----
 75        # Unsafe functions
 76        "unsafe_enable_unsafe_functions": "true",
 77        # -----
 78        # Others (ordered by name)
 79        "allow_real_time_recency": "true",
 80        "constraint_based_timestamp_selection": "verify",
 81        "enable_compute_peek_response_stash": "true",
 82        "enable_0dt_deployment_panic_after_timeout": "true",
 83        "enable_0dt_deployment_sources": (
 84            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 85        ),
 86        "enable_alter_swap": "true",
 87        "enable_columnar_lgalloc": "false",
 88        "enable_columnation_lgalloc": "false",
 89        "enable_compute_correction_v2": "true",
 90        "enable_compute_logical_backpressure": "true",
 91        "enable_connection_validation_syntax": "true",
 92        "enable_continual_task_create": "true",
 93        "enable_continual_task_retain": "true",
 94        "enable_continual_task_transform": "true",
 95        "enable_copy_to_expr": "true",
 96        "enable_create_table_from_source": "true",
 97        "enable_eager_delta_joins": "true",
 98        "enable_envelope_debezium_in_subscribe": "true",
 99        "enable_expressions_in_limit_syntax": "true",
100        "enable_introspection_subscribes": "true",
101        "enable_kafka_sink_partition_by": "true",
102        "enable_lgalloc": "false",
103        "enable_load_generator_counter": "true",
104        "enable_logical_compaction_window": "true",
105        "enable_multi_worker_storage_persist_sink": "true",
106        "enable_multi_replica_sources": "true",
107        "enable_rbac_checks": "true",
108        "enable_reduce_mfp_fusion": "true",
109        "enable_refresh_every_mvs": "true",
110        "enable_cluster_schedule_refresh": "true",
111        "enable_sql_server_source": "true",
112        "enable_statement_lifecycle_logging": "true",
113        "enable_compute_temporal_bucketing": "true",
114        "enable_variadic_left_join_lowering": "true",
115        "enable_worker_core_affinity": "true",
116        "grpc_client_http2_keep_alive_timeout": "5s",
117        "ore_overflowing_behavior": "panic",
118        "unsafe_enable_table_keys": "true",
119        "with_0dt_deployment_max_wait": "1800s",
120        # End of list (ordered by name)
121    }
122
123    if version < MzVersion.parse_mz("v0.163.0-dev"):
124        config["enable_compute_active_dataflow_cancelation"] = "true"
125
126    return config

Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.

@dataclass
class VariableSystemParameter:
@dataclass
class VariableSystemParameter:
    """A system parameter together with the values it may take in tests.

    Instances are produced by `get_variable_system_parameters` and consumed
    by `get_default_system_parameters`, which either applies `default` or
    picks one entry of `values`.
    """

    # Name of the system parameter.
    key: str
    # Value applied when no randomization is requested.
    default: str
    # Candidate values to choose from when randomizing.
    values: list[str]
VariableSystemParameter(key: str, default: str, values: list[str])
key: str
default: str
values: list[str]
def get_variable_system_parameters( version: materialize.mz_version.MzVersion, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied between runs.

    Each entry names a parameter, the value used by default, and the list of
    values that may be picked instead (see `get_default_system_parameters`,
    which uses `default` normally and a random element of `values` when
    `CI_SYSTEM_PARAMETERS=random`).

    Every key appears exactly once so that each parameter corresponds to a
    single random draw.

    Args:
        version: The Materialize version the parameters are generated for.
            Some defaults and candidate values are only enabled above a
            given version.
        force_source_table_syntax: Whether `force_source_table_syntax`
            defaults to "true" (and may be varied) or is pinned to "false".

    Returns:
        The list of variable system parameters.
    """
    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name),
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "kafka_default_metadata_fetch_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "true", ["true", "false"]
        ),
        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "sql_server_offset_known_interval", "1s", ["10ms", "100ms", "1s"]
        ),
        # End of list (ordered by name)
    ]
def get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, force_source_table_syntax: bool = False) -> dict[str, str]:
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Build the system parameters to start Materialize with.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher).

    The result starts from `get_minimal_system_parameters(version)` and is
    extended according to the `CI_SYSTEM_PARAMETERS` environment variable:

    * unset or empty: every variable parameter is set to its default,
    * "random": every variable parameter is set to a randomly chosen
      candidate value, seeded by `CI_SYSTEM_PARAMETERS_SEED`, falling back
      to `BUILDKITE_JOB_ID`, falling back to "1",
    * "minimal": only the minimal parameters are returned.

    Args:
        version: Version to generate parameters for; when not given, the
            result of `MzVersion.parse_cargo()` is used.
        force_source_table_syntax: Forwarded to
            `get_variable_system_parameters`.

    Returns:
        Mapping from system parameter name to its value.

    Raises:
        ValueError: If `CI_SYSTEM_PARAMETERS` has an unrecognized value.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The fallback must be the *string* "1": values read from the
        # environment are always strings, and random.Random seeds str and int
        # differently. With an int fallback, re-running with the printed
        # CI_SYSTEM_PARAMETERS_SEED=1 would not reproduce the same choices.
        seed = os.getenv(
            "CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", "1")
        )
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        # Log the seed and the chosen values so that a run can be reproduced.
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params

For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher)

UNINTERESTING_SYSTEM_PARAMETERS = ['enable_mz_join_core', 'linear_join_yielding', 'enable_lgalloc_eager_reclamation', 'lgalloc_background_interval', 'lgalloc_file_growth_dampener', 'lgalloc_local_buffer_bytes', 'lgalloc_slow_clear_bytes', 'memory_limiter_interval', 'memory_limiter_usage_bias', 'memory_limiter_burst_factor', 'compute_server_maintenance_interval', 'compute_dataflow_max_inflight_bytes_cc', 'compute_flat_map_fuel', 'compute_temporal_bucketing_summary', 'consolidating_vec_growth_dampener', 'copy_to_s3_parquet_row_group_file_ratio', 'copy_to_s3_arrow_builder_buffer_ratio', 'copy_to_s3_multipart_part_size_bytes', 'enable_compute_replica_expiration', 'enable_compute_render_fueled_as_specific_collection', 'compute_logical_backpressure_max_retained_capabilities', 'compute_logical_backpressure_inflight_slack', 'persist_fetch_semaphore_cost_adjustment', 'persist_fetch_semaphore_permit_adjustment', 'persist_optimize_ignored_data_fetch', 'persist_pubsub_same_process_delegate_enabled', 'persist_pubsub_connect_attempt_timeout', 'persist_pubsub_request_timeout', 'persist_pubsub_connect_max_backoff', 'persist_pubsub_client_sender_channel_size', 'persist_pubsub_client_receiver_channel_size', 'persist_pubsub_server_connection_channel_size', 'persist_pubsub_state_cache_shard_ref_channel_size', 'persist_pubsub_reconnect_backoff', 'persist_encoding_compression_format', 'persist_batch_max_runs', 'persist_write_combine_inline_writes', 'persist_reader_lease_duration', 'persist_consensus_connection_pool_max_size', 'persist_consensus_connection_pool_max_wait', 'persist_consensus_connection_pool_ttl', 'persist_consensus_connection_pool_ttl_stagger', 'crdb_connect_timeout', 'crdb_tcp_user_timeout', 'persist_use_critical_since_txn', 'use_global_txn_cache_source', 'persist_batch_builder_max_outstanding_parts', 'persist_compaction_heuristic_min_inputs', 'persist_compaction_heuristic_min_parts', 'persist_compaction_heuristic_min_updates', 'persist_gc_blob_delete_concurrency_limit', 
'persist_state_versions_recent_live_diffs_limit', 'persist_usage_state_fetch_concurrency_limit', 'persist_blob_operation_timeout', 'persist_blob_operation_attempt_timeout', 'persist_blob_connect_timeout', 'persist_blob_read_timeout', 'persist_stats_collection_enabled', 'persist_stats_filter_enabled', 'persist_stats_budget_bytes', 'persist_stats_untrimmable_columns_equals', 'persist_stats_untrimmable_columns_prefix', 'persist_stats_untrimmable_columns_suffix', 'persist_expression_cache_force_compaction_fuel', 'persist_expression_cache_force_compaction_wait', 'persist_blob_cache_mem_limit_bytes', 'persist_blob_cache_scale_factor_bytes', 'persist_claim_unclaimed_compactions', 'persist_claim_compaction_percent', 'persist_claim_compaction_min_version', 'persist_next_listen_batch_retryer_multiplier', 'persist_rollup_threshold', 'persist_rollup_fallback_threshold_ms', 'persist_gc_fallback_threshold_ms', 'persist_compaction_minimum_timeout', 'persist_compaction_use_most_recent_schema', 'persist_compaction_check_process_flag', 'balancerd_sigterm_connection_wait', 'balancerd_sigterm_listen_wait', 'balancerd_inject_proxy_protocol_header_http', 'balancerd_log_filter', 'balancerd_opentelemetry_filter', 'balancerd_log_filter_defaults', 'balancerd_opentelemetry_filter_defaults', 'balancerd_sentry_filters', 'persist_enable_s3_lgalloc_cc_sizes', 'persist_enable_arrow_lgalloc_cc_sizes', 'controller_past_generation_replica_cleanup_retry_interval', 'wallclock_lag_recording_interval', 'wallclock_lag_histogram_period_interval', 'enable_timely_zero_copy', 'enable_timely_zero_copy_lgalloc', 'timely_zero_copy_limit', 'arrangement_exert_proportionality', 'txn_wal_apply_ensure_schema_match', 'persist_txns_data_shard_retryer_initial_backoff', 'persist_txns_data_shard_retryer_multiplier', 'persist_txns_data_shard_retryer_clamp', 'storage_cluster_shutdown_grace_period', 'storage_dataflow_delay_sources_past_rehydration', 'storage_dataflow_suspendable_sources', 
'storage_downgrade_since_during_finalization', 'replica_metrics_history_retention_interval', 'wallclock_lag_history_retention_interval', 'wallclock_global_lag_histogram_retention_interval', 'kafka_client_id_enrichment_rules', 'kafka_poll_max_wait', 'kafka_default_aws_privatelink_endpoint_identification_algorithm', 'kafka_buffered_event_resize_threshold_elements', 'mysql_replication_heartbeat_interval', 'postgres_fetch_slot_resume_lsn_interval', 'pg_schema_validation_interval', 'storage_enforce_external_addresses', 'storage_upsert_prevent_snapshot_buffering', 'storage_rocksdb_use_merge_operator', 'storage_upsert_max_snapshot_batch_buffering', 'storage_rocksdb_cleanup_tries', 'storage_suspend_and_restart_delay', 'storage_server_maintenance_interval', 'storage_sink_progress_search', 'storage_sink_ensure_topic_config', 'sql_server_max_lsn_wait', 'sql_server_snapshot_progress_report_interval', 'sql_server_cdc_poll_interval', 'sql_server_cdc_cleanup_change_table', 'sql_server_cdc_cleanup_change_table_max_deletes', 'allow_user_sessions', 'with_0dt_deployment_ddl_check_interval', 'enable_0dt_caught_up_check', 'with_0dt_caught_up_check_allowed_lag', 'with_0dt_caught_up_check_cutoff', 'enable_0dt_caught_up_replica_status_check', 'plan_insights_notice_fast_path_clusters_optimize_duration', 'enable_continual_task_builtins', 'enable_expression_cache', 'mz_metrics_lgalloc_map_refresh_interval', 'mz_metrics_lgalloc_refresh_interval', 'mz_metrics_rusage_refresh_interval', 'compute_peek_response_stash_batch_max_runs', 'compute_peek_response_stash_read_batch_size_bytes', 'compute_peek_response_stash_read_memory_budget_bytes', 'compute_peek_stash_num_batches', 'compute_peek_stash_batch_size', 'storage_statistics_retention_duration', 'enable_paused_cluster_readhold_downgrade', 'enable_builtin_migration_schema_evolution']
DEFAULT_CRDB_ENVIRONMENT = ['COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s', 'COCKROACH_LOG_MAX_SYNC_DURATION=120s']
DEFAULT_CLOUD_PROVIDER = 'mzcompose'
DEFAULT_CLOUD_REGION = 'us-east-1'
DEFAULT_ORG_ID = '00000000-0000-0000-0000-000000000000'
DEFAULT_ORDINAL = '0'
DEFAULT_MZ_ENVIRONMENT_ID = 'mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0'
def bootstrap_cluster_replica_size() -> str:
def bootstrap_cluster_replica_size() -> str:
    """Return the name of the replica size used for bootstrap clusters."""
    size_name = "bootstrap"
    return size_name
def cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the mapping from replica size name to replica size definition.

    Size names follow the scheme
    scale=<n>,workers=<n>[,mem=<n>GiB][,legacy], except for the bootstrap
    size and "free".
    """

    def make_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # Credits are the product of workers and scale, rendered as a string.
        credits = str(workers * scale)
        return {
            "cpu_exclusive": False,
            "cpu_limit": None,
            "credits_per_hour": credits,
            "disabled": disabled,
            "disk_limit": None,
            "is_cc": is_cc,
            "memory_limit": memory_limit,
            "scale": scale,
            "workers": workers,
            # "selectors": {},
        }

    # Hand-picked sizes first; the generated ones are added below.
    sizes: dict[str, dict[str, Any]] = {}
    sizes[bootstrap_cluster_replica_size()] = make_size(1, 1)
    sizes["scale=2,workers=4"] = make_size(2, 4)
    sizes["scale=1,workers=1,legacy"] = make_size(1, 1, is_cc=False)
    sizes["scale=1,workers=2,legacy"] = make_size(1, 2, is_cc=False)
    # Intentionally not following the naming scheme
    sizes["free"] = make_size(0, 0, disabled=True)

    # Generate sizes for every power of two from 1 to 32.
    for exponent in range(6):
        count = 2**exponent

        sizes[f"scale=1,workers={count}"] = make_size(1, count)
        for gib in (4, 8, 16, 32):
            sizes[f"scale=1,workers={count},mem={gib}GiB"] = make_size(
                1, count, memory_limit=f"{gib} GiB"
            )

        sizes[f"scale={count},workers=1"] = make_size(count, 1)
        sizes[f"scale={count},workers={count}"] = make_size(count, count)
        sizes[f"scale=1,workers={count},mem={count}GiB"] = make_size(
            1, count, memory_limit=f"{count} GiB"
        )

    return sizes

scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]