misc.python.materialize.mzcompose
The implementation of the mzcompose system for Docker compositions.
For an overview of what mzcompose is and why it exists, see the user-facing documentation.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import os
import random
import subprocess
import sys
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, Literal, TypeVar

import psycopg

from materialize import spawn, ui
from materialize.mz_version import MzVersion
from materialize.ui import UIError

T = TypeVar("T")
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0"

DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # Default of 128 MB increases memory usage by a lot for some small
    # performance in benchmarks, see for example FastPathLimit scenario: 55%
    # more memory, 5% faster
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # This would increase the memory usage of many tests, making it harder to
    # tell small memory increase regressions
    "persist_blob_cache_scale_with_threads": "false",
    # The peek response stash kicks in when results get larger, and it
    # increases query latency. Which in turn makes benchmarking more
    # unpredictable.
    "enable_compute_peek_response_stash": "false",
}


def get_minimal_system_parameters(
    version: MzVersion,
    zero_downtime: bool = False,
) -> dict[str, str]:
    """Return the baseline system parameters that tests need in order to run.

    These are meant to leave performance and coverage at their defaults. Only
    two entries depend on the arguments: `enable_0dt_deployment` follows
    `zero_downtime`, and `enable_0dt_deployment_sources` is "true" only when
    `version` is at least v0.132.0-dev.
    """
    zero_downtime_flag = "true" if zero_downtime else "false"
    if version >= MzVersion.parse_mz("v0.132.0-dev"):
        zero_downtime_sources_flag = "true"
    else:
        zero_downtime_sources_flag = "false"

    return {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "constraint_based_timestamp_selection": "verify",
        "enable_compute_peek_response_stash": "true",
        "enable_0dt_deployment": zero_downtime_flag,
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": zero_downtime_sources_flag,
        "enable_alter_swap": "true",
        "enable_columnation_lgalloc": "true",
        "enable_compute_active_dataflow_cancelation": "true",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_create_table_from_source": "true",
        "enable_disk_cluster_replicas": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_statement_lifecycle_logging": "true",
        "enable_compute_temporal_bucketing": "false",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "grpc_client_http2_keep_alive_timeout": "5s",
        "ore_overflowing_behavior": "panic",
        "persist_stats_audit_percent": "100",
        "unsafe_enable_table_keys": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }


@dataclass
class VariableSystemParameter:
    # Name of the system parameter.
    key: str
    # Value used when CI_SYSTEM_PARAMETERS is unset/empty.
    default: str
    # Candidate values to pick from when CI_SYSTEM_PARAMETERS=random.
    values: list[str]


# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    zero_downtime: bool,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters whose values may be varied between runs.

    Each entry carries a default value and the list of values that may be
    chosen instead. A few defaults and value lists depend on `version`,
    `zero_downtime` and `force_source_table_syntax`. The order of the entries
    and of their values is significant for seeded random selection, so it
    must not be changed casually.
    """

    def flag(enabled: bool) -> str:
        # Render a Python bool the way the parameters are spelled here.
        return "true" if enabled else "false"

    def choices(allow_true: bool) -> list[str]:
        # A fresh list per call so no two parameters share one list object.
        return ["true", "false"] if allow_true else ["false"]

    def toggle(key: str) -> VariableSystemParameter:
        # Boolean parameter that defaults to "true" and may be flipped.
        return VariableSystemParameter(key, "true", ["true", "false"])

    var = VariableSystemParameter
    newer_than_127 = version > MzVersion.parse_mz("v0.127.0-dev")
    newer_than_135 = version > MzVersion.parse_mz("v0.135.0-dev")
    newer_than_143 = version > MzVersion.parse_mz("v0.143.0-dev")

    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        var(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        var(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        var(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        toggle("persist_enable_arrow_lgalloc_noncc_sizes"),
        toggle("persist_enable_s3_lgalloc_noncc_sizes"),
        # -----
        # Others (ordered by name),
        toggle("cluster_always_use_disk"),
        var(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        var("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        var("compute_replica_expiration_offset", "3d", ["3d", "10d"]),
        toggle("compute_apply_column_demands"),
        var(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        toggle("disk_cluster_replicas_default"),
        var("kafka_default_metadata_fetch_interval", "1s", ["100ms", "1s"]),
        var("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        var(
            "force_source_table_syntax",
            flag(force_source_table_syntax),
            choices(force_source_table_syntax),
        ),
        var(
            "persist_batch_columnar_format",
            "structured" if newer_than_135 else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        toggle("persist_batch_delete_enabled"),
        toggle("persist_batch_structured_order"),
        toggle("persist_batch_builder_structured"),
        var(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        var("persist_batch_max_run_len", "4", ["2", "3", "4", "16"]),
        var(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        var(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        toggle("persist_encoding_enable_dictionary"),
        var("persist_fast_path_limit", "1000", ["100", "1000", "10000"]),
        toggle("persist_fast_path_order"),
        var(
            "persist_gc_use_active_gc",
            flag(newer_than_143),
            choices(newer_than_127),
        ),
        var("persist_gc_min_versions", "16", ["16", "256", "1024"]),
        var("persist_gc_max_versions", "128000", ["256", "128000"]),
        var(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        var(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        toggle("persist_pubsub_client_enabled"),
        toggle("persist_pubsub_push_diff_enabled"),
        toggle("persist_record_compactions"),
        var(
            "persist_record_schema_id",
            flag(newer_than_127),
            choices(newer_than_127),
        ),
        var(
            "persist_rollup_use_active_rollup",
            flag(newer_than_143),
            choices(newer_than_127),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        var(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        var(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        toggle("persist_use_critical_since_catalog"),
        var(
            "persist_use_critical_since_snapshot",
            flag(not zero_downtime),
            choices(not zero_downtime),
        ),
        var(
            "persist_use_critical_since_source",
            flag(not zero_downtime),
            choices(not zero_downtime),
        ),
        var("persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]),
        toggle("persist_blob_cache_scale_with_threads"),
        toggle("persist_validate_part_bounds_on_read"),
        toggle("persist_validate_part_bounds_on_write"),
        var("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        var("statement_logging_default_sample_rate", "0.01", ["0", "0.01"]),
        var("statement_logging_max_sample_rate", "0.01", ["0", "0.01"]),
        toggle("storage_reclock_to_latest"),
        var(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        var(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        var("storage_statistics_interval", "2000", ["100", "1000", "10000"]),
        toggle("storage_use_continual_feedback_upsert"),
        # End of list (ordered by name)
    ]


def get_default_system_parameters(
    version: MzVersion | None = None,
    zero_downtime: bool = False,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Build the system parameters to apply for a test run.

    For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher), which is
    why `version` can be passed in; when it is omitted the version from
    `MzVersion.parse_cargo()` is used.

    The result always starts from `get_minimal_system_parameters`. The
    CI_SYSTEM_PARAMETERS environment variable then decides what is layered on
    top:

    * unset or empty: every variable parameter at its default value;
    * "random": a seeded random choice among each parameter's values (seed
      from CI_SYSTEM_PARAMETERS_SEED, else BUILDKITE_JOB_ID, else 1); the
      seed and result are printed to stderr;
    * "minimal": nothing is added.

    Raises:
        ValueError: for any other value of CI_SYSTEM_PARAMETERS.
    """
    version = version or MzVersion.parse_cargo()

    settings = get_minimal_system_parameters(version, zero_downtime)

    mode = os.getenv("CI_SYSTEM_PARAMETERS", "")
    candidates = get_variable_system_parameters(
        version, zero_downtime, force_source_table_syntax
    )

    if mode == "minimal":
        return settings

    if mode == "":
        settings.update({candidate.key: candidate.default for candidate in candidates})
        return settings

    if mode == "random":
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        # One draw per parameter, in list order, so a seed reproduces a run.
        for candidate in candidates:
            settings[candidate.key] = rng.choice(candidate.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {settings}",
            file=sys.stderr,
        )
        return settings

    raise ValueError(f"Unknown value for CI_SYSTEM_PARAMETERS: {mode}")


# If you are adding a new config flag in Materialize, consider setting values
# for it in get_variable_system_parameters if it can be varied in tests. Set it
# in get_minimal_system_parameters if it's required for tests to succeed at
# all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
# apply.
UNINTERESTING_SYSTEM_PARAMETERS = [
    "enable_mz_join_core",
    "enable_compute_mv_append_smearing",
    "linear_join_yielding",
    "enable_lgalloc",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "lgalloc_limiter_interval",
    "lgalloc_limiter_usage_factor",
    "lgalloc_limiter_usage_bias",
    "lgalloc_limiter_burst_factor",
    "memory_limiter_interval",
    "memory_limiter_usage_factor",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "enable_columnar_lgalloc",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_use_most_recent_schema",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "enable_wallclock_lag_histogram_collection",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_sink_snapshot_frontier",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_snapshot_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_poll_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "sql_server_offset_known_interval",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_continual_task_builtins",
    "enable_expression_cache",
    "enable_password_auth",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "enable_timely_init_at_process_startup",
    "persist_enable_incremental_compaction",
    "storage_statistics_retention_duration",
]


DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# Environment id is "<provider>-<region>-<org id>-<ordinal>".
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    [
        DEFAULT_CLOUD_PROVIDER,
        DEFAULT_CLOUD_REGION,
        DEFAULT_ORG_ID,
        DEFAULT_ORDINAL,
    ]
)


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Run `cmd` extended with a bash loop that waits for a TCP port to open.

    The wait command (`timeout <secs> bash -c "until ... done"`) is appended
    to `cmd` in place, and the same (mutated) list is returned. The loop
    retries a redirection to `/dev/tcp/<host>/<port>` every 0.1 seconds.

    If the command fails, the failure is logged via `ui.log_in_automation`
    (including `kind`, which is only used as a prefix in that message) and the
    `subprocess.CalledProcessError` is re-raised.
    """
    wait_loop = (
        f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done"
    )
    cmd.extend(["timeout", str(timeout_secs), "bash", "-c", wait_loop])

    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        details = (
            kind,
            host,
            port,
            ui.shell_quote(cmd),
            e,
            e.stdout,
            e.stderr,
        )
        ui.log_in_automation(
            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
                *details
            )
        )
        raise

    return cmd


# TODO(benesch): replace with Docker health checks.
607def _wait_for_pg( 608 timeout_secs: int, 609 query: str, 610 dbname: str, 611 port: int, 612 host: str, 613 user: str, 614 password: str | None, 615 expected: Iterable[Any] | Literal["any"], 616 print_result: bool = False, 617 sslmode: str = "disable", 618) -> None: 619 """Wait for a pg-compatible database (includes materialized)""" 620 obfuscated_password = password[0:1] if password is not None else "" 621 args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'" 622 ui.progress(f"waiting for {args} to handle {query!r}", "C") 623 error = None 624 for remaining in ui.timeout_loop(timeout_secs, tick=0.5): 625 try: 626 conn = psycopg.connect( 627 dbname=dbname, 628 host=host, 629 port=port, 630 user=user, 631 password=password, 632 connect_timeout=1, 633 sslmode=sslmode, 634 ) 635 # The default (autocommit = false) wraps everything in a transaction. 636 conn.autocommit = True 637 with conn.cursor() as cur: 638 cur.execute(query.encode()) 639 if expected == "any" and cur.rowcount == -1: 640 ui.progress(" success!", finish=True) 641 return 642 result = list(cur.fetchall()) 643 if expected == "any" or result == expected: 644 if print_result: 645 say(f"query result: {result}") 646 else: 647 ui.progress(" success!", finish=True) 648 return 649 else: 650 say( 651 f"host={host} port={port} did not return rows matching {expected} got: {result}" 652 ) 653 except Exception as e: 654 ui.progress(f"{e if print_result else ''} {int(remaining)}") 655 error = e 656 ui.progress(finish=True) 657 raise UIError(f"never got correct result for {args}: {error}") 658 659 660def bootstrap_cluster_replica_size() -> str: 661 return "bootstrap" 662 663 664def cluster_replica_size_map() -> dict[str, dict[str, Any]]: 665 def replica_size( 666 workers: int, 667 scale: int, 668 disabled: bool = False, 669 is_cc: bool = True, 670 memory_limit: str | None = None, 671 ) -> dict[str, Any]: 672 return { 673 "cpu_exclusive": False, 674 "cpu_limit": None, 675 
"credits_per_hour": f"{workers * scale}", 676 "disabled": disabled, 677 "disk_limit": None, 678 "is_cc": is_cc, 679 "memory_limit": memory_limit or "4Gi", 680 "scale": scale, 681 "workers": workers, 682 # "selectors": {}, 683 } 684 685 replica_sizes = { 686 bootstrap_cluster_replica_size(): replica_size(1, 1), 687 "2-4": replica_size(4, 2), 688 "free": replica_size(0, 0, disabled=True), 689 "1cc": replica_size(1, 1), 690 "1C": replica_size(1, 1), 691 "1-no-disk": replica_size(1, 1, is_cc=False), 692 "2-no-disk": replica_size(2, 1, is_cc=False), 693 } 694 695 for i in range(0, 6): 696 workers = 1 << i 697 replica_sizes[f"{workers}"] = replica_size(workers, 1) 698 for mem in [4, 8, 16, 32]: 699 replica_sizes[f"{workers}-{mem}G"] = replica_size( 700 workers, 1, memory_limit=f"{mem} GiB" 701 ) 702 703 replica_sizes[f"{workers}-1"] = replica_size(1, workers) 704 replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers) 705 replica_sizes[f"mem-{workers}"] = replica_size( 706 workers, 1, memory_limit=f"{workers} GiB" 707 ) 708 709 return replica_sizes
def
say(msg: str) -> None:
DEFAULT_CONFLUENT_PLATFORM_VERSION =
'7.9.0'
DEFAULT_MZ_VOLUMES =
['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS =
{'enable_statement_lifecycle_logging': 'false', 'persist_catalog_force_compaction_fuel': '0', 'statement_logging_default_sample_rate': '0', 'statement_logging_max_sample_rate': '0', 'persist_blob_cache_mem_limit_bytes': '1048576', 'persist_blob_cache_scale_with_threads': 'false', 'enable_compute_peek_response_stash': 'false'}
def
get_minimal_system_parameters( version: materialize.mz_version.MzVersion, zero_downtime: bool = False) -> dict[str, str]:
def get_minimal_system_parameters(
    version: MzVersion,
    zero_downtime: bool = False,
) -> dict[str, str]:
    """Return the baseline system parameters that tests need in order to run.

    These are meant to leave performance and coverage at their defaults. Only
    two entries depend on the arguments: `enable_0dt_deployment` follows
    `zero_downtime`, and `enable_0dt_deployment_sources` is "true" only when
    `version` is at least v0.132.0-dev.
    """
    zero_downtime_flag = "true" if zero_downtime else "false"
    if version >= MzVersion.parse_mz("v0.132.0-dev"):
        zero_downtime_sources_flag = "true"
    else:
        zero_downtime_sources_flag = "false"

    return {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "constraint_based_timestamp_selection": "verify",
        "enable_compute_peek_response_stash": "true",
        "enable_0dt_deployment": zero_downtime_flag,
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": zero_downtime_sources_flag,
        "enable_alter_swap": "true",
        "enable_columnation_lgalloc": "true",
        "enable_compute_active_dataflow_cancelation": "true",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_create_table_from_source": "true",
        "enable_disk_cluster_replicas": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_statement_lifecycle_logging": "true",
        "enable_compute_temporal_bucketing": "false",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "grpc_client_http2_keep_alive_timeout": "5s",
        "ore_overflowing_behavior": "panic",
        "persist_stats_audit_percent": "100",
        "unsafe_enable_table_keys": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }
Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.
@dataclass
class
VariableSystemParameter:
def
get_variable_system_parameters( version: materialize.mz_version.MzVersion, zero_downtime: bool, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    zero_downtime: bool,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters that have a default plus alternative values.

    Each entry is built from a parameter name, the default value, and the list
    of candidate values. The default is not necessarily one of the candidates.
    The order of the returned list is significant for callers that draw one
    random value per entry in sequence, so entries must not be reordered
    casually.

    `version` gates a few persist parameters, `zero_downtime` pins the
    `persist_use_critical_since_{snapshot,source}` parameters to "false", and
    `force_source_table_syntax` decides whether `force_source_table_syntax` may
    be enabled at all.
    """

    def as_flag(enabled: bool) -> str:
        return "true" if enabled else "false"

    def on_off() -> list[str]:
        # A fresh list per entry, so no two parameters share a values list.
        return ["true", "false"]

    def toggle(name: str) -> VariableSystemParameter:
        # Boolean parameter that defaults to "true" and may be either value.
        return VariableSystemParameter(name, "true", on_off())

    def critical_since(name: str) -> VariableSystemParameter:
        # With zero-downtime deployments only "false" is allowed.
        if zero_downtime:
            return VariableSystemParameter(name, "false", ["false"])
        return VariableSystemParameter(name, "true", on_off())

    newer_than_v127 = version > MzVersion.parse_mz("v0.127.0-dev")
    newer_than_v135 = version > MzVersion.parse_mz("v0.135.0-dev")
    newer_than_v143 = version > MzVersion.parse_mz("v0.143.0-dev")

    variable_params = [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        toggle("persist_enable_arrow_lgalloc_noncc_sizes"),
        toggle("persist_enable_s3_lgalloc_noncc_sizes"),
        # -----
        # Others (ordered by name),
        toggle("cluster_always_use_disk"),
        # Default is 128 MiB
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        toggle("compute_apply_column_demands"),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            ["0", "1048576", "314572800", "67108864"],
        ),
        toggle("disk_cluster_replicas_default"),
        VariableSystemParameter(
            "kafka_default_metadata_fetch_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "force_source_table_syntax",
            as_flag(force_source_table_syntax),
            on_off() if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if newer_than_v135 else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        toggle("persist_batch_delete_enabled"),
        toggle("persist_batch_structured_order"),
        toggle("persist_batch_builder_structured"),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        toggle("persist_encoding_enable_dictionary"),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        toggle("persist_fast_path_order"),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            as_flag(newer_than_v143),
            on_off() if newer_than_v127 else ["false"],
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        toggle("persist_pubsub_client_enabled"),
        toggle("persist_pubsub_push_diff_enabled"),
        toggle("persist_record_compactions"),
        VariableSystemParameter(
            "persist_record_schema_id",
            as_flag(newer_than_v127),
            on_off() if newer_than_v127 else ["false"],
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            as_flag(newer_than_v143),
            on_off() if newer_than_v127 else ["false"],
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        toggle("persist_use_critical_since_catalog"),
        critical_since("persist_use_critical_since_snapshot"),
        critical_since("persist_use_critical_since_source"),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        toggle("persist_blob_cache_scale_with_threads"),
        toggle("persist_validate_part_bounds_on_read"),
        toggle("persist_validate_part_bounds_on_write"),
        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
        ),
        toggle("storage_reclock_to_latest"),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        toggle("storage_use_continual_feedback_upsert"),
        # End of list (ordered by name)
    ]
    return variable_params
def
get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, zero_downtime: bool = False, force_source_table_syntax: bool = False) -> dict[str, str]:
def get_default_system_parameters(
    version: MzVersion | None = None,
    zero_downtime: bool = False,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher).

    Starts from `get_minimal_system_parameters` and, depending on the
    `CI_SYSTEM_PARAMETERS` environment variable, layers the parameters from
    `get_variable_system_parameters` on top:

    * unset or empty: every variable parameter is set to its default;
    * "random": every variable parameter is set to a value picked from its
      candidate values, using a `random.Random` seeded from
      `CI_SYSTEM_PARAMETERS_SEED`, falling back to `BUILDKITE_JOB_ID`, falling
      back to `1`; the seed and the resulting parameters are printed to stderr;
    * "minimal": only the minimal parameters are returned.

    If `version` is `None`, the version from `MzVersion.parse_cargo()` is used.

    Raises `ValueError` for any other value of `CI_SYSTEM_PARAMETERS`.
    """

    # Compare against None explicitly: only an omitted version should fall
    # back to the cargo version, independent of how MzVersion defines truthiness.
    if version is None:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version, zero_downtime)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(
        version, zero_downtime, force_source_table_syntax
    )

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The seed is deliberately passed through unchanged (a string from the
        # environment, or the int 1) so that previously logged seeds reproduce
        # the same selection.
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        # One draw per parameter, in list order; reordering the variable
        # parameters changes which value each one receives for a given seed.
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        # Nothing to add on top of the minimal parameters.
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
For upgrade tests we only want parameters set when all environmentd / clusterd processes have reached a specific version (or higher)
UNINTERESTING_SYSTEM_PARAMETERS =
['enable_mz_join_core', 'enable_compute_mv_append_smearing', 'linear_join_yielding', 'enable_lgalloc', 'enable_lgalloc_eager_reclamation', 'lgalloc_background_interval', 'lgalloc_file_growth_dampener', 'lgalloc_local_buffer_bytes', 'lgalloc_slow_clear_bytes', 'lgalloc_limiter_interval', 'lgalloc_limiter_usage_factor', 'lgalloc_limiter_usage_bias', 'lgalloc_limiter_burst_factor', 'memory_limiter_interval', 'memory_limiter_usage_factor', 'memory_limiter_usage_bias', 'memory_limiter_burst_factor', 'enable_columnar_lgalloc', 'compute_server_maintenance_interval', 'compute_dataflow_max_inflight_bytes_cc', 'compute_flat_map_fuel', 'consolidating_vec_growth_dampener', 'copy_to_s3_parquet_row_group_file_ratio', 'copy_to_s3_arrow_builder_buffer_ratio', 'copy_to_s3_multipart_part_size_bytes', 'enable_compute_replica_expiration', 'enable_compute_render_fueled_as_specific_collection', 'compute_logical_backpressure_max_retained_capabilities', 'compute_logical_backpressure_inflight_slack', 'persist_fetch_semaphore_cost_adjustment', 'persist_fetch_semaphore_permit_adjustment', 'persist_optimize_ignored_data_fetch', 'persist_pubsub_same_process_delegate_enabled', 'persist_pubsub_connect_attempt_timeout', 'persist_pubsub_request_timeout', 'persist_pubsub_connect_max_backoff', 'persist_pubsub_client_sender_channel_size', 'persist_pubsub_client_receiver_channel_size', 'persist_pubsub_server_connection_channel_size', 'persist_pubsub_state_cache_shard_ref_channel_size', 'persist_pubsub_reconnect_backoff', 'persist_encoding_compression_format', 'persist_batch_max_runs', 'persist_write_combine_inline_writes', 'persist_reader_lease_duration', 'persist_consensus_connection_pool_max_size', 'persist_consensus_connection_pool_max_wait', 'persist_consensus_connection_pool_ttl', 'persist_consensus_connection_pool_ttl_stagger', 'crdb_connect_timeout', 'crdb_tcp_user_timeout', 'persist_use_critical_since_txn', 'use_global_txn_cache_source', 'persist_batch_builder_max_outstanding_parts', 
'persist_compaction_heuristic_min_inputs', 'persist_compaction_heuristic_min_parts', 'persist_compaction_heuristic_min_updates', 'persist_gc_blob_delete_concurrency_limit', 'persist_state_versions_recent_live_diffs_limit', 'persist_usage_state_fetch_concurrency_limit', 'persist_blob_operation_timeout', 'persist_blob_operation_attempt_timeout', 'persist_blob_connect_timeout', 'persist_blob_read_timeout', 'persist_stats_collection_enabled', 'persist_stats_filter_enabled', 'persist_stats_budget_bytes', 'persist_stats_untrimmable_columns_equals', 'persist_stats_untrimmable_columns_prefix', 'persist_stats_untrimmable_columns_suffix', 'persist_expression_cache_force_compaction_fuel', 'persist_expression_cache_force_compaction_wait', 'persist_blob_cache_mem_limit_bytes', 'persist_blob_cache_scale_factor_bytes', 'persist_claim_unclaimed_compactions', 'persist_claim_compaction_percent', 'persist_claim_compaction_min_version', 'persist_next_listen_batch_retryer_multiplier', 'persist_rollup_threshold', 'persist_rollup_fallback_threshold_ms', 'persist_gc_fallback_threshold_ms', 'persist_compaction_minimum_timeout', 'persist_compaction_use_most_recent_schema', 'persist_compaction_check_process_flag', 'balancerd_sigterm_connection_wait', 'balancerd_sigterm_listen_wait', 'balancerd_inject_proxy_protocol_header_http', 'balancerd_log_filter', 'balancerd_opentelemetry_filter', 'balancerd_log_filter_defaults', 'balancerd_opentelemetry_filter_defaults', 'balancerd_sentry_filters', 'persist_enable_s3_lgalloc_cc_sizes', 'persist_enable_arrow_lgalloc_cc_sizes', 'controller_past_generation_replica_cleanup_retry_interval', 'wallclock_lag_recording_interval', 'enable_wallclock_lag_histogram_collection', 'wallclock_lag_histogram_period_interval', 'enable_timely_zero_copy', 'enable_timely_zero_copy_lgalloc', 'timely_zero_copy_limit', 'arrangement_exert_proportionality', 'txn_wal_apply_ensure_schema_match', 'persist_txns_data_shard_retryer_initial_backoff', 
'persist_txns_data_shard_retryer_multiplier', 'persist_txns_data_shard_retryer_clamp', 'storage_cluster_shutdown_grace_period', 'storage_dataflow_delay_sources_past_rehydration', 'storage_dataflow_suspendable_sources', 'storage_downgrade_since_during_finalization', 'replica_metrics_history_retention_interval', 'wallclock_lag_history_retention_interval', 'wallclock_global_lag_histogram_retention_interval', 'kafka_client_id_enrichment_rules', 'kafka_poll_max_wait', 'kafka_default_aws_privatelink_endpoint_identification_algorithm', 'kafka_buffered_event_resize_threshold_elements', 'mysql_replication_heartbeat_interval', 'postgres_fetch_slot_resume_lsn_interval', 'pg_schema_validation_interval', 'storage_enforce_external_addresses', 'storage_upsert_prevent_snapshot_buffering', 'storage_rocksdb_use_merge_operator', 'storage_upsert_max_snapshot_batch_buffering', 'storage_rocksdb_cleanup_tries', 'storage_suspend_and_restart_delay', 'storage_sink_snapshot_frontier', 'storage_server_maintenance_interval', 'storage_sink_progress_search', 'storage_sink_ensure_topic_config', 'sql_server_snapshot_max_lsn_wait', 'sql_server_snapshot_progress_report_interval', 'sql_server_cdc_poll_interval', 'sql_server_cdc_cleanup_change_table', 'sql_server_cdc_cleanup_change_table_max_deletes', 'sql_server_offset_known_interval', 'allow_user_sessions', 'with_0dt_deployment_ddl_check_interval', 'enable_0dt_caught_up_check', 'with_0dt_caught_up_check_allowed_lag', 'with_0dt_caught_up_check_cutoff', 'plan_insights_notice_fast_path_clusters_optimize_duration', 'enable_continual_task_builtins', 'enable_expression_cache', 'enable_password_auth', 'mz_metrics_lgalloc_map_refresh_interval', 'mz_metrics_lgalloc_refresh_interval', 'mz_metrics_rusage_refresh_interval', 'compute_peek_response_stash_batch_max_runs', 'compute_peek_response_stash_read_batch_size_bytes', 'compute_peek_response_stash_read_memory_budget_bytes', 'compute_peek_stash_num_batches', 'compute_peek_stash_batch_size', 
'enable_timely_init_at_process_startup', 'persist_enable_incremental_compaction', 'storage_statistics_retention_duration']
DEFAULT_CRDB_ENVIRONMENT =
['COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s', 'COCKROACH_LOG_MAX_SYNC_DURATION=120s']
DEFAULT_CLOUD_PROVIDER =
'mzcompose'
DEFAULT_CLOUD_REGION =
'us-east-1'
DEFAULT_ORG_ID =
'00000000-0000-0000-0000-000000000000'
DEFAULT_ORDINAL =
'0'
DEFAULT_MZ_ENVIRONMENT_ID =
'mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0'
def
bootstrap_cluster_replica_size() -> str:
def
cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Build the mapping from replica size name to its size definition.

    Besides a handful of fixed names, entries are generated for each worker
    count `w` in 1, 2, 4, 8, 16, 32:

    * `"{w}"`: `w` workers, scale 1;
    * `"{w}-{mem}G"` for mem in 4, 8, 16, 32: `w` workers, scale 1, with a
      memory limit of `"{mem} GiB"`;
    * `"{w}-1"`: 1 worker, scale `w`;
    * `"{w}-{w}"`: `w` workers, scale `w`;
    * `"mem-{w}"`: `w` workers, scale 1, with a memory limit of `"{w} GiB"`.
    """

    def replica_size(
        workers: int,
        scale: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str | None = None,
    ) -> dict[str, Any]:
        # Fall back to 4Gi when no (or an empty) memory limit is given.
        effective_memory_limit = memory_limit if memory_limit else "4Gi"
        definition: dict[str, Any] = {
            "cpu_exclusive": False,
            "cpu_limit": None,
            "credits_per_hour": str(workers * scale),
            "disabled": disabled,
            "disk_limit": None,
            "is_cc": is_cc,
            "memory_limit": effective_memory_limit,
            "scale": scale,
            "workers": workers,
            # "selectors": {},
        }
        return definition

    # Fixed, hand-picked sizes first; insertion order is kept as-is.
    size_map: dict[str, dict[str, Any]] = {}
    size_map[bootstrap_cluster_replica_size()] = replica_size(1, 1)
    size_map["2-4"] = replica_size(4, 2)
    size_map["free"] = replica_size(0, 0, disabled=True)
    size_map["1cc"] = replica_size(1, 1)
    size_map["1C"] = replica_size(1, 1)
    size_map["1-no-disk"] = replica_size(1, 1, is_cc=False)
    size_map["2-no-disk"] = replica_size(2, 1, is_cc=False)

    # Generated sizes for the powers of two from 1 to 32.
    for count in (1, 2, 4, 8, 16, 32):
        size_map[f"{count}"] = replica_size(count, 1)

        for gib in (4, 8, 16, 32):
            size_map[f"{count}-{gib}G"] = replica_size(
                count, 1, memory_limit=f"{gib} GiB"
            )

        size_map[f"{count}-1"] = replica_size(1, count)
        size_map[f"{count}-{count}"] = replica_size(count, count)
        size_map[f"mem-{count}"] = replica_size(
            count, 1, memory_limit=f"{count} GiB"
        )

    return size_map