1#![allow(missing_docs)]
11
12use std::sync::Arc;
15use std::sync::atomic::AtomicBool;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18use mz_build_info::BuildInfo;
19use mz_dyncfg::{Config, ConfigDefault, ConfigSet, ConfigUpdates};
20use mz_ore::instrument;
21use mz_ore::now::NowFn;
22use mz_persist::cfg::BlobKnobs;
23use mz_persist::retry::Retry;
24use mz_postgres_client::PostgresClientKnobs;
25use proptest_derive::Arbitrary;
26use semver::Version;
27use serde::{Deserialize, Serialize};
28use tokio::sync::watch;
29
30use crate::async_runtime;
31use crate::internal::machine::{
32 NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
33 NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
34};
35use crate::internal::state::ROLLUP_THRESHOLD;
36use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
37use crate::read::READER_LEASE_DURATION;
38
/// Release versions that [`code_can_write_data`] uses as compatibility floors.
///
/// Index 0 holds the older version (0.130.0) and index 1 the newer one
/// (0.147.0). The array is indexed positionally, so the order of the entries
/// is significant.
const SELF_MANAGED_VERSIONS: &[Version; 2] = &[
    Version::new(0, 130, 0),
    Version::new(0, 147, 0),
];
46
/// Configuration knobs for persist.
///
/// Combines static knobs, which are fixed when the value is constructed, with
/// a shared [`ConfigSet`] of dynamically updatable configs. The struct derefs
/// to that `ConfigSet`, so a `&PersistConfig` can be handed to any API that
/// expects a `&ConfigSet`.
#[derive(Debug, Clone)]
pub struct PersistConfig {
    /// Semver version of the build, taken from the `BuildInfo` given to `new`.
    pub build_version: Version,
    /// The `HOSTNAME` environment variable (or "unknown" when it cannot be
    /// read) followed by a space and the build version.
    pub hostname: String,
    /// Value reported through `BlobKnobs::is_cc_active`; `false` by default.
    pub is_cc_active: bool,
    /// `None` by default.
    // NOTE(review): the unit and the consumer of this limit are not visible
    // in this file -- confirm before relying on it.
    pub announce_memory_limit: Option<usize>,
    /// Clock function supplied by the caller of `new`.
    pub now: NowFn,
    /// The dynamically updatable configs; also the `Deref` target.
    pub configs: Arc<ConfigSet>,
    /// Set to `true` by `apply_from`; awaited by `configs_synced_once()`.
    configs_synced_once: Arc<watch::Sender<bool>>,
    /// `true` unless the `MZ_PERSIST_COMPACTION_DISABLED` environment variable
    /// was truthy when the config was constructed.
    pub compaction_enabled: bool,
    /// Runtime flag flipped by `disable_compaction` / `enable_compaction`;
    /// starts out `true`.
    pub compaction_process_requests: Arc<AtomicBool>,
    /// Defaults to 5.
    pub compaction_concurrency_limit: usize,
    /// Defaults to 20.
    pub compaction_queue_size: usize,
    /// Defaults to 100_000.
    pub compaction_yield_after_n_updates: usize,
    /// Defaults to one hour.
    pub writer_lease_duration: Duration,
    /// Defaults to 30 seconds.
    pub critical_downgrade_interval: Duration,
    /// Defaults to the CPU count reported by `num_cpus::get()`.
    pub isolated_runtime_worker_threads: usize,
}
141
142impl std::ops::Deref for PersistConfig {
144 type Target = ConfigSet;
145 fn deref(&self) -> &Self::Target {
146 &self.configs
147 }
148}
149
150impl PersistConfig {
151 pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
154 Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
155 }
156
157 pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
160 let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");
162
163 let (configs_synced_once, _) = watch::channel(false);
165
166 Self {
167 build_version: build_info.semver_version(),
168 is_cc_active: false,
169 announce_memory_limit: None,
170 now,
171 configs: Arc::new(configs),
172 configs_synced_once: Arc::new(configs_synced_once),
173 compaction_enabled: !compaction_disabled,
174 compaction_process_requests: Arc::new(AtomicBool::new(true)),
175 compaction_concurrency_limit: 5,
176 compaction_queue_size: 20,
177 compaction_yield_after_n_updates: 100_000,
178 writer_lease_duration: 60 * Duration::from_secs(60),
179 critical_downgrade_interval: Duration::from_secs(30),
180 isolated_runtime_worker_threads: num_cpus::get(),
181 hostname: {
186 use std::fmt::Write;
187 let mut name = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned());
188 write!(&mut name, " {}", build_info.version)
189 .expect("writing to string should not fail");
190 name
191 },
192 }
193 }
194
195 pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T) {
196 let mut updates = ConfigUpdates::default();
197 updates.add(cfg, val);
198 updates.apply(self)
199 }
200
201 pub fn apply_from(&self, updates: &ConfigUpdates) {
207 updates.apply(&self.configs);
208 self.configs_synced_once.send_replace(true);
209 }
210
211 #[instrument(level = "info")]
220 pub async fn configs_synced_once(&self) {
221 self.configs_synced_once
222 .subscribe()
223 .wait_for(|synced| *synced)
224 .await
225 .expect("we have a borrow on sender so it cannot drop");
226 }
227
228 pub fn storage_source_decode_fuel(&self) -> usize {
231 STORAGE_SOURCE_DECODE_FUEL.get(self)
232 }
233
234 pub fn set_reader_lease_duration(&self, val: Duration) {
236 self.set_config(&READER_LEASE_DURATION, val);
237 }
238
239 pub fn set_rollup_threshold(&self, val: usize) {
241 self.set_config(&ROLLUP_THRESHOLD, val);
242 }
243
244 pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
247 self.set_config(
248 &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
249 val.initial_backoff,
250 );
251 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
252 self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
253 }
254
255 pub fn disable_compaction(&self) {
256 tracing::info!("Disabling Persist Compaction");
257 self.compaction_process_requests
258 .store(false, std::sync::atomic::Ordering::Relaxed);
259 }
260
261 pub fn enable_compaction(&self) {
262 tracing::info!("Enabling Persist Compaction");
263 self.compaction_process_requests
264 .store(true, std::sync::atomic::Ordering::Relaxed);
265 }
266
267 pub fn new_for_tests() -> Self {
269 use mz_build_info::DUMMY_BUILD_INFO;
270 use mz_ore::now::SYSTEM_TIME;
271
272 let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
273 cfg.hostname = "tests".into();
274 cfg.isolated_runtime_worker_threads = async_runtime::TEST_THREADS;
275 cfg
276 }
277}
278
/// Number of bytes in one mebibyte (1024 * 1024).
// The name spells the unit the way it is conventionally written, so it is
// exempted from the upper-case naming lint for constants.
#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1024 * 1024;
281
282pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
288 mz_persist::cfg::all_dyn_configs(configs)
289 .add(&crate::batch::BATCH_DELETE_ENABLED)
290 .add(&crate::batch::BLOB_TARGET_SIZE)
291 .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
292 .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
293 .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
294 .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
295 .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
296 .add(&crate::batch::MAX_RUN_LEN)
297 .add(&crate::batch::MAX_RUNS)
298 .add(&BLOB_OPERATION_TIMEOUT)
299 .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
300 .add(&BLOB_CONNECT_TIMEOUT)
301 .add(&BLOB_READ_TIMEOUT)
302 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_SIZE)
303 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_WAIT)
304 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
305 .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
306 .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
307 .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
308 .add(&crate::cfg::CRDB_KEEPALIVES_IDLE)
309 .add(&crate::cfg::CRDB_KEEPALIVES_INTERVAL)
310 .add(&crate::cfg::CRDB_KEEPALIVES_RETRIES)
311 .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
312 .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
313 .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
314 .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
315 .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
316 .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
317 .add(&COMPACTION_HEURISTIC_MIN_PARTS)
318 .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
319 .add(&COMPACTION_MEMORY_BOUND_BYTES)
320 .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
321 .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
322 .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
323 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
324 .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
325 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
326 .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_WAIT)
327 .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
328 .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
329 .add(&crate::fetch::VALIDATE_PART_BOUNDS_ON_READ)
330 .add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
331 .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
332 .add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
333 .add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
334 .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
335 .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
336 .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
337 .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
338 .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
339 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
340 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
341 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
342 .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
343 .add(&crate::internal::state::ROLLUP_THRESHOLD)
344 .add(&crate::internal::state::ROLLUP_USE_ACTIVE_ROLLUP)
345 .add(&crate::internal::state::GC_FALLBACK_THRESHOLD_MS)
346 .add(&crate::internal::state::GC_USE_ACTIVE_GC)
347 .add(&crate::internal::state::GC_MIN_VERSIONS)
348 .add(&crate::internal::state::GC_MAX_VERSIONS)
349 .add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
350 .add(&crate::internal::state::ENABLE_INCREMENTAL_COMPACTION)
351 .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
352 .add(&crate::read::READER_LEASE_DURATION)
353 .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
354 .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
355 .add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
356 .add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
357 .add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
358 .add(&crate::rpc::PUBSUB_CONNECT_MAX_BACKOFF)
359 .add(&crate::rpc::PUBSUB_CLIENT_SENDER_CHANNEL_SIZE)
360 .add(&crate::rpc::PUBSUB_CLIENT_RECEIVER_CHANNEL_SIZE)
361 .add(&crate::rpc::PUBSUB_SERVER_CONNECTION_CHANNEL_SIZE)
362 .add(&crate::rpc::PUBSUB_STATE_CACHE_SHARD_REF_CHANNEL_SIZE)
363 .add(&crate::rpc::PUBSUB_RECONNECT_BACKOFF)
364 .add(&crate::stats::STATS_AUDIT_PERCENT)
365 .add(&crate::stats::STATS_AUDIT_PANIC)
366 .add(&crate::stats::STATS_BUDGET_BYTES)
367 .add(&crate::stats::STATS_COLLECTION_ENABLED)
368 .add(&crate::stats::STATS_FILTER_ENABLED)
369 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
370 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
371 .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
372 .add(&crate::fetch::PART_DECODE_FORMAT)
373 .add(&crate::write::COMBINE_INLINE_WRITES)
374 .add(&crate::write::VALIDATE_PART_BOUNDS_ON_WRITE)
375}
376
377impl PersistConfig {
378 pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
379
380 pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
381 self.set_config(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT, val);
382 }
383}
384
/// Dyncfg `persist_consensus_connection_pool_max_size` (default 50): the
/// maximum size the connection pool to Postgres/CRDB will grow to.
pub const CONSENSUS_CONNECTION_POOL_MAX_SIZE: Config<usize> = Config::new(
    "persist_consensus_connection_pool_max_size",
    50,
    "The maximum size the connection pool to Postgres/CRDB will grow to.",
);

/// Dyncfg `persist_consensus_connection_pool_max_wait` (default 60s): how long
/// to wait for a pooled connection to become available.
const CONSENSUS_CONNECTION_POOL_MAX_WAIT: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_max_wait",
    Duration::from_secs(60),
    "The amount of time we'll wait for a connection to become available.",
);

/// Dyncfg `persist_consensus_connection_pool_ttl` (default 300s): the minimum
/// TTL of a Consensus connection before it is proactively terminated.
const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl",
    Duration::from_secs(300),
    "\
    The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
    proactively terminated",
);

/// Dyncfg `persist_consensus_connection_pool_ttl_stagger` (default 6s): the
/// minimum time between TTLing Consensus connections.
const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl_stagger",
    Duration::from_secs(6),
    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
);
427
/// Dyncfg `crdb_connect_timeout` (default 5s): the time allowed for connecting
/// to CockroachDB before timing out and retrying.
pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "crdb_connect_timeout",
    Duration::from_secs(5),
    "The time to connect to CockroachDB before timing out and retrying.",
);

/// Dyncfg `crdb_tcp_user_timeout` (default 30s): how long transmitted data may
/// remain unacknowledged before the TCP connection is forcibly closed.
pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
    "crdb_tcp_user_timeout",
    Duration::from_secs(30),
    "\
    The TCP timeout for connections to CockroachDB. Specifies the amount of \
    time that transmitted data may remain unacknowledged before the TCP \
    connection is forcibly closed.",
);

/// Dyncfg `crdb_keepalives_idle` (default 10s): idle time before a TCP
/// keepalive packet is sent on CRDB connections.
pub const CRDB_KEEPALIVES_IDLE: Config<Duration> = Config::new(
    "crdb_keepalives_idle",
    Duration::from_secs(10),
    "\
    The amount of idle time before a TCP keepalive packet is sent on CRDB \
    connections.",
);

/// Dyncfg `crdb_keepalives_interval` (default 5s): interval between TCP
/// keepalive probes on CRDB connections.
pub const CRDB_KEEPALIVES_INTERVAL: Config<Duration> = Config::new(
    "crdb_keepalives_interval",
    Duration::from_secs(5),
    "The time interval between TCP keepalive probes on CRDB connections.",
);

/// Dyncfg `crdb_keepalives_retries` (default 5): maximum number of TCP
/// keepalive probes sent before a CRDB connection is dropped.
pub const CRDB_KEEPALIVES_RETRIES: Config<u32> = Config::new(
    "crdb_keepalives_retries",
    5,
    "\
    The maximum number of TCP keepalive probes that will be sent before \
    dropping a CRDB connection.",
);
469
/// Dyncfg `persist_use_critical_since_txn` (default `true`): use the critical
/// since, instead of the overall since, when initializing a subscribe.
pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
    "persist_use_critical_since_txn",
    true,
    "Use the critical since (instead of the overall since) when initializing a subscribe.",
);

/// Dyncfg `persist_use_critical_since_catalog` (default `false`): use the
/// critical since for the Persist-backed catalog.
pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
    "persist_use_critical_since_catalog",
    false,
    "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
);

/// Dyncfg `persist_use_critical_since_source` (default `false`): use the
/// critical since in the Persist source.
pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
    "persist_use_critical_since_source",
    false,
    "Use the critical since (instead of the overall since) in the Persist source.",
);

/// Dyncfg `persist_use_critical_since_snapshot` (default `false`): use the
/// critical since when taking snapshots in the controller or in fast-path
/// peeks.
pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
    "persist_use_critical_since_snapshot",
    false,
    "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);
497
/// Dyncfg `persist_batch_builder_max_outstanding_parts` (default 2): number of
/// writes a batch builder may have outstanding before the writer is slowed
/// down.
pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
    "persist_batch_builder_max_outstanding_parts",
    2,
    "The number of writes a batch builder can have outstanding before we slow down the writer.",
);

/// Dyncfg `persist_compaction_heuristic_min_inputs` (default 8): compaction is
/// not skipped when there are more hollow batches than this as input.
pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_inputs",
    8,
    "Don't skip compaction if we have more than this many hollow batches as input.",
);

/// Dyncfg `persist_compaction_heuristic_min_parts` (default 8): compaction is
/// not skipped when there are more parts than this as input.
pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_parts",
    8,
    "Don't skip compaction if we have more than this many parts as input.",
);

/// Dyncfg `persist_compaction_heuristic_min_updates` (default 1024):
/// compaction is not skipped when there are more updates than this as input.
pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_updates",
    1024,
    "Don't skip compaction if we have more than this many updates as input.",
);

/// Dyncfg `persist_compaction_memory_bound_bytes` (default 1024 MiB): the
/// amount of memory compaction attempts to stay within.
pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
    "persist_compaction_memory_bound_bytes",
    1024 * MiB,
    "Attempt to limit compaction to this amount of memory.",
);

/// Dyncfg `persist_gc_blob_delete_concurrency_limit` (default 32): cap on the
/// number of concurrent deletes GC can perform.
pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
    "persist_gc_blob_delete_concurrency_limit",
    32,
    "Limit the number of concurrent deletes GC can perform to this threshold.",
);

/// Dyncfg `persist_state_versions_recent_live_diffs_limit` (default
/// 30 * 128 = 3840): number of diffs fetched when fetching recent diffs.
pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
    "persist_state_versions_recent_live_diffs_limit",
    30 * 128,
    "Fetch this many diffs when fetching recent diffs.",
);
565
566pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
568 "persist_usage_state_fetch_concurrency_limit",
569 8,
570 "Limit the concurrency in of fetching in the perioding Persist-storage-usage calculation.",
571);
572
573impl PostgresClientKnobs for PersistConfig {
574 fn connection_pool_max_size(&self) -> usize {
575 CONSENSUS_CONNECTION_POOL_MAX_SIZE.get(self)
576 }
577
578 fn connection_pool_max_wait(&self) -> Option<Duration> {
579 Some(CONSENSUS_CONNECTION_POOL_MAX_WAIT.get(self))
580 }
581
582 fn connection_pool_ttl(&self) -> Duration {
583 CONSENSUS_CONNECTION_POOL_TTL.get(self)
584 }
585
586 fn connection_pool_ttl_stagger(&self) -> Duration {
587 CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
588 }
589
590 fn connect_timeout(&self) -> Duration {
591 CRDB_CONNECT_TIMEOUT.get(self)
592 }
593
594 fn tcp_user_timeout(&self) -> Duration {
595 CRDB_TCP_USER_TIMEOUT.get(self)
596 }
597
598 fn keepalives_idle(&self) -> Duration {
599 CRDB_KEEPALIVES_IDLE.get(self)
600 }
601
602 fn keepalives_interval(&self) -> Duration {
603 CRDB_KEEPALIVES_INTERVAL.get(self)
604 }
605
606 fn keepalives_retries(&self) -> u32 {
607 CRDB_KEEPALIVES_RETRIES.get(self)
608 }
609}
610
/// The parameters from which a `Retry` is built by `into_retry`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
pub struct RetryParameters {
    /// Becomes `Retry::fixed_sleep`.
    pub fixed_sleep: Duration,
    /// Becomes `Retry::initial_backoff`.
    pub initial_backoff: Duration,
    /// Becomes `Retry::multiplier`.
    pub multiplier: u32,
    /// Becomes `Retry::clamp_backoff`.
    pub clamp: Duration,
}
618
619impl RetryParameters {
620 pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
621 let seed = now
622 .duration_since(UNIX_EPOCH)
623 .map_or(0, |x| u64::from(x.subsec_nanos()));
624 Retry {
625 fixed_sleep: self.fixed_sleep,
626 initial_backoff: self.initial_backoff,
627 multiplier: self.multiplier,
628 clamp_backoff: self.clamp,
629 seed,
630 }
631 }
632}
633
/// Dyncfg `persist_blob_operation_timeout` (default 180s): maximum time
/// allowed for a network call, including retry attempts.
pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_timeout",
    Duration::from_secs(180),
    "Maximum time allowed for a network call, including retry attempts.",
);

/// Dyncfg `persist_blob_operation_attempt_timeout` (default 90s): maximum time
/// allowed for a single network call.
pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_attempt_timeout",
    Duration::from_secs(90),
    "Maximum time allowed for a single network call.",
);

/// Dyncfg `persist_blob_connect_timeout` (default 7s): maximum time to wait
/// for a socket connection to be made.
pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_connect_timeout",
    Duration::from_secs(7),
    "Maximum time to wait for a socket connection to be made.",
);

/// Dyncfg `persist_blob_read_timeout` (default 10s): maximum time to wait for
/// the first byte of a response, including connection time.
pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_read_timeout",
    Duration::from_secs(10),
    "Maximum time to wait to read the first byte of a response, including connection time.",
);
657
658impl BlobKnobs for PersistConfig {
659 fn operation_timeout(&self) -> Duration {
660 BLOB_OPERATION_TIMEOUT.get(self)
661 }
662
663 fn operation_attempt_timeout(&self) -> Duration {
664 BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
665 }
666
667 fn connect_timeout(&self) -> Duration {
668 BLOB_CONNECT_TIMEOUT.get(self)
669 }
670
671 fn read_timeout(&self) -> Duration {
672 BLOB_READ_TIMEOUT.get(self)
673 }
674
675 fn is_cc_active(&self) -> bool {
676 self.is_cc_active
677 }
678}
679
680pub fn code_can_read_data(code_version: &Version, data_version: &Version) -> bool {
701 code_version.cmp_precedence(data_version).is_ge()
704}
705
706pub fn code_can_write_data(code_version: &Version, data_version: &Version) -> bool {
711 if !code_can_read_data(code_version, data_version) {
712 return false;
713 }
714
715 if code_version.major == 0 && code_version.minor <= SELF_MANAGED_VERSIONS[1].minor {
716 true
719 } else if code_version.major == 0 {
720 SELF_MANAGED_VERSIONS[0]
722 .cmp_precedence(data_version)
723 .is_le()
724 } else if code_version.major <= 26 {
725 SELF_MANAGED_VERSIONS[1]
727 .cmp_precedence(data_version)
728 .is_le()
729 } else {
730 code_version.major - 1 <= data_version.major
732 }
733}