1#![allow(missing_docs)]
11
12use std::sync::Arc;
15use std::sync::atomic::AtomicBool;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18use mz_build_info::BuildInfo;
19use mz_dyncfg::{Config, ConfigDefault, ConfigSet, ConfigUpdates};
20use mz_ore::instrument;
21use mz_ore::now::NowFn;
22use mz_persist::cfg::BlobKnobs;
23use mz_persist::retry::Retry;
24use mz_postgres_client::PostgresClientKnobs;
25use proptest_derive::Arbitrary;
26use semver::Version;
27use serde::{Deserialize, Serialize};
28use tokio::sync::watch;
29
30use crate::internal::machine::{
31 NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
32 NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
33};
34use crate::internal::state::ROLLUP_THRESHOLD;
35use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
36use crate::read::READER_LEASE_DURATION;
37
38const LTS_VERSIONS: &[Version] = &[
39 Version::new(0, 130, 0),
41];
42
43#[derive(Debug, Clone)]
95pub struct PersistConfig {
96 pub build_version: Version,
98 pub hostname: String,
100 pub is_cc_active: bool,
102 pub announce_memory_limit: Option<usize>,
104 pub now: NowFn,
106 pub configs: Arc<ConfigSet>,
111 configs_synced_once: Arc<watch::Sender<bool>>,
114 pub compaction_enabled: bool,
116 pub compaction_process_requests: Arc<AtomicBool>,
118 pub compaction_concurrency_limit: usize,
121 pub compaction_queue_size: usize,
124 pub compaction_yield_after_n_updates: usize,
127 pub writer_lease_duration: Duration,
130 pub critical_downgrade_interval: Duration,
132 pub isolated_runtime_worker_threads: usize,
135}
136
137impl std::ops::Deref for PersistConfig {
139 type Target = ConfigSet;
140 fn deref(&self) -> &Self::Target {
141 &self.configs
142 }
143}
144
145impl PersistConfig {
146 pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
149 Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
150 }
151
152 pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
155 let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");
157
158 let (configs_synced_once, _) = watch::channel(false);
160
161 Self {
162 build_version: build_info.semver_version(),
163 is_cc_active: false,
164 announce_memory_limit: None,
165 now,
166 configs: Arc::new(configs),
167 configs_synced_once: Arc::new(configs_synced_once),
168 compaction_enabled: !compaction_disabled,
169 compaction_process_requests: Arc::new(AtomicBool::new(true)),
170 compaction_concurrency_limit: 5,
171 compaction_queue_size: 20,
172 compaction_yield_after_n_updates: 100_000,
173 writer_lease_duration: 60 * Duration::from_secs(60),
174 critical_downgrade_interval: Duration::from_secs(30),
175 isolated_runtime_worker_threads: num_cpus::get(),
176 hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()),
181 }
182 }
183
184 pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T) {
185 let mut updates = ConfigUpdates::default();
186 updates.add(cfg, val);
187 updates.apply(self)
188 }
189
190 pub fn apply_from(&self, updates: &ConfigUpdates) {
196 updates.apply(&self.configs);
197 self.configs_synced_once.send_replace(true);
198 }
199
200 #[instrument(level = "info")]
209 pub async fn configs_synced_once(&self) {
210 self.configs_synced_once
211 .subscribe()
212 .wait_for(|synced| *synced)
213 .await
214 .expect("we have a borrow on sender so it cannot drop");
215 }
216
217 pub fn storage_source_decode_fuel(&self) -> usize {
220 STORAGE_SOURCE_DECODE_FUEL.get(self)
221 }
222
223 pub fn set_reader_lease_duration(&self, val: Duration) {
225 self.set_config(&READER_LEASE_DURATION, val);
226 }
227
228 pub fn set_rollup_threshold(&self, val: usize) {
230 self.set_config(&ROLLUP_THRESHOLD, val);
231 }
232
233 pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
236 self.set_config(
237 &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
238 val.initial_backoff,
239 );
240 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
241 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
242 }
243
244 pub fn disable_compaction(&self) {
245 tracing::info!("Disabling Persist Compaction");
246 self.compaction_process_requests
247 .store(false, std::sync::atomic::Ordering::Relaxed);
248 }
249
250 pub fn enable_compaction(&self) {
251 tracing::info!("Enabling Persist Compaction");
252 self.compaction_process_requests
253 .store(true, std::sync::atomic::Ordering::Relaxed);
254 }
255
256 pub fn new_for_tests() -> Self {
258 use mz_build_info::DUMMY_BUILD_INFO;
259 use mz_ore::now::SYSTEM_TIME;
260
261 let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
262 cfg.hostname = "tests".into();
263 cfg
264 }
265}
266
/// Number of bytes in one mebibyte (2^20 = 1_048_576).
// The mixed-case name mirrors the unit's conventional spelling, hence the
// lint exemption.
#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1 << 20;
269
270pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
276 mz_persist::cfg::all_dyn_configs(configs)
277 .add(&crate::batch::BATCH_DELETE_ENABLED)
278 .add(&crate::batch::BLOB_TARGET_SIZE)
279 .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
280 .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
281 .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
282 .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
283 .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
284 .add(&crate::batch::MAX_RUN_LEN)
285 .add(&crate::batch::MAX_RUNS)
286 .add(&BLOB_OPERATION_TIMEOUT)
287 .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
288 .add(&BLOB_CONNECT_TIMEOUT)
289 .add(&BLOB_READ_TIMEOUT)
290 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_SIZE)
291 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_WAIT)
292 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
293 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
294 .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
295 .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
296 .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
297 .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
298 .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
299 .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
300 .add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
301 .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
302 .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
303 .add(&COMPACTION_HEURISTIC_MIN_PARTS)
304 .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
305 .add(&COMPACTION_MEMORY_BOUND_BYTES)
306 .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
307 .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
308 .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
309 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
310 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
311 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
312 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_WAIT)
313 .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
314 .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
315 .add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
316 .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
317 .add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
318 .add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
319 .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
320 .add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA)
321 .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
322 .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
323 .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
324 .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
325 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
326 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
327 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
328 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
329 .add(&crate::internal::state::ROLLUP_THRESHOLD)
330 .add(&crate::internal::state::ROLLUP_USE_ACTIVE_ROLLUP)
331 .add(&crate::internal::state::GC_FALLBACK_THRESHOLD_MS)
332 .add(&crate::internal::state::GC_USE_ACTIVE_GC)
333 .add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
334 .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
335 .add(&crate::read::READER_LEASE_DURATION)
336 .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
337 .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
338 .add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
339 .add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
340 .add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
341 .add(&crate::rpc::PUBSUB_CONNECT_MAX_BACKOFF)
342 .add(&crate::rpc::PUBSUB_CLIENT_SENDER_CHANNEL_SIZE)
343 .add(&crate::rpc::PUBSUB_CLIENT_RECEIVER_CHANNEL_SIZE)
344 .add(&crate::rpc::PUBSUB_SERVER_CONNECTION_CHANNEL_SIZE)
345 .add(&crate::rpc::PUBSUB_STATE_CACHE_SHARD_REF_CHANNEL_SIZE)
346 .add(&crate::rpc::PUBSUB_RECONNECT_BACKOFF)
347 .add(&crate::stats::STATS_AUDIT_PERCENT)
348 .add(&crate::stats::STATS_BUDGET_BYTES)
349 .add(&crate::stats::STATS_COLLECTION_ENABLED)
350 .add(&crate::stats::STATS_FILTER_ENABLED)
351 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
352 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
353 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
354 .add(&crate::fetch::PART_DECODE_FORMAT)
355 .add(&crate::write::COMBINE_INLINE_WRITES)
356}
357
358impl PersistConfig {
359 pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
360
361 pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
362 self.set_config(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT, val);
363 }
364}
365
366pub const CONSENSUS_CONNECTION_POOL_MAX_SIZE: Config<usize> = Config::new(
370 "persist_consensus_connection_pool_max_size",
371 50,
372 "The maximum size the connection pool to Postgres/CRDB will grow to.",
373);
374
375const CONSENSUS_CONNECTION_POOL_MAX_WAIT: Config<Duration> = Config::new(
380 "persist_consensus_connection_pool_max_wait",
381 Duration::from_secs(60),
382 "The amount of time we'll wait for a connection to become available.",
383);
384
385const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
389 "persist_consensus_connection_pool_ttl",
390 Duration::from_secs(300),
391 "\
392 The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
393 proactively terminated",
394);
395
396const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
404 "persist_consensus_connection_pool_ttl_stagger",
405 Duration::from_secs(6),
406 "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
407);
408
409pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
412 "crdb_connect_timeout",
413 Duration::from_secs(5),
414 "The time to connect to CockroachDB before timing out and retrying.",
415);
416
417pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
421 "crdb_tcp_user_timeout",
422 Duration::from_secs(30),
423 "\
424 The TCP timeout for connections to CockroachDB. Specifies the amount of \
425 time that transmitted data may remain unacknowledged before the TCP \
426 connection is forcibly closed.",
427);
428
429pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
431 "persist_use_critical_since_txn",
432 true,
433 "Use the critical since (instead of the overall since) when initializing a subscribe.",
434);
435
436pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
438 "persist_use_critical_since_catalog",
439 false,
440 "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
441);
442
443pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
445 "persist_use_critical_since_source",
446 false,
447 "Use the critical since (instead of the overall since) in the Persist source.",
448);
449
450pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
452 "persist_use_critical_since_snapshot",
453 false,
454 "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
455);
456
457pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
459 "use_global_txn_cache_source",
460 true,
461 "Use the process global txn cache (instead of an operator local one) in the Persist source.",
462);
463
464pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
468 "persist_batch_builder_max_outstanding_parts",
469 2,
470 "The number of writes a batch builder can have outstanding before we slow down the writer.",
471);
472
473pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
477 "persist_compaction_heuristic_min_inputs",
478 8,
479 "Don't skip compaction if we have more than this many hollow batches as input.",
480);
481
482pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
486 "persist_compaction_heuristic_min_parts",
487 8,
488 "Don't skip compaction if we have more than this many parts as input.",
489);
490
491pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
495 "persist_compaction_heuristic_min_updates",
496 1024,
497 "Don't skip compaction if we have more than this many updates as input.",
498);
499
500pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
505 "persist_compaction_memory_bound_bytes",
506 1024 * MiB,
507 "Attempt to limit compaction to this amount of memory.",
508);
509
510pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
512 "persist_gc_blob_delete_concurrency_limit",
513 32,
514 "Limit the number of concurrent deletes GC can perform to this threshold.",
515);
516
517pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
527 "persist_state_versions_recent_live_diffs_limit",
528 30 * 128,
529 "Fetch this many diffs when fetching recent diffs.",
530);
531
532pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
534 "persist_usage_state_fetch_concurrency_limit",
535 8,
536 "Limit the concurrency in of fetching in the perioding Persist-storage-usage calculation.",
537);
538
539impl PostgresClientKnobs for PersistConfig {
540 fn connection_pool_max_size(&self) -> usize {
541 CONSENSUS_CONNECTION_POOL_MAX_SIZE.get(self)
542 }
543
544 fn connection_pool_max_wait(&self) -> Option<Duration> {
545 Some(CONSENSUS_CONNECTION_POOL_MAX_WAIT.get(self))
546 }
547
548 fn connection_pool_ttl(&self) -> Duration {
549 CONSENSUS_CONNECTION_POOL_TTL.get(self)
550 }
551
552 fn connection_pool_ttl_stagger(&self) -> Duration {
553 CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
554 }
555
556 fn connect_timeout(&self) -> Duration {
557 CRDB_CONNECT_TIMEOUT.get(self)
558 }
559
560 fn tcp_user_timeout(&self) -> Duration {
561 CRDB_TCP_USER_TIMEOUT.get(self)
562 }
563}
564
565#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
566pub struct RetryParameters {
567 pub fixed_sleep: Duration,
568 pub initial_backoff: Duration,
569 pub multiplier: u32,
570 pub clamp: Duration,
571}
572
573impl RetryParameters {
574 pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
575 let seed = now
576 .duration_since(UNIX_EPOCH)
577 .map_or(0, |x| u64::from(x.subsec_nanos()));
578 Retry {
579 fixed_sleep: self.fixed_sleep,
580 initial_backoff: self.initial_backoff,
581 multiplier: self.multiplier,
582 clamp_backoff: self.clamp,
583 seed,
584 }
585 }
586}
587
588pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
589 "persist_blob_operation_timeout",
590 Duration::from_secs(180),
591 "Maximum time allowed for a network call, including retry attempts.",
592);
593
594pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
595 "persist_blob_operation_attempt_timeout",
596 Duration::from_secs(90),
597 "Maximum time allowed for a single network call.",
598);
599
600pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
601 "persist_blob_connect_timeout",
602 Duration::from_secs(7),
603 "Maximum time to wait for a socket connection to be made.",
604);
605
606pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
607 "persist_blob_read_timeout",
608 Duration::from_secs(10),
609 "Maximum time to wait to read the first byte of a response, including connection time.",
610);
611
612impl BlobKnobs for PersistConfig {
613 fn operation_timeout(&self) -> Duration {
614 BLOB_OPERATION_TIMEOUT.get(self)
615 }
616
617 fn operation_attempt_timeout(&self) -> Duration {
618 BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
619 }
620
621 fn connect_timeout(&self) -> Duration {
622 BLOB_CONNECT_TIMEOUT.get(self)
623 }
624
625 fn read_timeout(&self) -> Duration {
626 BLOB_READ_TIMEOUT.get(self)
627 }
628
629 fn is_cc_active(&self) -> bool {
630 self.is_cc_active
631 }
632}
633
634pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> {
635 check_data_version_with_lts_versions(code_version, data_version, LTS_VERSIONS)
636}
637
638pub(crate) fn check_data_version_with_lts_versions(
663 code_version: &Version,
664 data_version: &Version,
665 lts_versions: &[Version],
666) -> Result<(), String> {
667 let base_code_version = Version {
669 patch: 0,
670 ..code_version.clone()
671 };
672 let base_data_version = Version {
673 patch: 0,
674 ..data_version.clone()
675 };
676 if data_version >= code_version {
677 for window in lts_versions.windows(2) {
678 if base_code_version == window[0] && base_data_version <= window[1] {
679 return Ok(());
680 }
681 }
682
683 if let Some(last) = lts_versions.last() {
684 if base_code_version == *last
685 && base_data_version
691 .minor
692 .saturating_sub(base_code_version.minor)
693 < 40
694 {
695 return Ok(());
696 }
697 }
698 }
699
700 let max_allowed_data_version = Version::new(
705 code_version.major,
706 code_version.minor.saturating_add(1),
707 u64::MAX,
708 );
709
710 if &max_allowed_data_version < data_version {
711 Err(format!(
712 "{code_version} received persist state from the future {data_version}",
713 ))
714 } else {
715 Ok(())
716 }
717}