misc.python.materialize.mzcompose

The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the user-facing documentation.
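
A minimal usage sketch of the parameter helpers defined below (assuming the module is importable as materialize.mzcompose, as its own imports suggest, and that it runs inside the Materialize repository so the version can default to the Cargo workspace version):

    from materialize.mzcompose import (
        ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS,
        get_default_system_parameters,
    )

    # With CI_SYSTEM_PARAMETERS unset this returns the minimal parameters plus
    # the default of every variable parameter. CI_SYSTEM_PARAMETERS=random
    # instead draws one of the allowed values per parameter (seeded by
    # CI_SYSTEM_PARAMETERS_SEED or BUILDKITE_JOB_ID), and
    # CI_SYSTEM_PARAMETERS=minimal skips the variable parameters entirely.
    params = get_default_system_parameters()

    # For benchmarks, additionally disable subsystems that perturb results.
    params.update(ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS)

    assert params["enable_rbac_checks"] == "true"
    assert params["statement_logging_max_sample_rate"] == "0"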

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""The implementation of the mzcompose system for Docker compositions.
 11
 12For an overview of what mzcompose is and why it exists, see the [user-facing
 13documentation][user-docs].
 14
 15[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
 16"""
 17
 18import os
 19import random
 20import subprocess
 21import sys
 22from collections.abc import Iterable
 23from dataclasses import dataclass
 24from typing import Any, Literal, TypeVar
 25
 26import psycopg
 27
 28from materialize import spawn, ui
 29from materialize.mz_version import MzVersion
 30from materialize.ui import UIError
 31
 32T = TypeVar("T")
 33say = ui.speaker("C> ")
 34
 35
 36DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0"
 37
 38DEFAULT_MZ_VOLUMES = [
 39    "mzdata:/mzdata",
 40    "mydata:/var/lib/mysql-files",
 41    "tmp:/share/tmp",
 42    "scratch:/scratch",
 43]
 44
 45
 46# Parameters which disable systems that periodically/unpredictably impact performance
 47ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
 48    "enable_statement_lifecycle_logging": "false",
 49    "persist_catalog_force_compaction_fuel": "0",
 50    "statement_logging_default_sample_rate": "0",
 51    "statement_logging_max_sample_rate": "0",
 52    # The default of 128 MB increases memory usage by a lot in exchange for
 53    # a small performance gain in benchmarks; see for example the
 54    # FastPathLimit scenario: 55% more memory, 5% faster.
 55    "persist_blob_cache_mem_limit_bytes": "1048576",
 56    # This would increase the memory usage of many tests, making it harder to
 57    # spot small memory regressions.
 58    "persist_blob_cache_scale_with_threads": "false",
 59    # The peek response stash kicks in when results get larger, and it
 60    # increases query latency, which in turn makes benchmarking more
 61    # unpredictable.
 62    "enable_compute_peek_response_stash": "false",
 63}
 64
 65
 66def get_minimal_system_parameters(
 67    version: MzVersion,
 68    zero_downtime: bool = False,
 69) -> dict[str, str]:
 70    """Settings we need in order to have tests run at all, but otherwise stay
 71    with the defaults: not changing performance or increasing coverage."""
 72
 73    return {
 74        # -----
 75        # Unsafe functions
 76        "unsafe_enable_unsafe_functions": "true",
 77        # -----
 78        # Others (ordered by name)
 79        "allow_real_time_recency": "true",
 80        "constraint_based_timestamp_selection": "verify",
 81        "enable_compute_peek_response_stash": "true",
 82        "enable_0dt_deployment": "true" if zero_downtime else "false",
 83        "enable_0dt_deployment_panic_after_timeout": "true",
 84        "enable_0dt_deployment_sources": (
 85            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
 86        ),
 87        "enable_alter_swap": "true",
 88        "enable_columnation_lgalloc": "true",
 89        "enable_compute_active_dataflow_cancelation": "true",
 90        "enable_compute_correction_v2": "true",
 91        "enable_compute_logical_backpressure": "true",
 92        "enable_connection_validation_syntax": "true",
 93        "enable_continual_task_create": "true",
 94        "enable_continual_task_retain": "true",
 95        "enable_continual_task_transform": "true",
 96        "enable_copy_to_expr": "true",
 97        "enable_create_table_from_source": "true",
 98        "enable_disk_cluster_replicas": "true",
 99        "enable_eager_delta_joins": "true",
100        "enable_envelope_debezium_in_subscribe": "true",
101        "enable_expressions_in_limit_syntax": "true",
102        "enable_introspection_subscribes": "true",
103        "enable_kafka_sink_partition_by": "true",
104        "enable_logical_compaction_window": "true",
105        "enable_multi_worker_storage_persist_sink": "true",
106        "enable_multi_replica_sources": "true",
107        "enable_rbac_checks": "true",
108        "enable_reduce_mfp_fusion": "true",
109        "enable_refresh_every_mvs": "true",
110        "enable_cluster_schedule_refresh": "true",
111        "enable_statement_lifecycle_logging": "true",
112        "enable_compute_temporal_bucketing": "false",
113        "enable_variadic_left_join_lowering": "true",
114        "enable_worker_core_affinity": "true",
115        "grpc_client_http2_keep_alive_timeout": "5s",
116        "ore_overflowing_behavior": "panic",
117        "persist_stats_audit_percent": "100",
118        "unsafe_enable_table_keys": "true",
119        "with_0dt_deployment_max_wait": "1800s",
120        # End of list (ordered by name)
121    }
122
123
124@dataclass
125class VariableSystemParameter:
126    key: str
127    default: str
128    values: list[str]
129
130
131# TODO: The linter should check this too
132def get_variable_system_parameters(
133    version: MzVersion,
134    zero_downtime: bool,
135    force_source_table_syntax: bool,
136) -> list[VariableSystemParameter]:
137    return [
138        # -----
139        # To reduce CRDB load, which we have been struggling with in CI (values based on the load test environment):
140        VariableSystemParameter(
141            "persist_next_listen_batch_retryer_clamp",
142            "16s",
143            ["100ms", "1s", "10s", "100s"],
144        ),
145        VariableSystemParameter(
146            "persist_next_listen_batch_retryer_initial_backoff",
147            "100ms",
148            ["10ms", "100ms", "1s", "10s"],
149        ),
150        VariableSystemParameter(
151            "persist_next_listen_batch_retryer_fixed_sleep",
152            "1200ms",
153            ["100ms", "1s", "10s"],
154        ),
155        # -----
156        # Persist internals changes, advance coverage
157        VariableSystemParameter(
158            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
159        ),
160        VariableSystemParameter(
161            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
162        ),
163        # -----
164        # Others (ordered by name)
165        VariableSystemParameter("cluster_always_use_disk", "true", ["true", "false"]),
166        VariableSystemParameter(
167            "compute_dataflow_max_inflight_bytes",
168            "134217728",
169            ["1048576", "4194304", "16777216", "67108864"],
170        ),  # 128 MiB
171        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
172        VariableSystemParameter(
173            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
174        ),
175        VariableSystemParameter(
176            "compute_apply_column_demands", "true", ["true", "false"]
177        ),
178        VariableSystemParameter(
179            "compute_peek_response_stash_threshold_bytes",
180            # 1 MiB, an in-between value
181            "1048576",
182            # force-enabled, the in-between, and the production value
183            ["0", "1048576", "314572800", "67108864"],
184        ),
185        VariableSystemParameter(
186            "disk_cluster_replicas_default", "true", ["true", "false"]
187        ),
188        VariableSystemParameter(
189            "kafka_default_metadata_fetch_interval",
190            "1s",
191            ["100ms", "1s"],
192        ),
193        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
194        VariableSystemParameter(
195            "force_source_table_syntax",
196            "true" if force_source_table_syntax else "false",
197            ["true", "false"] if force_source_table_syntax else ["false"],
198        ),
199        VariableSystemParameter(
200            "persist_batch_columnar_format",
201            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
202            ["row", "both_v2", "both", "structured"],
203        ),
204        VariableSystemParameter(
205            "persist_batch_delete_enabled", "true", ["true", "false"]
206        ),
207        VariableSystemParameter(
208            "persist_batch_structured_order", "true", ["true", "false"]
209        ),
210        VariableSystemParameter(
211            "persist_batch_builder_structured", "true", ["true", "false"]
212        ),
213        VariableSystemParameter(
214            "persist_batch_structured_key_lower_len",
215            "256",
216            ["0", "1", "512", "1000"],
217        ),
218        VariableSystemParameter(
219            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
220        ),
221        VariableSystemParameter(
222            "persist_catalog_force_compaction_fuel",
223            "1024",
224            ["256", "1024", "4096"],
225        ),
226        VariableSystemParameter(
227            "persist_catalog_force_compaction_wait",
228            "1s",
229            ["100ms", "1s", "10s"],
230        ),
231        VariableSystemParameter(
232            "persist_encoding_enable_dictionary", "true", ["true", "false"]
233        ),
234        VariableSystemParameter(
235            "persist_fast_path_limit",
236            "1000",
237            ["100", "1000", "10000"],
238        ),
239        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
240        VariableSystemParameter(
241            "persist_gc_use_active_gc",
242            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
243            (
244                ["true", "false"]
245                if version > MzVersion.parse_mz("v0.127.0-dev")
246                else ["false"]
247            ),
248        ),
249        VariableSystemParameter(
250            "persist_gc_min_versions",
251            "16",
252            ["16", "256", "1024"],
253        ),
254        VariableSystemParameter(
255            "persist_gc_max_versions",
256            "128000",
257            ["256", "128000"],
258        ),
259        VariableSystemParameter(
260            "persist_inline_writes_single_max_bytes",
261            "4096",
262            ["256", "1024", "4096", "16384"],
263        ),
264        VariableSystemParameter(
265            "persist_inline_writes_total_max_bytes",
266            "1048576",
267            ["65536", "262144", "1048576", "4194304"],
268        ),
269        VariableSystemParameter(
270            "persist_pubsub_client_enabled", "true", ["true", "false"]
271        ),
272        VariableSystemParameter(
273            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
274        ),
275        VariableSystemParameter(
276            "persist_record_compactions", "true", ["true", "false"]
277        ),
278        VariableSystemParameter(
279            "persist_record_schema_id",
280            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
281            (
282                ["true", "false"]
283                if version > MzVersion.parse_mz("v0.127.0-dev")
284                else ["false"]
285            ),
286        ),
287        VariableSystemParameter(
288            "persist_rollup_use_active_rollup",
289            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
290            (
291                ["true", "false"]
292                if version > MzVersion.parse_mz("v0.127.0-dev")
293                else ["false"]
294            ),
295        ),
296        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
297        VariableSystemParameter(
298            "persist_blob_target_size",
299            "16777216",
300            ["4096", "1048576", "16777216", "134217728"],
301        ),
302        # 5 times the default part size - 4 is the bare minimum.
303        VariableSystemParameter(
304            "persist_compaction_memory_bound_bytes",
305            "83886080",
306            ["67108864", "134217728", "536870912", "1073741824"],
307        ),
308        VariableSystemParameter(
309            "persist_use_critical_since_catalog", "true", ["true", "false"]
310        ),
311        VariableSystemParameter(
312            "persist_use_critical_since_snapshot",
313            "false" if zero_downtime else "true",
314            ["false"] if zero_downtime else ["true", "false"],
315        ),
316        VariableSystemParameter(
317            "persist_use_critical_since_source",
318            "false" if zero_downtime else "true",
319            ["false"] if zero_downtime else ["true", "false"],
320        ),
321        VariableSystemParameter(
322            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
323        ),
324        VariableSystemParameter(
325            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
326        ),
327        VariableSystemParameter(
328            "persist_validate_part_bounds_on_read", "true", ["true", "false"]
329        ),
330        VariableSystemParameter(
331            "persist_validate_part_bounds_on_write", "true", ["true", "false"]
332        ),
333        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
334        VariableSystemParameter(
335            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
336        ),
337        VariableSystemParameter(
338            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
339        ),
340        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
341        VariableSystemParameter(
342            "storage_source_decode_fuel",
343            "100000",
344            ["10000", "100000", "1000000"],
345        ),
346        VariableSystemParameter(
347            "storage_statistics_collection_interval",
348            "1000",
349            ["100", "1000", "10000"],
350        ),
351        VariableSystemParameter(
352            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
353        ),
354        VariableSystemParameter(
355            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
356        ),
357        # End of list (ordered by name)
358    ]
359
360
361def get_default_system_parameters(
362    version: MzVersion | None = None,
363    zero_downtime: bool = False,
364    force_source_table_syntax: bool = False,
365) -> dict[str, str]:
366    """For upgrade tests we only want parameters set when all environmentd /
367    clusterd processes have reached a specific version (or higher).
368    """
369
370    if not version:
371        version = MzVersion.parse_cargo()
372
373    params = get_minimal_system_parameters(version, zero_downtime)
374
375    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
376    variable_params = get_variable_system_parameters(
377        version, zero_downtime, force_source_table_syntax
378    )
379
380    if system_param_setting == "":
381        for param in variable_params:
382            params[param.key] = param.default
383    elif system_param_setting == "random":
384        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
385        rng = random.Random(seed)
386        for param in variable_params:
387            params[param.key] = rng.choice(param.values)
388        print(
389            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
390            file=sys.stderr,
391        )
392    elif system_param_setting == "minimal":
393        pass
394    else:
395        raise ValueError(
396            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
397        )
398
399    return params
400
401
402# If you are adding a new config flag in Materialize, consider setting values
403# for it in get_variable_system_parameters if it can be varied in tests. Set it
404# in get_minimal_system_parameters if it's required for tests to succeed at
405# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
406# apply.
407UNINTERESTING_SYSTEM_PARAMETERS = [
408    "enable_mz_join_core",
409    "enable_compute_mv_append_smearing",
410    "linear_join_yielding",
411    "enable_lgalloc",
412    "enable_lgalloc_eager_reclamation",
413    "lgalloc_background_interval",
414    "lgalloc_file_growth_dampener",
415    "lgalloc_local_buffer_bytes",
416    "lgalloc_slow_clear_bytes",
417    "lgalloc_limiter_interval",
418    "lgalloc_limiter_usage_factor",
419    "lgalloc_limiter_usage_bias",
420    "lgalloc_limiter_burst_factor",
421    "memory_limiter_interval",
422    "memory_limiter_usage_factor",
423    "memory_limiter_usage_bias",
424    "memory_limiter_burst_factor",
425    "enable_columnar_lgalloc",
426    "compute_server_maintenance_interval",
427    "compute_dataflow_max_inflight_bytes_cc",
428    "compute_flat_map_fuel",
429    "consolidating_vec_growth_dampener",
430    "copy_to_s3_parquet_row_group_file_ratio",
431    "copy_to_s3_arrow_builder_buffer_ratio",
432    "copy_to_s3_multipart_part_size_bytes",
433    "enable_compute_replica_expiration",
434    "enable_compute_render_fueled_as_specific_collection",
435    "compute_logical_backpressure_max_retained_capabilities",
436    "compute_logical_backpressure_inflight_slack",
437    "persist_fetch_semaphore_cost_adjustment",
438    "persist_fetch_semaphore_permit_adjustment",
439    "persist_optimize_ignored_data_fetch",
440    "persist_pubsub_same_process_delegate_enabled",
441    "persist_pubsub_connect_attempt_timeout",
442    "persist_pubsub_request_timeout",
443    "persist_pubsub_connect_max_backoff",
444    "persist_pubsub_client_sender_channel_size",
445    "persist_pubsub_client_receiver_channel_size",
446    "persist_pubsub_server_connection_channel_size",
447    "persist_pubsub_state_cache_shard_ref_channel_size",
448    "persist_pubsub_reconnect_backoff",
449    "persist_encoding_compression_format",
450    "persist_batch_max_runs",
451    "persist_write_combine_inline_writes",
452    "persist_reader_lease_duration",
453    "persist_consensus_connection_pool_max_size",
454    "persist_consensus_connection_pool_max_wait",
455    "persist_consensus_connection_pool_ttl",
456    "persist_consensus_connection_pool_ttl_stagger",
457    "crdb_connect_timeout",
458    "crdb_tcp_user_timeout",
459    "persist_use_critical_since_txn",
460    "use_global_txn_cache_source",
461    "persist_batch_builder_max_outstanding_parts",
462    "persist_compaction_heuristic_min_inputs",
463    "persist_compaction_heuristic_min_parts",
464    "persist_compaction_heuristic_min_updates",
465    "persist_gc_blob_delete_concurrency_limit",
466    "persist_state_versions_recent_live_diffs_limit",
467    "persist_usage_state_fetch_concurrency_limit",
468    "persist_blob_operation_timeout",
469    "persist_blob_operation_attempt_timeout",
470    "persist_blob_connect_timeout",
471    "persist_blob_read_timeout",
472    "persist_stats_collection_enabled",
473    "persist_stats_filter_enabled",
474    "persist_stats_budget_bytes",
475    "persist_stats_untrimmable_columns_equals",
476    "persist_stats_untrimmable_columns_prefix",
477    "persist_stats_untrimmable_columns_suffix",
478    "persist_expression_cache_force_compaction_fuel",
479    "persist_expression_cache_force_compaction_wait",
480    "persist_blob_cache_mem_limit_bytes",
481    "persist_blob_cache_scale_factor_bytes",
482    "persist_claim_unclaimed_compactions",
483    "persist_claim_compaction_percent",
484    "persist_claim_compaction_min_version",
485    "persist_next_listen_batch_retryer_multiplier",
486    "persist_rollup_threshold",
487    "persist_rollup_fallback_threshold_ms",
488    "persist_gc_fallback_threshold_ms",
489    "persist_compaction_minimum_timeout",
490    "persist_compaction_use_most_recent_schema",
491    "persist_compaction_check_process_flag",
492    "balancerd_sigterm_connection_wait",
493    "balancerd_sigterm_listen_wait",
494    "balancerd_inject_proxy_protocol_header_http",
495    "balancerd_log_filter",
496    "balancerd_opentelemetry_filter",
497    "balancerd_log_filter_defaults",
498    "balancerd_opentelemetry_filter_defaults",
499    "balancerd_sentry_filters",
500    "persist_enable_s3_lgalloc_cc_sizes",
501    "persist_enable_arrow_lgalloc_cc_sizes",
502    "controller_past_generation_replica_cleanup_retry_interval",
503    "wallclock_lag_recording_interval",
504    "enable_wallclock_lag_histogram_collection",
505    "wallclock_lag_histogram_period_interval",
506    "enable_timely_zero_copy",
507    "enable_timely_zero_copy_lgalloc",
508    "timely_zero_copy_limit",
509    "arrangement_exert_proportionality",
510    "txn_wal_apply_ensure_schema_match",
511    "persist_txns_data_shard_retryer_initial_backoff",
512    "persist_txns_data_shard_retryer_multiplier",
513    "persist_txns_data_shard_retryer_clamp",
514    "storage_cluster_shutdown_grace_period",
515    "storage_dataflow_delay_sources_past_rehydration",
516    "storage_dataflow_suspendable_sources",
517    "storage_downgrade_since_during_finalization",
518    "replica_metrics_history_retention_interval",
519    "wallclock_lag_history_retention_interval",
520    "wallclock_global_lag_histogram_retention_interval",
521    "kafka_client_id_enrichment_rules",
522    "kafka_poll_max_wait",
523    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
524    "kafka_buffered_event_resize_threshold_elements",
525    "mysql_replication_heartbeat_interval",
526    "postgres_fetch_slot_resume_lsn_interval",
527    "pg_schema_validation_interval",
528    "storage_enforce_external_addresses",
529    "storage_upsert_prevent_snapshot_buffering",
530    "storage_rocksdb_use_merge_operator",
531    "storage_upsert_max_snapshot_batch_buffering",
532    "storage_rocksdb_cleanup_tries",
533    "storage_suspend_and_restart_delay",
534    "storage_sink_snapshot_frontier",
535    "storage_server_maintenance_interval",
536    "storage_sink_progress_search",
537    "storage_sink_ensure_topic_config",
538    "sql_server_snapshot_max_lsn_wait",
539    "sql_server_snapshot_progress_report_interval",
540    "sql_server_cdc_poll_interval",
541    "sql_server_cdc_cleanup_change_table",
542    "sql_server_cdc_cleanup_change_table_max_deletes",
543    "sql_server_offset_known_interval",
544    "allow_user_sessions",
545    "with_0dt_deployment_ddl_check_interval",
546    "enable_0dt_caught_up_check",
547    "with_0dt_caught_up_check_allowed_lag",
548    "with_0dt_caught_up_check_cutoff",
549    "plan_insights_notice_fast_path_clusters_optimize_duration",
550    "enable_continual_task_builtins",
551    "enable_expression_cache",
552    "enable_password_auth",
553    "mz_metrics_lgalloc_map_refresh_interval",
554    "mz_metrics_lgalloc_refresh_interval",
555    "mz_metrics_rusage_refresh_interval",
556    "compute_peek_response_stash_batch_max_runs",
557    "compute_peek_response_stash_read_batch_size_bytes",
558    "compute_peek_response_stash_read_memory_budget_bytes",
559    "compute_peek_stash_num_batches",
560    "compute_peek_stash_batch_size",
561    "enable_timely_init_at_process_startup",
562    "persist_enable_incremental_compaction",
563    "storage_statistics_retention_duration",
564]
565
566
567DEFAULT_CRDB_ENVIRONMENT = [
568    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
569    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
570]
571
572
573# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
574DEFAULT_CLOUD_PROVIDER = "mzcompose"
575DEFAULT_CLOUD_REGION = "us-east-1"
576DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
577DEFAULT_ORDINAL = "0"
578DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
579
580
581# TODO(benesch): replace with Docker health checks.
582def _check_tcp(
583    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
584) -> list[str]:
585    cmd.extend(
586        [
587            "timeout",
588            str(timeout_secs),
589            "bash",
590            "-c",
591            f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
592        ]
593    )
594    try:
595        spawn.capture(cmd, stderr=subprocess.STDOUT)
596    except subprocess.CalledProcessError as e:
597        ui.log_in_automation(
598            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
599                kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
600            )
601        )
602        raise
603    return cmd
604
605
606# TODO(benesch): replace with Docker health checks.
607def _wait_for_pg(
608    timeout_secs: int,
609    query: str,
610    dbname: str,
611    port: int,
612    host: str,
613    user: str,
614    password: str | None,
615    expected: Iterable[Any] | Literal["any"],
616    print_result: bool = False,
617    sslmode: str = "disable",
618) -> None:
619    """Wait for a pg-compatible database (includes materialized)"""
620    obfuscated_password = password[0:1] if password is not None else ""
621    args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
622    ui.progress(f"waiting for {args} to handle {query!r}", "C")
623    error = None
624    for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
625        try:
626            conn = psycopg.connect(
627                dbname=dbname,
628                host=host,
629                port=port,
630                user=user,
631                password=password,
632                connect_timeout=1,
633                sslmode=sslmode,
634            )
635            # The default (autocommit = false) wraps everything in a transaction.
636            conn.autocommit = True
637            with conn.cursor() as cur:
638                cur.execute(query.encode())
639                if expected == "any" and cur.rowcount == -1:
640                    ui.progress(" success!", finish=True)
641                    return
642                result = list(cur.fetchall())
643                if expected == "any" or result == expected:
644                    if print_result:
645                        say(f"query result: {result}")
646                    else:
647                        ui.progress(" success!", finish=True)
648                    return
649                else:
650                    say(
651                        f"host={host} port={port} did not return rows matching {expected} got: {result}"
652                    )
653        except Exception as e:
654            ui.progress(f"{e if print_result else ''} {int(remaining)}")
655            error = e
656    ui.progress(finish=True)
657    raise UIError(f"never got correct result for {args}: {error}")
658
659
660def bootstrap_cluster_replica_size() -> str:
661    return "bootstrap"
662
663
664def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
665    def replica_size(
666        workers: int,
667        scale: int,
668        disabled: bool = False,
669        is_cc: bool = True,
670        memory_limit: str | None = None,
671    ) -> dict[str, Any]:
672        return {
673            "cpu_exclusive": False,
674            "cpu_limit": None,
675            "credits_per_hour": f"{workers * scale}",
676            "disabled": disabled,
677            "disk_limit": None,
678            "is_cc": is_cc,
679            "memory_limit": memory_limit or "4Gi",
680            "scale": scale,
681            "workers": workers,
682            # "selectors": {},
683        }
684
685    replica_sizes = {
686        bootstrap_cluster_replica_size(): replica_size(1, 1),
687        "2-4": replica_size(4, 2),
688        "free": replica_size(0, 0, disabled=True),
689        "1cc": replica_size(1, 1),
690        "1C": replica_size(1, 1),
691        "1-no-disk": replica_size(1, 1, is_cc=False),
692        "2-no-disk": replica_size(2, 1, is_cc=False),
693    }
694
695    for i in range(0, 6):
696        workers = 1 << i
697        replica_sizes[f"{workers}"] = replica_size(workers, 1)
698        for mem in [4, 8, 16, 32]:
699            replica_sizes[f"{workers}-{mem}G"] = replica_size(
700                workers, 1, memory_limit=f"{mem} GiB"
701            )
702
703        replica_sizes[f"{workers}-1"] = replica_size(1, workers)
704        replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers)
705        replica_sizes[f"mem-{workers}"] = replica_size(
706            workers, 1, memory_limit=f"{workers} GiB"
707        )
708
709    return replica_sizes
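
For reference, a small sketch of the replica-size naming scheme that cluster_replica_size_map generates (names and fields read off the loop above; the materialize.mzcompose import path is again an assumption):

    from materialize.mzcompose import cluster_replica_size_map

    sizes = cluster_replica_size_map()

    # Power-of-two worker counts 1..32, each also available with a memory
    # limit suffix, plus a handful of special names.
    names = ["bootstrap", "1", "16", "2-4", "4-1", "8-8",
             "4-16G", "mem-8", "free", "1-no-disk"]
    for name in names:
        s = sizes[name]
        print(name, s["workers"], s["scale"], s["memory_limit"],
              s["is_cc"], s["disabled"])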