1#![allow(missing_docs)]
11
12use std::sync::Arc;
15use std::sync::atomic::AtomicBool;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18use mz_build_info::BuildInfo;
19use mz_dyncfg::{Config, ConfigDefault, ConfigSet, ConfigUpdates};
20use mz_ore::instrument;
21use mz_ore::now::NowFn;
22use mz_persist::cfg::BlobKnobs;
23use mz_persist::retry::Retry;
24use mz_postgres_client::PostgresClientKnobs;
25use proptest_derive::Arbitrary;
26use semver::Version;
27use serde::{Deserialize, Serialize};
28use tokio::sync::watch;
29
30use crate::async_runtime;
31use crate::internal::machine::{
32 NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
33 NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
34};
35use crate::internal::state::ROLLUP_THRESHOLD;
36use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
37use crate::read::READER_LEASE_DURATION;
38
39const SELF_MANAGED_VERSIONS: &[Version] = &[
41 Version::new(0, 130, 0),
43 Version::new(0, 147, 0),
45];
46
47#[derive(Debug, Clone)]
99pub struct PersistConfig {
100 pub build_version: Version,
102 pub hostname: String,
104 pub is_cc_active: bool,
106 pub announce_memory_limit: Option<usize>,
108 pub now: NowFn,
110 pub configs: Arc<ConfigSet>,
115 configs_synced_once: Arc<watch::Sender<bool>>,
118 pub compaction_enabled: bool,
120 pub compaction_process_requests: Arc<AtomicBool>,
122 pub compaction_concurrency_limit: usize,
125 pub compaction_queue_size: usize,
128 pub compaction_yield_after_n_updates: usize,
131 pub writer_lease_duration: Duration,
134 pub critical_downgrade_interval: Duration,
136 pub isolated_runtime_worker_threads: usize,
139}
140
141impl std::ops::Deref for PersistConfig {
143 type Target = ConfigSet;
144 fn deref(&self) -> &Self::Target {
145 &self.configs
146 }
147}
148
149impl PersistConfig {
150 pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
153 Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
154 }
155
156 pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
159 let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");
161
162 let (configs_synced_once, _) = watch::channel(false);
164
165 Self {
166 build_version: build_info.semver_version(),
167 is_cc_active: false,
168 announce_memory_limit: None,
169 now,
170 configs: Arc::new(configs),
171 configs_synced_once: Arc::new(configs_synced_once),
172 compaction_enabled: !compaction_disabled,
173 compaction_process_requests: Arc::new(AtomicBool::new(true)),
174 compaction_concurrency_limit: 5,
175 compaction_queue_size: 20,
176 compaction_yield_after_n_updates: 100_000,
177 writer_lease_duration: 60 * Duration::from_secs(60),
178 critical_downgrade_interval: Duration::from_secs(30),
179 isolated_runtime_worker_threads: num_cpus::get(),
180 hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()),
185 }
186 }
187
188 pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T) {
189 let mut updates = ConfigUpdates::default();
190 updates.add(cfg, val);
191 updates.apply(self)
192 }
193
194 pub fn apply_from(&self, updates: &ConfigUpdates) {
200 updates.apply(&self.configs);
201 self.configs_synced_once.send_replace(true);
202 }
203
204 #[instrument(level = "info")]
213 pub async fn configs_synced_once(&self) {
214 self.configs_synced_once
215 .subscribe()
216 .wait_for(|synced| *synced)
217 .await
218 .expect("we have a borrow on sender so it cannot drop");
219 }
220
221 pub fn storage_source_decode_fuel(&self) -> usize {
224 STORAGE_SOURCE_DECODE_FUEL.get(self)
225 }
226
227 pub fn set_reader_lease_duration(&self, val: Duration) {
229 self.set_config(&READER_LEASE_DURATION, val);
230 }
231
232 pub fn set_rollup_threshold(&self, val: usize) {
234 self.set_config(&ROLLUP_THRESHOLD, val);
235 }
236
237 pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
240 self.set_config(
241 &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
242 val.initial_backoff,
243 );
244 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
245 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
246 }
247
248 pub fn disable_compaction(&self) {
249 tracing::info!("Disabling Persist Compaction");
250 self.compaction_process_requests
251 .store(false, std::sync::atomic::Ordering::Relaxed);
252 }
253
254 pub fn enable_compaction(&self) {
255 tracing::info!("Enabling Persist Compaction");
256 self.compaction_process_requests
257 .store(true, std::sync::atomic::Ordering::Relaxed);
258 }
259
260 pub fn new_for_tests() -> Self {
262 use mz_build_info::DUMMY_BUILD_INFO;
263 use mz_ore::now::SYSTEM_TIME;
264
265 let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
266 cfg.hostname = "tests".into();
267 cfg.isolated_runtime_worker_threads = async_runtime::TEST_THREADS;
268 cfg
269 }
270}
271
/// Number of bytes in one mebibyte (2^20).
// The mixed-case name mirrors the unit's conventional spelling.
#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1 << 20;
274
275pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
281 mz_persist::cfg::all_dyn_configs(configs)
282 .add(&crate::batch::BATCH_DELETE_ENABLED)
283 .add(&crate::batch::BLOB_TARGET_SIZE)
284 .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
285 .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
286 .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
287 .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
288 .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
289 .add(&crate::batch::MAX_RUN_LEN)
290 .add(&crate::batch::MAX_RUNS)
291 .add(&BLOB_OPERATION_TIMEOUT)
292 .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
293 .add(&BLOB_CONNECT_TIMEOUT)
294 .add(&BLOB_READ_TIMEOUT)
295 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_SIZE)
296 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_WAIT)
297 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
298 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
299 .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
300 .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
301 .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
302 .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
303 .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
304 .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
305 .add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
306 .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
307 .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
308 .add(&COMPACTION_HEURISTIC_MIN_PARTS)
309 .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
310 .add(&COMPACTION_MEMORY_BOUND_BYTES)
311 .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
312 .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
313 .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
314 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
315 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
316 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
317 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_WAIT)
318 .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
319 .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
320 .add(&crate::fetch::VALIDATE_PART_BOUNDS_ON_READ)
321 .add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
322 .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
323 .add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
324 .add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
325 .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
326 .add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA)
327 .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
328 .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
329 .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
330 .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
331 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
332 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
333 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
334 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
335 .add(&crate::internal::state::ROLLUP_THRESHOLD)
336 .add(&crate::internal::state::ROLLUP_USE_ACTIVE_ROLLUP)
337 .add(&crate::internal::state::GC_FALLBACK_THRESHOLD_MS)
338 .add(&crate::internal::state::GC_USE_ACTIVE_GC)
339 .add(&crate::internal::state::GC_MIN_VERSIONS)
340 .add(&crate::internal::state::GC_MAX_VERSIONS)
341 .add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
342 .add(&crate::internal::state::ENABLE_INCREMENTAL_COMPACTION)
343 .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
344 .add(&crate::read::READER_LEASE_DURATION)
345 .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
346 .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
347 .add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
348 .add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
349 .add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
350 .add(&crate::rpc::PUBSUB_CONNECT_MAX_BACKOFF)
351 .add(&crate::rpc::PUBSUB_CLIENT_SENDER_CHANNEL_SIZE)
352 .add(&crate::rpc::PUBSUB_CLIENT_RECEIVER_CHANNEL_SIZE)
353 .add(&crate::rpc::PUBSUB_SERVER_CONNECTION_CHANNEL_SIZE)
354 .add(&crate::rpc::PUBSUB_STATE_CACHE_SHARD_REF_CHANNEL_SIZE)
355 .add(&crate::rpc::PUBSUB_RECONNECT_BACKOFF)
356 .add(&crate::stats::STATS_AUDIT_PERCENT)
357 .add(&crate::stats::STATS_AUDIT_PANIC)
358 .add(&crate::stats::STATS_BUDGET_BYTES)
359 .add(&crate::stats::STATS_COLLECTION_ENABLED)
360 .add(&crate::stats::STATS_FILTER_ENABLED)
361 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
362 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
363 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
364 .add(&crate::fetch::PART_DECODE_FORMAT)
365 .add(&crate::write::COMBINE_INLINE_WRITES)
366 .add(&crate::write::VALIDATE_PART_BOUNDS_ON_WRITE)
367}
368
369impl PersistConfig {
370 pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
371
372 pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
373 self.set_config(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT, val);
374 }
375}
376
377pub const CONSENSUS_CONNECTION_POOL_MAX_SIZE: Config<usize> = Config::new(
381 "persist_consensus_connection_pool_max_size",
382 50,
383 "The maximum size the connection pool to Postgres/CRDB will grow to.",
384);
385
386const CONSENSUS_CONNECTION_POOL_MAX_WAIT: Config<Duration> = Config::new(
391 "persist_consensus_connection_pool_max_wait",
392 Duration::from_secs(60),
393 "The amount of time we'll wait for a connection to become available.",
394);
395
396const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
400 "persist_consensus_connection_pool_ttl",
401 Duration::from_secs(300),
402 "\
403 The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
404 proactively terminated",
405);
406
407const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
415 "persist_consensus_connection_pool_ttl_stagger",
416 Duration::from_secs(6),
417 "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
418);
419
420pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
423 "crdb_connect_timeout",
424 Duration::from_secs(5),
425 "The time to connect to CockroachDB before timing out and retrying.",
426);
427
428pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
432 "crdb_tcp_user_timeout",
433 Duration::from_secs(30),
434 "\
435 The TCP timeout for connections to CockroachDB. Specifies the amount of \
436 time that transmitted data may remain unacknowledged before the TCP \
437 connection is forcibly closed.",
438);
439
440pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
442 "persist_use_critical_since_txn",
443 true,
444 "Use the critical since (instead of the overall since) when initializing a subscribe.",
445);
446
447pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
449 "persist_use_critical_since_catalog",
450 false,
451 "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
452);
453
454pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
456 "persist_use_critical_since_source",
457 false,
458 "Use the critical since (instead of the overall since) in the Persist source.",
459);
460
461pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
463 "persist_use_critical_since_snapshot",
464 false,
465 "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
466);
467
468pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
470 "use_global_txn_cache_source",
471 true,
472 "Use the process global txn cache (instead of an operator local one) in the Persist source.",
473);
474
475pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
479 "persist_batch_builder_max_outstanding_parts",
480 2,
481 "The number of writes a batch builder can have outstanding before we slow down the writer.",
482);
483
484pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
488 "persist_compaction_heuristic_min_inputs",
489 8,
490 "Don't skip compaction if we have more than this many hollow batches as input.",
491);
492
493pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
497 "persist_compaction_heuristic_min_parts",
498 8,
499 "Don't skip compaction if we have more than this many parts as input.",
500);
501
502pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
506 "persist_compaction_heuristic_min_updates",
507 1024,
508 "Don't skip compaction if we have more than this many updates as input.",
509);
510
511pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
516 "persist_compaction_memory_bound_bytes",
517 1024 * MiB,
518 "Attempt to limit compaction to this amount of memory.",
519);
520
521pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
523 "persist_gc_blob_delete_concurrency_limit",
524 32,
525 "Limit the number of concurrent deletes GC can perform to this threshold.",
526);
527
528pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
538 "persist_state_versions_recent_live_diffs_limit",
539 30 * 128,
540 "Fetch this many diffs when fetching recent diffs.",
541);
542
543pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
545 "persist_usage_state_fetch_concurrency_limit",
546 8,
547 "Limit the concurrency in of fetching in the perioding Persist-storage-usage calculation.",
548);
549
550impl PostgresClientKnobs for PersistConfig {
551 fn connection_pool_max_size(&self) -> usize {
552 CONSENSUS_CONNECTION_POOL_MAX_SIZE.get(self)
553 }
554
555 fn connection_pool_max_wait(&self) -> Option<Duration> {
556 Some(CONSENSUS_CONNECTION_POOL_MAX_WAIT.get(self))
557 }
558
559 fn connection_pool_ttl(&self) -> Duration {
560 CONSENSUS_CONNECTION_POOL_TTL.get(self)
561 }
562
563 fn connection_pool_ttl_stagger(&self) -> Duration {
564 CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
565 }
566
567 fn connect_timeout(&self) -> Duration {
568 CRDB_CONNECT_TIMEOUT.get(self)
569 }
570
571 fn tcp_user_timeout(&self) -> Duration {
572 CRDB_TCP_USER_TIMEOUT.get(self)
573 }
574}
575
576#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
577pub struct RetryParameters {
578 pub fixed_sleep: Duration,
579 pub initial_backoff: Duration,
580 pub multiplier: u32,
581 pub clamp: Duration,
582}
583
584impl RetryParameters {
585 pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
586 let seed = now
587 .duration_since(UNIX_EPOCH)
588 .map_or(0, |x| u64::from(x.subsec_nanos()));
589 Retry {
590 fixed_sleep: self.fixed_sleep,
591 initial_backoff: self.initial_backoff,
592 multiplier: self.multiplier,
593 clamp_backoff: self.clamp,
594 seed,
595 }
596 }
597}
598
599pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
600 "persist_blob_operation_timeout",
601 Duration::from_secs(180),
602 "Maximum time allowed for a network call, including retry attempts.",
603);
604
605pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
606 "persist_blob_operation_attempt_timeout",
607 Duration::from_secs(90),
608 "Maximum time allowed for a single network call.",
609);
610
611pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
612 "persist_blob_connect_timeout",
613 Duration::from_secs(7),
614 "Maximum time to wait for a socket connection to be made.",
615);
616
617pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
618 "persist_blob_read_timeout",
619 Duration::from_secs(10),
620 "Maximum time to wait to read the first byte of a response, including connection time.",
621);
622
623impl BlobKnobs for PersistConfig {
624 fn operation_timeout(&self) -> Duration {
625 BLOB_OPERATION_TIMEOUT.get(self)
626 }
627
628 fn operation_attempt_timeout(&self) -> Duration {
629 BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
630 }
631
632 fn connect_timeout(&self) -> Duration {
633 BLOB_CONNECT_TIMEOUT.get(self)
634 }
635
636 fn read_timeout(&self) -> Duration {
637 BLOB_READ_TIMEOUT.get(self)
638 }
639
640 fn is_cc_active(&self) -> bool {
641 self.is_cc_active
642 }
643}
644
645pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> {
646 check_data_version_with_self_managed_versions(code_version, data_version, SELF_MANAGED_VERSIONS)
647}
648
649pub(crate) fn check_data_version_with_self_managed_versions(
674 code_version: &Version,
675 data_version: &Version,
676 self_managed_versions: &[Version],
677) -> Result<(), String> {
678 let base_code_version = Version {
680 patch: 0,
681 ..code_version.clone()
682 };
683 let base_data_version = Version {
684 patch: 0,
685 ..data_version.clone()
686 };
687 if data_version >= code_version {
688 for window in self_managed_versions.windows(2) {
689 if base_code_version == window[0] && base_data_version <= window[1] {
690 return Ok(());
691 }
692 }
693
694 if let Some(last) = self_managed_versions.last() {
695 if base_code_version == *last
696 && base_data_version
702 .minor
703 .saturating_sub(base_code_version.minor)
704 < 40
705 {
706 return Ok(());
707 }
708 }
709 }
710
711 let max_allowed_data_version = Version::new(
716 code_version.major,
717 code_version.minor.saturating_add(1),
718 u64::MAX,
719 );
720
721 if &max_allowed_data_version < data_version {
722 Err(format!(
723 "{code_version} received persist state from the future {data_version}",
724 ))
725 } else {
726 Ok(())
727 }
728}