// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
#![allow(missing_docs)]
//! The tunable knobs for persist.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use mz_build_info::BuildInfo;
use mz_dyncfg::{Config, ConfigSet, ConfigType, ConfigUpdates};
use mz_ore::instrument;
use mz_ore::now::NowFn;
use mz_persist::cfg::BlobKnobs;
use mz_persist::retry::Retry;
use mz_postgres_client::PostgresClientKnobs;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::internal::machine::{
NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
};
use crate::internal::state::ROLLUP_THRESHOLD;
use crate::operators::{
PERSIST_SINK_MINIMUM_BATCH_UPDATES, STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES,
STORAGE_SOURCE_DECODE_FUEL,
};
use crate::project::OPTIMIZE_IGNORED_DATA_DECODE;
use crate::read::READER_LEASE_DURATION;
/// The tunable knobs for persist.
///
/// Tuning inputs:
/// - A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in
/// fewer entries in consensus state. Before we have compaction and/or
/// incremental state, it is already growing without bound, so this is a
/// concern. OTOH, for any "reasonable" size (> 100MiB?) of blob_target_size,
/// it seems we'd end up with a pretty tremendous amount of data in the shard
/// before this became a real issue.
/// - A larger blob_target_size will result in fewer s3 operations, which are
/// charged per operation. (Hmm, maybe not if we're charged per call in a
/// multipart op. The S3Blob impl already chunks things at 8MiB.)
/// - A smaller blob_target_size will result in more even memory usage in
/// readers.
/// - A larger batch_builder_max_outstanding_parts increases throughput (to a
/// point).
/// - A smaller batch_builder_max_outstanding_parts provides a bound on the
/// amount of memory used by a writer.
/// - A larger compaction_heuristic_min_inputs means state size is larger.
/// - A smaller compaction_heuristic_min_inputs means more compactions happen
/// (higher write amp).
/// - A larger compaction_heuristic_min_updates means more consolidations are
/// discovered while reading a snapshot (higher read amp and higher space
/// amp).
/// - A smaller compaction_heuristic_min_updates means more compactions happen
/// (higher write amp).
///
/// Tuning logic:
/// - blob_target_size was initially selected to be an exact multiple of 8MiB
/// (the s3 multipart size) that was in the same neighborhood as our initial
/// max throughput (~250MiB).
/// - batch_builder_max_outstanding_parts was initially selected to be as small
/// as possible without harming pipelining. 0 means no pipelining, 1 is full
/// pipelining as long as generating data takes less time than writing to s3
/// (hopefully a fair assumption), 2 is a little extra slop on top of 1.
/// - compaction_heuristic_min_inputs was set by running the open-loop benchmark
/// with batches of size 10,240 bytes (selected to be small but such that the
/// overhead of our columnar encoding format was less than 10%) and manually
/// increased until the write amp stopped going down. This becomes much less
/// important once we have incremental state. The initial value is a
/// placeholder and should be revisited at some point.
/// - compaction_heuristic_min_updates was set via a thought experiment. This is
/// an `O(n*log(n))` upper bound on the number of unconsolidated updates that
/// would be consolidated if we compacted as the in-mem Spine does. The
/// initial value is a placeholder and should be revisited at some point.
///
/// TODO: Move these tuning notes into SessionVar descriptions once we have
/// SystemVars for most of these.
//
// TODO: The configs left here don't react dynamically to changes. Move as many
// of them to DynamicConfig as possible.
#[derive(Debug, Clone)]
pub struct PersistConfig {
/// Info about which version of the code is running.
pub build_version: Version,
/// Hostname of this persist user. Stored in state and used for debugging.
pub hostname: String,
/// Whether this persist instance is running in a "cc" sized cluster.
pub is_cc_active: bool,
/// Memory limit of the process, if known.
pub announce_memory_limit: Option<usize>,
/// A clock to use for all leasing and other non-debugging use.
pub now: NowFn,
/// Persist [Config]s that can change value dynamically within the lifetime
/// of a process.
///
/// TODO(cfg): Entirely replace dynamic with this.
pub configs: Arc<ConfigSet>,
/// Indicates whether `configs` has been synced at least once with an
/// upstream source.
configs_synced_once: Arc<watch::Sender<bool>>,
/// Configurations that can be dynamically updated.
pub dynamic: Arc<DynamicConfig>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
/// compaction requests that can execute for a given shard.
pub compaction_concurrency_limit: usize,
/// In Compactor::compact_and_apply_background, the maximum number of pending
/// compaction requests to queue.
pub compaction_queue_size: usize,
/// In Compactor::compact_and_apply_background, how many updates to encode or
/// decode before voluntarily yielding the task.
pub compaction_yield_after_n_updates: usize,
/// The maximum size of the connection pool to Postgres/CRDB when performing
/// consensus reads and writes.
pub consensus_connection_pool_max_size: usize,
/// The maximum time to wait when attempting to obtain a connection from the pool.
pub consensus_connection_pool_max_wait: Option<Duration>,
/// Length of time after a writer's last operation after which the writer
/// may be expired.
pub writer_lease_duration: Duration,
/// Length of time between critical handles' calls to downgrade their since.
pub critical_downgrade_interval: Duration,
/// Timeout per connection attempt to Persist PubSub service.
pub pubsub_connect_attempt_timeout: Duration,
/// Timeout per request attempt to Persist PubSub service.
pub pubsub_request_timeout: Duration,
/// Maximum backoff when retrying connection establishment to Persist PubSub service.
pub pubsub_connect_max_backoff: Duration,
/// Size of channel used to buffer send messages to PubSub service.
pub pubsub_client_sender_channel_size: usize,
/// Size of channel used to buffer received messages from PubSub service.
pub pubsub_client_receiver_channel_size: usize,
/// Size of channel used per connection to buffer broadcasted messages from PubSub server.
pub pubsub_server_connection_channel_size: usize,
/// Size of channel used by the state cache to broadcast shard state references.
pub pubsub_state_cache_shard_ref_channel_size: usize,
/// Backoff after an established connection to Persist PubSub service fails.
pub pubsub_reconnect_backoff: Duration,
/// Number of worker threads to create for the [`crate::IsolatedRuntime`]. Defaults to the
/// number of logical CPUs reported by `num_cpus::get`.
pub isolated_runtime_worker_threads: usize,
}
// Impl Deref to ConfigSet for convenience of accessing the dynamic configs.
impl std::ops::Deref for PersistConfig {
type Target = ConfigSet;
fn deref(&self) -> &Self::Target {
&self.configs
}
}
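// Illustrative sketch (not part of this crate): how a `Deref` impl like the
// one above lets callers treat the outer config as the inner config set.
// `ToyConfigSet`, `ToyPersistConfig`, `read_threshold`, and the value 128 are
// hypothetical stand-ins chosen for the example; only the shape of the
// `Deref` impl mirrors the code above.
/*
```rust
use std::collections::BTreeMap;
use std::ops::Deref;
use std::sync::Arc;

// Hypothetical stand-in for `ConfigSet`: a map from config name to value.
#[derive(Debug, Default)]
struct ToyConfigSet {
    values: BTreeMap<&'static str, usize>,
}

impl ToyConfigSet {
    fn get(&self, name: &str) -> Option<usize> {
        self.values.get(name).copied()
    }
}

// Hypothetical stand-in for `PersistConfig`.
struct ToyPersistConfig {
    configs: Arc<ToyConfigSet>,
}

impl Deref for ToyPersistConfig {
    type Target = ToyConfigSet;
    fn deref(&self) -> &Self::Target {
        // `&Arc<ToyConfigSet>` coerces to `&ToyConfigSet` here.
        &self.configs
    }
}

// Takes the inner type; callers can pass `&ToyPersistConfig` via deref coercion.
fn read_threshold(set: &ToyConfigSet) -> Option<usize> {
    set.get("persist_rollup_threshold")
}

fn build() -> ToyPersistConfig {
    let mut set = ToyConfigSet::default();
    // The value is made up for the example.
    set.values.insert("persist_rollup_threshold", 128);
    ToyPersistConfig { configs: Arc::new(set) }
}

fn main() {
    let cfg = build();
    // Method calls auto-deref to the inner set.
    println!("{:?}", cfg.get("persist_rollup_threshold"));
    // Function arguments use deref coercion.
    println!("{:?}", read_threshold(&cfg));
}
```
*/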
impl PersistConfig {
/// Returns a new instance of [PersistConfig] with default tuning and
/// default ConfigSet.
pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
}
/// Returns a new instance of [PersistConfig] with default tuning and the
/// specified ConfigSet.
pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
// Escape hatch in case we need to disable compaction.
let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");
// We create receivers on demand, so we drop the initial receiver.
let (configs_synced_once, _) = watch::channel(false);
Self {
build_version: build_info.semver_version(),
is_cc_active: false,
announce_memory_limit: None,
now,
configs: Arc::new(configs),
configs_synced_once: Arc::new(configs_synced_once),
dynamic: Arc::new(DynamicConfig {
batch_builder_max_outstanding_parts: AtomicUsize::new(2),
compaction_heuristic_min_inputs: AtomicUsize::new(8),
compaction_heuristic_min_parts: AtomicUsize::new(8),
compaction_heuristic_min_updates: AtomicUsize::new(1024),
compaction_memory_bound_bytes: AtomicUsize::new(1024 * MiB),
gc_blob_delete_concurrency_limit: AtomicUsize::new(32),
state_versions_recent_live_diffs_limit: AtomicUsize::new(
30 * ROLLUP_THRESHOLD.default(),
),
usage_state_fetch_concurrency_limit: AtomicUsize::new(8),
}),
compaction_enabled: !compaction_disabled,
compaction_concurrency_limit: 5,
compaction_queue_size: 20,
compaction_yield_after_n_updates: 100_000,
consensus_connection_pool_max_size: 50,
consensus_connection_pool_max_wait: Some(Duration::from_secs(60)),
writer_lease_duration: 60 * Duration::from_secs(60),
critical_downgrade_interval: Duration::from_secs(30),
pubsub_connect_attempt_timeout: Duration::from_secs(5),
pubsub_request_timeout: Duration::from_secs(5),
pubsub_connect_max_backoff: Duration::from_secs(60),
pubsub_client_sender_channel_size: 25,
pubsub_client_receiver_channel_size: 25,
pubsub_server_connection_channel_size: 25,
pubsub_state_cache_shard_ref_channel_size: 25,
pubsub_reconnect_backoff: Duration::from_secs(5),
isolated_runtime_worker_threads: num_cpus::get(),
// TODO: This doesn't work with the process orchestrator. Instead,
// separate --log-prefix into --service-name and --enable-log-prefix
// options, where the first is always provided and the second is
// conditionally enabled by the process orchestrator.
hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()),
}
}
pub(crate) fn set_config<T: ConfigType>(&self, cfg: &Config<T>, val: T) {
let mut updates = ConfigUpdates::default();
updates.add(cfg, val);
updates.apply(self)
}
/// Applies the provided updates to this configuration.
///
/// You should prefer calling this method over mutating `self.configs`
/// directly, so that [`Self::configs_synced_once`] can be properly
/// maintained.
pub fn apply_from(&self, updates: &ConfigUpdates) {
updates.apply(&self.configs);
self.configs_synced_once.send_replace(true);
}
/// Resolves when `configs` has been synced at least once with an upstream
/// source, i.e., via [`Self::apply_from`].
///
/// If `configs` has already been synced once at the time the method is
/// called, resolves immediately.
///
/// Useful in conjunction with configuration parameters that cannot be
/// dynamically updated once set (e.g., PubSub).
#[instrument(level = "info")]
pub async fn configs_synced_once(&self) {
self.configs_synced_once
.subscribe()
.wait_for(|synced| *synced)
.await
.expect("we have a borrow on sender so it cannot drop");
}
/// The minimum number of updates that justify writing out a batch in `persist_sink`'s
/// `write_batches` operator. (If there are fewer than this minimum number of updates,
/// they'll be forwarded on to `append_batch` to be combined and written there.)
pub fn sink_minimum_batch_updates(&self) -> usize {
PERSIST_SINK_MINIMUM_BATCH_UPDATES.get(self)
}
/// The same as `Self::sink_minimum_batch_updates`, but
/// for storage's `persist_sink`.
pub fn storage_sink_minimum_batch_updates(&self) -> usize {
STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.get(self)
}
/// The maximum amount of work to do in the persist_source mfp_and_decode
/// operator before yielding.
pub fn storage_source_decode_fuel(&self) -> usize {
STORAGE_SOURCE_DECODE_FUEL.get(self)
}
/// CYA to allow opt-out of a performance optimization to skip decoding
/// ignored data.
pub fn optimize_ignored_data_decode(&self) -> bool {
OPTIMIZE_IGNORED_DATA_DECODE.get(self)
}
/// Overrides the value for "persist_reader_lease_duration".
pub fn set_reader_lease_duration(&self, val: Duration) {
self.set_config(&READER_LEASE_DURATION, val);
}
/// Overrides the value for "persist_rollup_threshold".
pub fn set_rollup_threshold(&self, val: usize) {
self.set_config(&ROLLUP_THRESHOLD, val);
}
/// Overrides the value for the "persist_next_listen_batch_retryer_*"
/// configs.
pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
self.set_config(
&NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
val.initial_backoff,
);
self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
}
/// Returns a new instance of [PersistConfig] for tests.
pub fn new_for_tests() -> Self {
use mz_build_info::DUMMY_BUILD_INFO;
use mz_ore::now::SYSTEM_TIME;
let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
cfg.hostname = "tests".into();
cfg
}
}
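// Illustrative sketch (not part of this crate): the "synced at least once"
// idea behind `apply_from` and `configs_synced_once`, rebuilt with only the
// standard library. The real code uses a tokio `watch` channel and is async;
// this version swaps in a `Mutex<bool>` plus `Condvar` and blocks a thread
// instead. `SyncedOnce`, `mark_synced`, `wait`, and `run` are hypothetical
// names.
/*
```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical latch that flips from "never synced" to "synced" once.
#[derive(Default)]
struct SyncedOnce {
    synced: Mutex<bool>,
    changed: Condvar,
}

impl SyncedOnce {
    // Analogue of `apply_from`: record that an update was applied and wake waiters.
    fn mark_synced(&self) {
        *self.synced.lock().unwrap() = true;
        self.changed.notify_all();
    }

    // Analogue of `configs_synced_once`: block until the flag is set.
    // Returns immediately if it is already set.
    fn wait(&self) {
        let guard = self.synced.lock().unwrap();
        let _guard = self
            .changed
            .wait_while(guard, |synced| !*synced)
            .unwrap();
    }

    fn is_synced(&self) -> bool {
        *self.synced.lock().unwrap()
    }
}

fn run() -> bool {
    let latch = Arc::new(SyncedOnce::default());
    let writer = Arc::clone(&latch);
    // Simulates the upstream source applying its first batch of updates.
    let handle = thread::spawn(move || writer.mark_synced());
    latch.wait();
    handle.join().unwrap();
    // A second wait does not block because the flag stays set.
    latch.wait();
    latch.is_synced()
}

fn main() {
    println!("synced: {}", run());
}
```
*/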
#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1024 * 1024;
/// Adds the full set of all persist [Config]s.
///
/// TODO(cfg): Consider replacing this with a static global registry powered by
/// something like the `ctor` or `inventory` crate. This would involve managing
/// the footgun of a Config being linked into one binary but not the other.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
mz_persist::cfg::all_dyn_configs(configs)
.add(&crate::batch::BATCH_DELETE_ENABLED)
.add(&crate::batch::BATCH_COLUMNAR_FORMAT)
.add(&crate::batch::BATCH_COLUMNAR_FORMAT_PERCENT)
.add(&crate::batch::BATCH_RECORD_PART_FORMAT)
.add(&crate::batch::BLOB_TARGET_SIZE)
.add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
.add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
.add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
.add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
.add(&crate::batch::RECORD_RUN_META)
.add(&crate::batch::STRUCTURED_ORDER)
.add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
.add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
.add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
.add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
.add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
.add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
.add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
.add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
.add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
.add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
.add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
.add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
.add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
.add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
.add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
.add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
.add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
.add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
.add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
.add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
.add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
.add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
.add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
.add(&crate::internal::machine::RECORD_COMPACTIONS)
.add(&crate::internal::state::ROLLUP_THRESHOLD)
.add(&crate::internal::state::WRITE_DIFFS_SUM)
.add(&crate::internal::apply::ROUNDTRIP_SPINE)
.add(&crate::operators::PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.add(&crate::operators::STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
.add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
.add(&crate::project::OPTIMIZE_IGNORED_DATA_DECODE)
.add(&crate::project::OPTIMIZE_IGNORED_DATA_FETCH)
.add(&crate::read::READER_LEASE_DURATION)
.add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
.add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
.add(&crate::stats::STATS_AUDIT_PERCENT)
.add(&crate::stats::STATS_BUDGET_BYTES)
.add(&crate::stats::STATS_COLLECTION_ENABLED)
.add(&crate::stats::STATS_FILTER_ENABLED)
.add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
.add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
.add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
.add(&crate::fetch::PART_DECODE_FORMAT)
.add(&crate::DANGEROUS_ENABLE_SCHEMA_EVOLUTION)
}
impl PersistConfig {
pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
// TODO: Get rid of this in favor of using dyncfgs at the
// relevant callsites.
pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
self.dynamic
.state_versions_recent_live_diffs_limit
.store(val, DynamicConfig::STORE_ORDERING);
}
}
/// The minimum TTL of a connection to Postgres/CRDB before it is proactively
/// terminated. Connections are routinely culled to balance load against the
/// downstream database.
const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
"persist_consensus_connection_pool_ttl",
Duration::from_secs(300),
"\
The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
proactively terminated",
);
/// The minimum time between TTLing connections to Postgres/CRDB. This delay is
/// used to stagger reconnections to avoid stampedes and high tail latencies.
/// This value should be much less than `consensus_connection_pool_ttl` so that
/// reconnections are biased towards terminating the oldest connections first. A
/// value of `consensus_connection_pool_ttl /
/// consensus_connection_pool_max_size` is likely a good place to start so that
/// all connections are rotated when the pool is fully used.
const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
"persist_consensus_connection_pool_ttl_stagger",
Duration::from_secs(6),
"The minimum time between TTLing Consensus connections to Postgres/CRDB.",
);
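// Worked example (illustrative, not part of this crate) of the rule of thumb
// in the doc comment on CONSENSUS_CONNECTION_POOL_TTL_STAGGER: dividing the
// default TTL of 300s by the default pool size of 50 gives the default
// stagger of 6s. `suggested_stagger` is a hypothetical helper.
/*
```rust
use std::time::Duration;

// Suggested starting point: ttl / max_size.
// Panics if `pool_max_size` is zero, as `Duration` division does.
fn suggested_stagger(ttl: Duration, pool_max_size: u32) -> Duration {
    ttl / pool_max_size
}

fn main() {
    let stagger = suggested_stagger(Duration::from_secs(300), 50);
    println!("{stagger:?}");
}
```
*/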
/// The duration to wait for a Consensus Postgres/CRDB connection to be made
/// before retrying.
pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
"crdb_connect_timeout",
Duration::from_secs(5),
"The time to connect to CockroachDB before timing out and retrying.",
);
/// The TCP user timeout for a Consensus Postgres/CRDB connection. Specifies the
/// amount of time that transmitted data may remain unacknowledged before the
/// TCP connection is forcibly closed.
pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
"crdb_tcp_user_timeout",
Duration::from_secs(30),
"\
The TCP timeout for connections to CockroachDB. Specifies the amount of \
time that transmitted data may remain unacknowledged before the TCP \
connection is forcibly closed.",
);
/// Migrate the txns code to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
"persist_use_critical_since_txn",
true,
"Use the critical since (instead of the overall since) when initializing a subscribe.",
);
/// Migrate the catalog to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
"persist_use_critical_since_catalog",
false,
"Use the critical since (instead of the overall since) for the Persist-backed catalog.",
);
/// Migrate the persist source to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
"persist_use_critical_since_source",
false,
"Use the critical since (instead of the overall since) in the Persist source.",
);
/// Migrate snapshots to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
"persist_use_critical_since_snapshot",
false,
"Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);
/// Migrate the persist source to use a process global txn cache.
pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
"use_global_txn_cache_source",
true,
"Use the process global txn cache (instead of an operator local one) in the Persist source.",
);
impl PostgresClientKnobs for PersistConfig {
fn connection_pool_max_size(&self) -> usize {
self.consensus_connection_pool_max_size
}
fn connection_pool_max_wait(&self) -> Option<Duration> {
self.consensus_connection_pool_max_wait
}
fn connection_pool_ttl(&self) -> Duration {
CONSENSUS_CONNECTION_POOL_TTL.get(self)
}
fn connection_pool_ttl_stagger(&self) -> Duration {
CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
}
fn connect_timeout(&self) -> Duration {
CRDB_CONNECT_TIMEOUT.get(self)
}
fn tcp_user_timeout(&self) -> Duration {
CRDB_TCP_USER_TIMEOUT.get(self)
}
}
/// Persist configurations that can be dynamically updated.
///
/// Persist is expected to react to each of these such that updating the value
/// returned by the function takes effect in persist (i.e. no caching it). This
/// should happen "as promptly as reasonably possible" where that's defined by
/// the tradeoffs of complexity vs promptness. For example, we might use a
/// consistent version of `BLOB_TARGET_SIZE` for the entirety of a single
/// compaction call. However, it should _never_ require a process restart for an
/// update of these to take effect.
///
/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
/// serialized into a [mz_dyncfg::ConfigUpdates]. In environmentd, these are applied
/// directly via [mz_dyncfg::ConfigUpdates::apply] to the [PersistConfig] in
/// [crate::cache::PersistClientCache]. There is one `PersistClientCache` per
/// process, and every `PersistConfig` shares the same `Arc<DynamicConfig>`, so
/// this affects all [DynamicConfig] usage in the process. The
/// `ConfigUpdates` is also sent via the compute and storage command
/// streams, which then apply it to all computed/storaged/clusterd processes as
/// well.
#[derive(Debug)]
pub struct DynamicConfig {
batch_builder_max_outstanding_parts: AtomicUsize,
compaction_heuristic_min_inputs: AtomicUsize,
compaction_heuristic_min_parts: AtomicUsize,
compaction_heuristic_min_updates: AtomicUsize,
compaction_memory_bound_bytes: AtomicUsize,
gc_blob_delete_concurrency_limit: AtomicUsize,
state_versions_recent_live_diffs_limit: AtomicUsize,
usage_state_fetch_concurrency_limit: AtomicUsize,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
pub struct RetryParameters {
pub fixed_sleep: Duration,
pub initial_backoff: Duration,
pub multiplier: u32,
pub clamp: Duration,
}
impl RetryParameters {
pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
let seed = now
.duration_since(UNIX_EPOCH)
.map_or(0, |x| u64::from(x.subsec_nanos()));
Retry {
fixed_sleep: self.fixed_sleep,
initial_backoff: self.initial_backoff,
multiplier: self.multiplier,
clamp_backoff: self.clamp,
seed,
}
}
}
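// Illustrative sketch (not part of this crate): how the seed in `into_retry`
// is derived, plus one plausible reading of `initial_backoff`, `multiplier`,
// and `clamp` as a jitter-free exponential schedule. The actual behavior of
// `mz_persist::retry::Retry` (including how it uses the seed and
// `fixed_sleep`) is not shown in this file, so `backoffs` is an assumption,
// not a description of that type.
/*
```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Same derivation as `into_retry`: the sub-second nanoseconds of `now`,
// or 0 if `now` is before the Unix epoch.
fn seed_from(now: SystemTime) -> u64 {
    now.duration_since(UNIX_EPOCH)
        .map_or(0, |d| u64::from(d.subsec_nanos()))
}

// Hypothetical schedule: start at `initial`, multiply each step, cap at `clamp`.
fn backoffs(initial: Duration, multiplier: u32, clamp: Duration, n: usize) -> Vec<Duration> {
    let mut out = Vec::with_capacity(n);
    let mut next = initial;
    for _ in 0..n {
        out.push(next.min(clamp));
        next = next.saturating_mul(multiplier).min(clamp);
    }
    out
}

fn main() {
    println!("seed: {}", seed_from(SystemTime::now()));
    let schedule = backoffs(
        Duration::from_millis(100),
        2,
        Duration::from_millis(500),
        5,
    );
    println!("{schedule:?}");
}
```
*/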
impl DynamicConfig {
// TODO: Decide if we can relax these.
const LOAD_ORDERING: Ordering = Ordering::SeqCst;
const STORE_ORDERING: Ordering = Ordering::SeqCst;
/// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
/// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
/// calls on previous ones finishing.
pub fn batch_builder_max_outstanding_parts(&self) -> usize {
self.batch_builder_max_outstanding_parts
.load(Self::LOAD_ORDERING)
}
/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of inputs is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub fn compaction_heuristic_min_inputs(&self) -> usize {
self.compaction_heuristic_min_inputs
.load(Self::LOAD_ORDERING)
}
/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of batch parts is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub fn compaction_heuristic_min_parts(&self) -> usize {
self.compaction_heuristic_min_parts
.load(Self::LOAD_ORDERING)
}
/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of updates is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub fn compaction_heuristic_min_updates(&self) -> usize {
self.compaction_heuristic_min_updates
.load(Self::LOAD_ORDERING)
}
/// The upper bound on compaction's memory consumption. The value must be at
/// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
/// compaction to merge together more runs at once, providing greater
/// consolidation of updates, at the cost of greater memory usage.
pub fn compaction_memory_bound_bytes(&self) -> usize {
self.compaction_memory_bound_bytes.load(Self::LOAD_ORDERING)
}
/// The maximum number of concurrent blob deletes during garbage collection.
pub fn gc_blob_delete_concurrency_limit(&self) -> usize {
self.gc_blob_delete_concurrency_limit
.load(Self::LOAD_ORDERING)
}
/// The number of diffs to initially scan when fetching the latest consensus state, to
/// determine which requests go down the fast vs slow path. Should be large enough
/// to fetch all live diffs in the steady-state, and small enough to query Consensus
/// at high volume. Steady-state usage should accommodate readers that require
/// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
///
/// This value ought to be defined in terms of `ROLLUP_THRESHOLD` to approximate
/// when we expect rollups to be written and therefore when old states will be truncated
/// by GC.
pub fn state_versions_recent_live_diffs_limit(&self) -> usize {
self.state_versions_recent_live_diffs_limit
.load(Self::LOAD_ORDERING)
}
/// The maximum number of concurrent state fetches during usage computation.
pub fn usage_state_fetch_concurrency_limit(&self) -> usize {
self.usage_state_fetch_concurrency_limit
.load(Self::LOAD_ORDERING)
}
// TODO: Get rid of these in favor of using dyncfgs at the
// relevant callsites.
#[cfg(test)]
pub fn set_batch_builder_max_outstanding_parts(&self, val: usize) {
self.batch_builder_max_outstanding_parts
.store(val, Self::STORE_ORDERING);
}
pub fn set_compaction_memory_bound_bytes(&self, val: usize) {
self.compaction_memory_bound_bytes
.store(val, Self::STORE_ORDERING);
}
}
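// Illustrative sketch (not part of this crate): the pattern `DynamicConfig`
// follows, reduced to one field. Values live in atomics behind a shared
// `Arc`, so a store through one handle is visible to every other handle on
// its next load, with no restart and no caching. `ToyDynamicConfig` and
// `demo` are hypothetical names; the sizes are made up for the example.
/*
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

const MIB: usize = 1024 * 1024;

#[derive(Debug)]
struct ToyDynamicConfig {
    compaction_memory_bound_bytes: AtomicUsize,
}

impl ToyDynamicConfig {
    // Separate constants so loads and stores can be relaxed independently later.
    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
    const STORE_ORDERING: Ordering = Ordering::SeqCst;

    fn compaction_memory_bound_bytes(&self) -> usize {
        self.compaction_memory_bound_bytes.load(Self::LOAD_ORDERING)
    }

    fn set_compaction_memory_bound_bytes(&self, val: usize) {
        self.compaction_memory_bound_bytes
            .store(val, Self::STORE_ORDERING);
    }
}

// Returns the value seen through a second handle before and after an update.
fn demo() -> (usize, usize) {
    let cfg = Arc::new(ToyDynamicConfig {
        compaction_memory_bound_bytes: AtomicUsize::new(1024 * MIB),
    });
    let shared = Arc::clone(&cfg);
    let before = shared.compaction_memory_bound_bytes();
    cfg.set_compaction_memory_bound_bytes(512 * MIB);
    let after = shared.compaction_memory_bound_bytes();
    (before, after)
}

fn main() {
    println!("{:?}", demo());
}
```
*/
// Keeping the store on STORE_ORDERING matters once the constants diverge:
// `AtomicUsize::store` panics when given `Acquire` or `AcqRel`.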
// TODO: Replace with dynamic values when PersistConfig is integrated with LD
impl BlobKnobs for PersistConfig {
fn operation_timeout(&self) -> Duration {
Duration::from_secs(180)
}
fn operation_attempt_timeout(&self) -> Duration {
Duration::from_secs(90)
}
fn connect_timeout(&self) -> Duration {
Duration::from_secs(7)
}
fn read_timeout(&self) -> Duration {
Duration::from_secs(10)
}
fn is_cc_active(&self) -> bool {
self.is_cc_active
}
}
// If persist gets some encoded ProtoState from the future (e.g. two versions of
// code are running simultaneously against the same shard), it might have a
// field that the current code doesn't know about. This would be silently
// discarded at proto decode time. Unknown Fields [1] are a tool we can use in
// the future to help deal with this, but in the short-term, it's best to keep
// the persist read-modify-CaS loop simple for as long as we can get away with
// it (i.e. until we have to offer the ability to do rollbacks).
//
// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns
//
// To detect the bad situation and disallow it, we tag every version of state
// written to consensus with the version of code used to encode it. Then at
// decode time, we're able to compare the current version against any we receive
// and assert as necessary.
//
// Initially we allow any from the past (permanent backward compatibility) and
// one minor version into the future (forward compatibility). This allows us to
// run two versions concurrently for rolling upgrades. We'll have to revisit
// this logic if/when we start using major versions other than 0.
//
// We could do the same for blob data, but it shouldn't be necessary. Any blob
// data we read is going to be because we fetched it using a pointer stored in
// some persist state. If we can handle the state, we can handle the blobs it
// references, too.
pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> {
// Allow one minor version of forward compatibility. We could avoid the
// clone with some nested comparisons of the semver fields, but this code
// isn't particularly performance sensitive and I find this impl easier to
// reason about.
let max_allowed_data_version = Version::new(
code_version.major,
code_version.minor.saturating_add(1),
u64::MAX,
);
if &max_allowed_data_version < data_version {
Err(format!(
"{code_version} received persist state from the future {data_version}",
))
} else {
Ok(())
}
}
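// Illustrative sketch (not part of this crate): the forward-compatibility
// rule in `check_data_version`, rebuilt without the `semver` crate.
// `ToyVersion` is a hypothetical stand-in that orders by (major, minor,
// patch) only; unlike `semver::Version` it ignores pre-release and build
// metadata. The version numbers are made up for the example.
/*
```rust
// Derived `Ord` compares fields in declaration order: major, minor, patch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ToyVersion {
    major: u64,
    minor: u64,
    patch: u64,
}

fn v(major: u64, minor: u64, patch: u64) -> ToyVersion {
    ToyVersion { major, minor, patch }
}

// Accepts any data from the past and up to one minor version into the future.
fn check(code: ToyVersion, data: ToyVersion) -> Result<(), String> {
    let max_allowed = v(code.major, code.minor.saturating_add(1), u64::MAX);
    if max_allowed < data {
        Err(format!("{code:?} received state from the future {data:?}"))
    } else {
        Ok(())
    }
}

fn main() {
    let code = v(0, 50, 0);
    for data in [v(0, 49, 9), v(0, 51, 7), v(0, 52, 0), v(1, 0, 0)] {
        println!("{data:?}: {:?}", check(code, data).is_ok());
    }
}
```
*/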