misc.python.materialize.mzcompose
The implementation of the mzcompose system for Docker compositions.
For an overview of what mzcompose is and why it exists, see the user-facing documentation.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""The implementation of the mzcompose system for Docker compositions.

For an overview of what mzcompose is and why it exists, see the [user-facing
documentation][user-docs].

[user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
"""

import os
import random
import subprocess
import sys
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any, Literal, TypeVar

import psycopg

from materialize import spawn, ui
from materialize.mz_version import MzVersion
from materialize.ui import UIError

T = TypeVar("T")
say = ui.speaker("C> ")


DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0"

DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]


# Parameters which disable systems that periodically/unpredictably impact performance
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # Default of 128 MB increases memory usage by a lot for some small
    # performance in benchmarks, see for example FastPathLimit scenario: 55%
    # more memory, 5% faster
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # This would increase the memory usage of many tests, making it harder to
    # tell small memory increase regressions
    "persist_blob_cache_scale_with_threads": "false",
    # The peek response stash kicks in when results get larger, and it
    # increases query latency. Which in turn makes benchmarking more
    # unpredictable.
    "enable_compute_peek_response_stash": "false",
}


def get_minimal_system_parameters(
    version: MzVersion,
) -> dict[str, str]:
    """Settings we need in order to have tests run at all, but otherwise stay
    with the defaults: not changing performance or increasing coverage.

    Some entries depend on `version`, because older builds either lack the
    parameter or need a different value.
    """

    config = {
        # -----
        # Unsafe functions
        "unsafe_enable_unsafe_functions": "true",
        # -----
        # Others (ordered by name)
        "allow_real_time_recency": "true",
        "constraint_based_timestamp_selection": "verify",
        "enable_compute_peek_response_stash": "true",
        "enable_0dt_deployment_panic_after_timeout": "true",
        "enable_0dt_deployment_sources": (
            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
        ),
        "enable_alter_swap": "true",
        "enable_columnar_lgalloc": "false",
        "enable_columnation_lgalloc": "false",
        "enable_compute_correction_v2": "true",
        "enable_compute_logical_backpressure": "true",
        "enable_connection_validation_syntax": "true",
        "enable_continual_task_create": "true",
        "enable_continual_task_retain": "true",
        "enable_continual_task_transform": "true",
        "enable_copy_to_expr": "true",
        "enable_create_table_from_source": "true",
        "enable_eager_delta_joins": "true",
        "enable_envelope_debezium_in_subscribe": "true",
        "enable_expressions_in_limit_syntax": "true",
        "enable_introspection_subscribes": "true",
        "enable_kafka_sink_partition_by": "true",
        "enable_lgalloc": "false",
        "enable_load_generator_counter": "true",
        "enable_logical_compaction_window": "true",
        "enable_multi_worker_storage_persist_sink": "true",
        "enable_multi_replica_sources": "true",
        "enable_rbac_checks": "true",
        "enable_reduce_mfp_fusion": "true",
        "enable_refresh_every_mvs": "true",
        "enable_cluster_schedule_refresh": "true",
        "enable_sql_server_source": "true",
        "enable_statement_lifecycle_logging": "true",
        "enable_compute_temporal_bucketing": "true",
        "enable_variadic_left_join_lowering": "true",
        "enable_worker_core_affinity": "true",
        "grpc_client_http2_keep_alive_timeout": "5s",
        "ore_overflowing_behavior": "panic",
        "unsafe_enable_table_keys": "true",
        "with_0dt_deployment_max_wait": "1800s",
        # End of list (ordered by name)
    }

    # Only set for versions before v0.163.0-dev.
    if version < MzVersion.parse_mz("v0.163.0-dev"):
        config["enable_compute_active_dataflow_cancelation"] = "true"

    return config


@dataclass
class VariableSystemParameter:
    """A system parameter whose value CI may vary between runs."""

    key: str  # name of the system parameter
    default: str  # value used when CI_SYSTEM_PARAMETERS is unset or empty
    values: list[str]  # candidates drawn from when CI_SYSTEM_PARAMETERS=random


# TODO: The linter should check this too
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters that CI may vary, with their candidates.

    Each key must appear at most once. In random mode, each entry consumes one
    draw from a seeded RNG, so a duplicated key wastes a draw and is silently
    overwritten by the later copy.

    Args:
        version: Materialize version being configured; gates defaults and
            candidates for parameters that older versions do not support.
        force_source_table_syntax: when true, `force_source_table_syntax`
            defaults to "true" and may vary; otherwise it is pinned to "false".
    """
    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (ordered by name)
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",
            ["1048576", "4194304", "16777216", "67108864"],
        ),  # 128 MiB
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled, the in-between, and the production value
            # NOTE(review): four values are listed but the comment names three;
            # confirm which of 314572800 / 67108864 is the production value.
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "kafka_default_metadata_fetch_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "true", ["true", "false"]
        ),
        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "sql_server_offset_known_interval", "1s", ["10ms", "100ms", "1s"]
        ),
        # End of list (ordered by name)
    ]


def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """For upgrade tests we only want parameters set when all environmentd /
    clusterd processes have reached a specific version (or higher)

    Starts from `get_minimal_system_parameters(version)` and then, depending on
    the `CI_SYSTEM_PARAMETERS` environment variable:

    - unset or "": applies the default of every variable parameter;
    - "random": picks one candidate per variable parameter, using a `Random`
      seeded from `CI_SYSTEM_PARAMETERS_SEED`, then `BUILDKITE_JOB_ID`, then 1,
      and prints the result to stderr;
    - "minimal": keeps only the minimal parameters.

    Args:
        version: version to configure; defaults to `MzVersion.parse_cargo()`.
        force_source_table_syntax: forwarded to `get_variable_system_parameters`.

    Raises:
        ValueError: if `CI_SYSTEM_PARAMETERS` has any other value.
    """

    if not version:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        # Logged so a failing random configuration can be reproduced.
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params


# If you are adding a new config flag in Materialize, consider setting values
# for it in get_variable_system_parameters if it can be varied in tests.
# Set it in get_minimal_system_parameters if it's required for tests to succeed
# at all. Only add it in UNINTERESTING_SYSTEM_PARAMETERS if none of the above
# apply.
UNINTERESTING_SYSTEM_PARAMETERS = [
    "enable_mz_join_core",
    "linear_join_yielding",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "memory_limiter_interval",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_use_most_recent_schema",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_poll_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "enable_0dt_caught_up_replica_status_check",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_continual_task_builtins",
    "enable_expression_cache",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "storage_statistics_retention_duration",
    "enable_paused_cluster_readhold_downgrade",
    "enable_builtin_migration_schema_evolution",
]


# Environment variables for CockroachDB containers (NAME=value form).
# NOTE(review): presumably raises sync-duration limits to tolerate slow CI
# disks — confirm.
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]


# TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"
# Environment ID assembled as <provider>-<region>-<org id>-<ordinal>.
DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"


# TODO(benesch): replace with Docker health checks.
def _check_tcp(
    cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
) -> list[str]:
    """Run a command that waits until `host:port` accepts TCP connections.

    Appends a `timeout <timeout_secs> bash -c '<retry loop>'` invocation to
    `cmd`, mutating the caller's list in place, and runs it with
    `spawn.capture`. `cmd` is presumably a prefix such as a container exec
    command — confirm at callers.

    Args:
        cmd: command prefix to extend (modified in place).
        host: host to connect to.
        port: TCP port to connect to.
        timeout_secs: overall limit passed to `timeout`.
        kind: optional label prepended to `host:port` in the error log.

    Returns:
        The same (now extended) `cmd` list.

    Raises:
        subprocess.CalledProcessError: if the command fails, e.g. because the
            timeout expires. Details are logged via `ui.log_in_automation`
            first.
    """
    cmd.extend(
        [
            "timeout",
            str(timeout_secs),
            "bash",
            "-c",
            # Redirecting to bash's /dev/tcp/HOST/PORT attempts a TCP
            # connection; the redirect (and so the test) fails until the port
            # accepts connections, and the loop retries every 0.1s.
            f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
        ]
    )
    try:
        spawn.capture(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        # NOTE(review): stderr is passed as subprocess.STDOUT, so output is
        # likely merged into e.stdout and e.stderr is likely None.
        ui.log_in_automation(
            "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
                kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
            )
        )
        raise
    return cmd


# TODO(benesch): replace with Docker health checks.
625def _wait_for_pg( 626 timeout_secs: int, 627 query: str, 628 dbname: str, 629 port: int, 630 host: str, 631 user: str, 632 password: str | None, 633 expected: Iterable[Any] | Literal["any"], 634 print_result: bool = False, 635 sslmode: str = "disable", 636) -> None: 637 """Wait for a pg-compatible database (includes materialized)""" 638 obfuscated_password = password[0:1] if password is not None else "" 639 args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'" 640 ui.progress(f"waiting for {args} to handle {query!r}", "C") 641 error = None 642 for remaining in ui.timeout_loop(timeout_secs, tick=0.5): 643 try: 644 conn = psycopg.connect( 645 dbname=dbname, 646 host=host, 647 port=port, 648 user=user, 649 password=password, 650 connect_timeout=1, 651 sslmode=sslmode, 652 ) 653 # The default (autocommit = false) wraps everything in a transaction. 654 conn.autocommit = True 655 with conn.cursor() as cur: 656 cur.execute(query.encode()) 657 if expected == "any" and cur.rowcount == -1: 658 ui.progress(" success!", finish=True) 659 return 660 result = list(cur.fetchall()) 661 if expected == "any" or result == expected: 662 if print_result: 663 say(f"query result: {result}") 664 else: 665 ui.progress(" success!", finish=True) 666 return 667 else: 668 say( 669 f"host={host} port={port} did not return rows matching {expected} got: {result}" 670 ) 671 except Exception as e: 672 ui.progress(f"{e if print_result else ''} {int(remaining)}") 673 error = e 674 ui.progress(finish=True) 675 raise UIError(f"never got correct result for {args}: {error}") 676 677 678def bootstrap_cluster_replica_size() -> str: 679 return "bootstrap" 680 681 682def cluster_replica_size_map() -> dict[str, dict[str, Any]]: 683 """scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]""" 684 685 def replica_size( 686 scale: int, 687 workers: int, 688 disabled: bool = False, 689 is_cc: bool = True, 690 memory_limit: str = "4 GiB", 691 ) -> dict[str, Any]: 692 return { 693 
"cpu_exclusive": False, 694 "cpu_limit": None, 695 "credits_per_hour": f"{workers * scale}", 696 "disabled": disabled, 697 "disk_limit": None, 698 "is_cc": is_cc, 699 "memory_limit": memory_limit, 700 "scale": scale, 701 "workers": workers, 702 # "selectors": {}, 703 } 704 705 replica_sizes = { 706 bootstrap_cluster_replica_size(): replica_size(1, 1), 707 "scale=2,workers=4": replica_size(2, 4), 708 "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False), 709 "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False), 710 # Intentionally not following the naming scheme 711 "free": replica_size(0, 0, disabled=True), 712 } 713 714 for i in range(0, 6): 715 workers = 1 << i 716 replica_sizes[f"scale=1,workers={workers}"] = replica_size(1, workers) 717 for mem in [4, 8, 16, 32]: 718 replica_sizes[f"scale=1,workers={workers},mem={mem}GiB"] = replica_size( 719 1, workers, memory_limit=f"{mem} GiB" 720 ) 721 722 replica_sizes[f"scale={workers},workers=1"] = replica_size(workers, 1) 723 replica_sizes[f"scale={workers},workers={workers}"] = replica_size( 724 workers, workers 725 ) 726 replica_sizes[f"scale=1,workers={workers},mem={workers}GiB"] = replica_size( 727 1, workers, memory_limit=f"{workers} GiB" 728 ) 729 730 return replica_sizes
def
say(msg: str) -> None:
def say(msg: str) -> None:
    """Write *msg* to stderr behind the speaker prefix; do nothing when quiet."""
    if Verbosity.quiet:
        return
    print(f"{prefix}{msg}", file=sys.stderr)
Print a message to stderr, prefixed with `C> `, unless output is quiet (created by `ui.speaker("C> ")`).
DEFAULT_CONFLUENT_PLATFORM_VERSION =
'7.9.0'
DEFAULT_MZ_VOLUMES =
['mzdata:/mzdata', 'mydata:/var/lib/mysql-files', 'tmp:/share/tmp', 'scratch:/scratch']
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS =
{'enable_statement_lifecycle_logging': 'false', 'persist_catalog_force_compaction_fuel': '0', 'statement_logging_default_sample_rate': '0', 'statement_logging_max_sample_rate': '0', 'persist_blob_cache_mem_limit_bytes': '1048576', 'persist_blob_cache_scale_with_threads': 'false', 'enable_compute_peek_response_stash': 'false'}
def
get_minimal_system_parameters(version: materialize.mz_version.MzVersion) -> dict[str, str]:
def get_minimal_system_parameters(
    version: MzVersion,
) -> dict[str, str]:
    """Return the smallest set of system parameters tests need to run at all.

    Everything else keeps its default, so these settings neither change
    performance characteristics nor widen test coverage. A few entries depend
    on `version`.
    """
    # Version-gated value, resolved before the table is assembled.
    sources_0dt = "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"

    params = dict(
        # -----
        # Unsafe functions
        unsafe_enable_unsafe_functions="true",
        # -----
        # Others (ordered by name)
        allow_real_time_recency="true",
        constraint_based_timestamp_selection="verify",
        enable_compute_peek_response_stash="true",
        enable_0dt_deployment_panic_after_timeout="true",
        enable_0dt_deployment_sources=sources_0dt,
        enable_alter_swap="true",
        enable_columnar_lgalloc="false",
        enable_columnation_lgalloc="false",
        enable_compute_correction_v2="true",
        enable_compute_logical_backpressure="true",
        enable_connection_validation_syntax="true",
        enable_continual_task_create="true",
        enable_continual_task_retain="true",
        enable_continual_task_transform="true",
        enable_copy_to_expr="true",
        enable_create_table_from_source="true",
        enable_eager_delta_joins="true",
        enable_envelope_debezium_in_subscribe="true",
        enable_expressions_in_limit_syntax="true",
        enable_introspection_subscribes="true",
        enable_kafka_sink_partition_by="true",
        enable_lgalloc="false",
        enable_load_generator_counter="true",
        enable_logical_compaction_window="true",
        enable_multi_worker_storage_persist_sink="true",
        enable_multi_replica_sources="true",
        enable_rbac_checks="true",
        enable_reduce_mfp_fusion="true",
        enable_refresh_every_mvs="true",
        enable_cluster_schedule_refresh="true",
        enable_sql_server_source="true",
        enable_statement_lifecycle_logging="true",
        enable_compute_temporal_bucketing="true",
        enable_variadic_left_join_lowering="true",
        enable_worker_core_affinity="true",
        grpc_client_http2_keep_alive_timeout="5s",
        ore_overflowing_behavior="panic",
        unsafe_enable_table_keys="true",
        with_0dt_deployment_max_wait="1800s",
        # End of list (ordered by name)
    )

    # Only set for versions before v0.163.0-dev.
    predates_v0_163 = version < MzVersion.parse_mz("v0.163.0-dev")
    if predates_v0_163:
        params["enable_compute_active_dataflow_cancelation"] = "true"

    return params
Settings we need in order to have tests run at all, but otherwise stay with the defaults: not changing performance or increasing coverage.
@dataclass
class
VariableSystemParameter:
def
get_variable_system_parameters( version: materialize.mz_version.MzVersion, force_source_table_syntax: bool) -> list[VariableSystemParameter]:
def get_variable_system_parameters(
    version: MzVersion,
    force_source_table_syntax: bool,
) -> list[VariableSystemParameter]:
    """Return the system parameters that CI may vary between runs.

    Each entry pairs a parameter key with its default value and the candidate
    values a caller may choose from instead. Every key is listed exactly once.
    A duplicate entry would consume an extra draw when values are picked
    randomly, and would silently override the earlier entry.

    Args:
        version: Version of the processes the parameters are meant for. Some
            defaults and candidate lists are gated on it.
        force_source_table_syntax: When true, ``force_source_table_syntax``
            defaults to ``"true"`` and may be varied. Otherwise it is pinned
            to ``"false"``.

    Returns:
        A new list of ``VariableSystemParameter`` entries.
    """
    return [
        # -----
        # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_clamp",
            "16s",
            ["100ms", "1s", "10s", "100s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_initial_backoff",
            "100ms",
            ["10ms", "100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_next_listen_batch_retryer_fixed_sleep",
            "1200ms",
            ["100ms", "1s", "10s"],
        ),
        # -----
        # Persist internals changes, advance coverage
        VariableSystemParameter(
            "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
        ),
        # -----
        # Others (mostly, but not strictly, ordered by name)
        VariableSystemParameter(
            "compute_dataflow_max_inflight_bytes",
            "134217728",  # 128 MiB
            ["1048576", "4194304", "16777216", "67108864"],
        ),
        VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
        VariableSystemParameter(
            "compute_replica_expiration_offset", "3d", ["3d", "10d"]
        ),
        VariableSystemParameter(
            "compute_apply_column_demands", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "compute_peek_response_stash_threshold_bytes",
            # 1 MiB, an in-between value
            "1048576",
            # force-enabled (0), the in-between value (1 MiB), and two larger
            # thresholds (300 MiB, 64 MiB)
            ["0", "1048576", "314572800", "67108864"],
        ),
        VariableSystemParameter(
            "enable_password_auth",
            "true",
            ["true", "false"],
        ),
        VariableSystemParameter(
            "kafka_default_metadata_fetch_interval",
            "1s",
            ["100ms", "1s"],
        ),
        VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "force_source_table_syntax",
            "true" if force_source_table_syntax else "false",
            ["true", "false"] if force_source_table_syntax else ["false"],
        ),
        VariableSystemParameter(
            "persist_batch_columnar_format",
            "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
            ["row", "both_v2", "both", "structured"],
        ),
        VariableSystemParameter(
            "persist_batch_delete_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_order", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_builder_structured", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_batch_structured_key_lower_len",
            "256",
            ["0", "1", "512", "1000"],
        ),
        VariableSystemParameter(
            "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_fuel",
            "1024",
            ["256", "1024", "4096"],
        ),
        VariableSystemParameter(
            "persist_catalog_force_compaction_wait",
            "1s",
            ["100ms", "1s", "10s"],
        ),
        VariableSystemParameter(
            "persist_encoding_enable_dictionary", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_stats_audit_percent",
            "100",
            [
                "0",
                "1",
                "2",
                "10",
                "100",
            ],
        ),
        VariableSystemParameter("persist_stats_audit_panic", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_fast_path_limit",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
        VariableSystemParameter(
            "persist_gc_use_active_gc",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_gc_min_versions",
            "16",
            ["16", "256", "1024"],
        ),
        VariableSystemParameter(
            "persist_gc_max_versions",
            "128000",
            ["256", "128000"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_single_max_bytes",
            "4096",
            ["256", "1024", "4096", "16384"],
        ),
        VariableSystemParameter(
            "persist_inline_writes_total_max_bytes",
            "1048576",
            ["65536", "262144", "1048576", "4194304"],
        ),
        VariableSystemParameter(
            "persist_pubsub_client_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_compactions", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_record_schema_id",
            ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_rollup_use_active_rollup",
            ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
            (
                ["true", "false"]
                if version > MzVersion.parse_mz("v0.127.0-dev")
                else ["false"]
            ),
        ),
        # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
        VariableSystemParameter(
            "persist_blob_target_size",
            "16777216",
            ["4096", "1048576", "16777216", "134217728"],
        ),
        # 5 times the default part size - 4 is the bare minimum.
        VariableSystemParameter(
            "persist_compaction_memory_bound_bytes",
            "83886080",
            ["67108864", "134217728", "536870912", "1073741824"],
        ),
        VariableSystemParameter(
            "persist_enable_incremental_compaction",
            ("true" if version >= MzVersion.parse_mz("v0.161.0-dev") else "false"),
            (
                ["true", "false"]
                if version >= MzVersion.parse_mz("v0.161.0-dev")
                else ["false"]
            ),
        ),
        VariableSystemParameter(
            "persist_use_critical_since_catalog", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_use_critical_since_snapshot",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_use_critical_since_source",
            "false",  # always false, because we always have zero-downtime enabled
            ["false"],
        ),
        VariableSystemParameter(
            "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
        ),
        VariableSystemParameter(
            "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_read", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "persist_validate_part_bounds_on_write", "true", ["true", "false"]
        ),
        VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
        VariableSystemParameter(
            "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter(
            "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
        ),
        VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
        VariableSystemParameter(
            "storage_source_decode_fuel",
            "100000",
            ["10000", "100000", "1000000"],
        ),
        VariableSystemParameter(
            "storage_statistics_collection_interval",
            "1000",
            ["100", "1000", "10000"],
        ),
        VariableSystemParameter(
            "storage_statistics_interval", "2000", ["100", "1000", "10000"]
        ),
        VariableSystemParameter(
            "storage_use_continual_feedback_upsert", "true", ["true", "false"]
        ),
        VariableSystemParameter(
            "sql_server_offset_known_interval", "1s", ["10ms", "100ms", "1s"]
        ),
        # End of list
    ]
def
get_default_system_parameters( version: materialize.mz_version.MzVersion | None = None, force_source_table_syntax: bool = False) -> dict[str, str]:
def get_default_system_parameters(
    version: MzVersion | None = None,
    force_source_table_syntax: bool = False,
) -> dict[str, str]:
    """Return the system parameters to apply, honoring ``CI_SYSTEM_PARAMETERS``.

    The result always starts from ``get_minimal_system_parameters(version)``.
    The ``CI_SYSTEM_PARAMETERS`` environment variable then selects how the
    variable parameters are added:

    * unset or ``""``: every variable parameter gets its default value;
    * ``"random"``: every variable parameter gets a value drawn from its
      candidate values. The seed comes from ``CI_SYSTEM_PARAMETERS_SEED``,
      falling back to ``BUILDKITE_JOB_ID`` and then to ``"1"``. The seed and
      the resulting parameters are printed to stderr;
    * ``"minimal"``: no variable parameters are added.

    Args:
        version: Version the parameters are chosen for. For upgrade tests we
            only want parameters set once all environmentd / clusterd
            processes have reached a specific version (or higher). Defaults
            to ``MzVersion.parse_cargo()``.
        force_source_table_syntax: Forwarded to
            ``get_variable_system_parameters``.

    Returns:
        A mapping from system parameter name to string value.

    Raises:
        ValueError: If ``CI_SYSTEM_PARAMETERS`` has an unrecognized value.
    """
    if version is None:
        version = MzVersion.parse_cargo()

    params = get_minimal_system_parameters(version)

    system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
    variable_params = get_variable_system_parameters(version, force_source_table_syntax)

    if system_param_setting == "":
        for param in variable_params:
            params[param.key] = param.default
    elif system_param_setting == "random":
        # The fallback must be a str, not the int 1: random.Random seeds ints
        # and strs differently. Re-running with the printed
        # CI_SYSTEM_PARAMETERS_SEED=<seed> (always read back as a str) must
        # reproduce the same draws.
        seed = os.getenv(
            "CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", "1")
        )
        rng = random.Random(seed)
        for param in variable_params:
            params[param.key] = rng.choice(param.values)
        print(
            f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
            file=sys.stderr,
        )
    elif system_param_setting == "minimal":
        # Keep only the minimal parameters.
        pass
    else:
        raise ValueError(
            f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
        )

    return params
For upgrade tests, we only want parameters set once all environmentd / clusterd processes have reached a specific version (or higher).
# Names of system parameters classified as "uninteresting".
# NOTE(review): the code that consumes this list is not visible here.
# Presumably tests skip these names (e.g. when checking which parameters are
# covered); confirm against callers. Some of them (e.g.
# persist_blob_cache_mem_limit_bytes) are still set elsewhere in this module.
UNINTERESTING_SYSTEM_PARAMETERS =
['enable_mz_join_core', 'linear_join_yielding', 'enable_lgalloc_eager_reclamation', 'lgalloc_background_interval', 'lgalloc_file_growth_dampener', 'lgalloc_local_buffer_bytes', 'lgalloc_slow_clear_bytes', 'memory_limiter_interval', 'memory_limiter_usage_bias', 'memory_limiter_burst_factor', 'compute_server_maintenance_interval', 'compute_dataflow_max_inflight_bytes_cc', 'compute_flat_map_fuel', 'compute_temporal_bucketing_summary', 'consolidating_vec_growth_dampener', 'copy_to_s3_parquet_row_group_file_ratio', 'copy_to_s3_arrow_builder_buffer_ratio', 'copy_to_s3_multipart_part_size_bytes', 'enable_compute_replica_expiration', 'enable_compute_render_fueled_as_specific_collection', 'compute_logical_backpressure_max_retained_capabilities', 'compute_logical_backpressure_inflight_slack', 'persist_fetch_semaphore_cost_adjustment', 'persist_fetch_semaphore_permit_adjustment', 'persist_optimize_ignored_data_fetch', 'persist_pubsub_same_process_delegate_enabled', 'persist_pubsub_connect_attempt_timeout', 'persist_pubsub_request_timeout', 'persist_pubsub_connect_max_backoff', 'persist_pubsub_client_sender_channel_size', 'persist_pubsub_client_receiver_channel_size', 'persist_pubsub_server_connection_channel_size', 'persist_pubsub_state_cache_shard_ref_channel_size', 'persist_pubsub_reconnect_backoff', 'persist_encoding_compression_format', 'persist_batch_max_runs', 'persist_write_combine_inline_writes', 'persist_reader_lease_duration', 'persist_consensus_connection_pool_max_size', 'persist_consensus_connection_pool_max_wait', 'persist_consensus_connection_pool_ttl', 'persist_consensus_connection_pool_ttl_stagger', 'crdb_connect_timeout', 'crdb_tcp_user_timeout', 'persist_use_critical_since_txn', 'use_global_txn_cache_source', 'persist_batch_builder_max_outstanding_parts', 'persist_compaction_heuristic_min_inputs', 'persist_compaction_heuristic_min_parts', 'persist_compaction_heuristic_min_updates', 'persist_gc_blob_delete_concurrency_limit', 
'persist_state_versions_recent_live_diffs_limit', 'persist_usage_state_fetch_concurrency_limit', 'persist_blob_operation_timeout', 'persist_blob_operation_attempt_timeout', 'persist_blob_connect_timeout', 'persist_blob_read_timeout', 'persist_stats_collection_enabled', 'persist_stats_filter_enabled', 'persist_stats_budget_bytes', 'persist_stats_untrimmable_columns_equals', 'persist_stats_untrimmable_columns_prefix', 'persist_stats_untrimmable_columns_suffix', 'persist_expression_cache_force_compaction_fuel', 'persist_expression_cache_force_compaction_wait', 'persist_blob_cache_mem_limit_bytes', 'persist_blob_cache_scale_factor_bytes', 'persist_claim_unclaimed_compactions', 'persist_claim_compaction_percent', 'persist_claim_compaction_min_version', 'persist_next_listen_batch_retryer_multiplier', 'persist_rollup_threshold', 'persist_rollup_fallback_threshold_ms', 'persist_gc_fallback_threshold_ms', 'persist_compaction_minimum_timeout', 'persist_compaction_use_most_recent_schema', 'persist_compaction_check_process_flag', 'balancerd_sigterm_connection_wait', 'balancerd_sigterm_listen_wait', 'balancerd_inject_proxy_protocol_header_http', 'balancerd_log_filter', 'balancerd_opentelemetry_filter', 'balancerd_log_filter_defaults', 'balancerd_opentelemetry_filter_defaults', 'balancerd_sentry_filters', 'persist_enable_s3_lgalloc_cc_sizes', 'persist_enable_arrow_lgalloc_cc_sizes', 'controller_past_generation_replica_cleanup_retry_interval', 'wallclock_lag_recording_interval', 'wallclock_lag_histogram_period_interval', 'enable_timely_zero_copy', 'enable_timely_zero_copy_lgalloc', 'timely_zero_copy_limit', 'arrangement_exert_proportionality', 'txn_wal_apply_ensure_schema_match', 'persist_txns_data_shard_retryer_initial_backoff', 'persist_txns_data_shard_retryer_multiplier', 'persist_txns_data_shard_retryer_clamp', 'storage_cluster_shutdown_grace_period', 'storage_dataflow_delay_sources_past_rehydration', 'storage_dataflow_suspendable_sources', 
'storage_downgrade_since_during_finalization', 'replica_metrics_history_retention_interval', 'wallclock_lag_history_retention_interval', 'wallclock_global_lag_histogram_retention_interval', 'kafka_client_id_enrichment_rules', 'kafka_poll_max_wait', 'kafka_default_aws_privatelink_endpoint_identification_algorithm', 'kafka_buffered_event_resize_threshold_elements', 'mysql_replication_heartbeat_interval', 'postgres_fetch_slot_resume_lsn_interval', 'pg_schema_validation_interval', 'storage_enforce_external_addresses', 'storage_upsert_prevent_snapshot_buffering', 'storage_rocksdb_use_merge_operator', 'storage_upsert_max_snapshot_batch_buffering', 'storage_rocksdb_cleanup_tries', 'storage_suspend_and_restart_delay', 'storage_server_maintenance_interval', 'storage_sink_progress_search', 'storage_sink_ensure_topic_config', 'sql_server_max_lsn_wait', 'sql_server_snapshot_progress_report_interval', 'sql_server_cdc_poll_interval', 'sql_server_cdc_cleanup_change_table', 'sql_server_cdc_cleanup_change_table_max_deletes', 'allow_user_sessions', 'with_0dt_deployment_ddl_check_interval', 'enable_0dt_caught_up_check', 'with_0dt_caught_up_check_allowed_lag', 'with_0dt_caught_up_check_cutoff', 'enable_0dt_caught_up_replica_status_check', 'plan_insights_notice_fast_path_clusters_optimize_duration', 'enable_continual_task_builtins', 'enable_expression_cache', 'mz_metrics_lgalloc_map_refresh_interval', 'mz_metrics_lgalloc_refresh_interval', 'mz_metrics_rusage_refresh_interval', 'compute_peek_response_stash_batch_max_runs', 'compute_peek_response_stash_read_batch_size_bytes', 'compute_peek_response_stash_read_memory_budget_bytes', 'compute_peek_stash_num_batches', 'compute_peek_stash_batch_size', 'storage_statistics_retention_duration', 'enable_paused_cluster_readhold_downgrade', 'enable_builtin_migration_schema_evolution']
# Default environment for CockroachDB: raises both max-sync-duration limits
# to 120s. (Presumably passed to the CRDB service; confirm against callers.)
DEFAULT_CRDB_ENVIRONMENT = [
    "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
    "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
]

# Components of the default Materialize environment ID.
DEFAULT_CLOUD_PROVIDER = "mzcompose"
DEFAULT_CLOUD_REGION = "us-east-1"
DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
DEFAULT_ORDINAL = "0"

# "<provider>-<region>-<org id>-<ordinal>", assembled from the parts above.
DEFAULT_MZ_ENVIRONMENT_ID = "-".join(
    (DEFAULT_CLOUD_PROVIDER, DEFAULT_CLOUD_REGION, DEFAULT_ORG_ID, DEFAULT_ORDINAL)
)
def
bootstrap_cluster_replica_size() -> str:
def
cluster_replica_size_map() -> dict[str, dict[str, typing.Any]]:
def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
    """Return every replica size mzcompose knows about, keyed by size name.

    Names follow ``scale=<n>,workers=<n>[,mem=<n>GiB][,legacy]``; ``"free"``
    is the one intentional exception. Each value records the scale, worker
    count, memory limit, whether the size is disabled, whether it is a "cc"
    size (the ``legacy`` sizes are not), and credits per hour, which is
    ``scale * workers`` rendered as a string.
    """

    def make_size(
        scale: int,
        workers: int,
        disabled: bool = False,
        is_cc: bool = True,
        memory_limit: str = "4 GiB",
    ) -> dict[str, Any]:
        # Keyword order fixes the key order of the resulting spec.
        return dict(
            cpu_exclusive=False,
            cpu_limit=None,
            credits_per_hour=str(workers * scale),
            disabled=disabled,
            disk_limit=None,
            is_cc=is_cc,
            memory_limit=memory_limit,
            scale=scale,
            workers=workers,
        )

    sizes: dict[str, dict[str, Any]] = {
        bootstrap_cluster_replica_size(): make_size(1, 1),
        "scale=2,workers=4": make_size(2, 4),
        "scale=1,workers=1,legacy": make_size(1, 1, is_cc=False),
        "scale=1,workers=2,legacy": make_size(1, 2, is_cc=False),
        # Deliberately outside the naming scheme.
        "free": make_size(0, 0, disabled=True),
    }

    # Worker counts 1, 2, 4, 8, 16, 32.
    for exponent in range(6):
        n = 2**exponent
        sizes[f"scale=1,workers={n}"] = make_size(1, n)
        sizes.update(
            {
                f"scale=1,workers={n},mem={gib}GiB": make_size(
                    1, n, memory_limit=f"{gib} GiB"
                )
                for gib in (4, 8, 16, 32)
            }
        )
        sizes[f"scale={n},workers=1"] = make_size(n, 1)
        sizes[f"scale={n},workers={n}"] = make_size(n, n)
        # Memory scaled with the worker count; may re-set an entry from the
        # fixed-memory sizes above with an identical spec.
        sizes[f"scale=1,workers={n},mem={n}GiB"] = make_size(
            1, n, memory_limit=f"{n} GiB"
        )

    return sizes
scale=