mz_persist_client/cfg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! The tunable knobs for persist.

use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use mz_build_info::BuildInfo;
use mz_dyncfg::{Config, ConfigDefault, ConfigSet, ConfigUpdates};
use mz_ore::instrument;
use mz_ore::now::NowFn;
use mz_persist::cfg::BlobKnobs;
use mz_persist::retry::Retry;
use mz_postgres_client::PostgresClientKnobs;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use crate::internal::machine::{
    NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
    NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
};
use crate::internal::state::ROLLUP_THRESHOLD;
use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
use crate::read::READER_LEASE_DURATION;

const LTS_VERSIONS: &[Version] = &[
    // 25.1
    Version::new(0, 130, 0),
];

/// The tunable knobs for persist.
///
/// Tuning inputs:
/// - A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in
///   fewer entries in consensus state. Before we have compaction and/or
///   incremental state, it is already growing without bound, so this is a
///   concern. OTOH, for any "reasonable" size (> 100MiB?) of blob_target_size,
///   it seems we'd end up with a pretty tremendous amount of data in the shard
///   before this became a real issue.
/// - A larger blob_target_size will result in fewer s3 operations, which are
///   charged per operation. (Hmm, maybe not if we're charged per call in a
///   multipart op. The S3Blob impl already chunks things at 8MiB.)
/// - A smaller blob_target_size will result in more even memory usage in
///   readers.
/// - A larger batch_builder_max_outstanding_parts increases throughput (to a
///   point).
/// - A smaller batch_builder_max_outstanding_parts provides a bound on the
///   amount of memory used by a writer.
/// - A larger compaction_heuristic_min_inputs means state size is larger.
/// - A smaller compaction_heuristic_min_inputs means more compactions happen
///   (higher write amp).
/// - A larger compaction_heuristic_min_updates means more consolidations are
///   discovered while reading a snapshot (higher read amp and higher space
///   amp).
/// - A smaller compaction_heuristic_min_updates means more compactions happen
///   (higher write amp).
///
/// Tuning logic:
/// - blob_target_size was initially selected to be an exact multiple of 8MiB
///   (the s3 multipart size) that was in the same neighborhood as our initial
///   max throughput (~250MiB).
/// - batch_builder_max_outstanding_parts was initially selected to be as small
///   as possible without harming pipelining. 0 means no pipelining, 1 is full
///   pipelining as long as generating data takes less time than writing to s3
///   (hopefully a fair assumption), 2 is a little extra slop on top of 1.
/// - compaction_heuristic_min_inputs was set by running the open-loop benchmark
///   with batches of size 10,240 bytes (selected to be small but such that the
///   overhead of our columnar encoding format was less than 10%) and manually
///   increased until the write amp stopped going down. This becomes much less
///   important once we have incremental state. The initial value is a
///   placeholder and should be revisited at some point.
/// - compaction_heuristic_min_updates was set via a thought experiment. This is
///   an `O(n*log(n))` upper bound on the number of unconsolidated updates that
///   would be consolidated if we compacted as the in-mem Spine does. The
///   initial value is a placeholder and should be revisited at some point.
///
/// TODO: Move these tuning notes into SessionVar descriptions once we have
/// SystemVars for most of these.
//
// TODO: The configs left here don't react dynamically to changes. Move as many
// of them to DynamicConfig as possible.
#[derive(Debug, Clone)]
pub struct PersistConfig {
    /// Info about which version of the code is running.
    pub build_version: Version,
    /// Hostname of this persist user. Stored in state and used for debugging.
    pub hostname: String,
    /// Whether this persist instance is running in a "cc" sized cluster.
    pub is_cc_active: bool,
    /// Memory limit of the process, if known.
    pub announce_memory_limit: Option<usize>,
    /// A clock to use for all leasing and other non-debugging use.
    pub now: NowFn,
    /// Persist [Config]s that can change value dynamically within the lifetime
    /// of a process.
    ///
    /// TODO(cfg): Entirely replace dynamic with this.
    pub configs: Arc<ConfigSet>,
    /// Indicates whether `configs` has been synced at least once with an
    /// upstream source.
    configs_synced_once: Arc<watch::Sender<bool>>,
    /// Whether to physically and logically compact batches in blob storage.
    pub compaction_enabled: bool,
    /// Whether the `Compactor` will process compaction requests, or drop them on the floor.
    pub compaction_process_requests: Arc<AtomicBool>,
    /// In Compactor::compact_and_apply_background, the maximum number of concurrent
    /// compaction requests that can execute for a given shard.
    pub compaction_concurrency_limit: usize,
    /// In Compactor::compact_and_apply_background, the maximum number of pending
    /// compaction requests to queue.
    pub compaction_queue_size: usize,
    /// In Compactor::compact_and_apply_background, how many updates to encode or
    /// decode before voluntarily yielding the task.
    pub compaction_yield_after_n_updates: usize,
    /// Length of time after a writer's last operation after which the writer
    /// may be expired.
    pub writer_lease_duration: Duration,
    /// Length of time between critical handles' calls to downgrade their `since`.
    pub critical_downgrade_interval: Duration,
    /// Number of worker threads to create for the [`crate::IsolatedRuntime`], defaults to the
    /// number of CPUs.
    pub isolated_runtime_worker_threads: usize,
}

// Impl Deref to ConfigSet for convenience of accessing the dynamic configs.
impl std::ops::Deref for PersistConfig {
    type Target = ConfigSet;
    fn deref(&self) -> &Self::Target {
        &self.configs
    }
}
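
// A minimal usage sketch (not part of the upstream file): because of the
// `Deref` impl above, a `&PersistConfig` can be used anywhere a `&ConfigSet`
// is expected, so individual dyncfgs can be read directly from the config:
//
//     let cfg = PersistConfig::new_for_tests();
//     // `Config::get` takes a `&ConfigSet`; `&cfg` deref-coerces to one.
//     let lease = READER_LEASE_DURATION.get(&cfg);
//     assert!(lease > Duration::ZERO);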

impl PersistConfig {
    /// Returns a new instance of [PersistConfig] with default tuning and
    /// default ConfigSet.
    pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
        Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
    }

    /// Returns a new instance of [PersistConfig] with default tuning and the
    /// specified ConfigSet.
    pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
        // Escape hatch in case we need to disable compaction.
        let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");

        // We create receivers on demand, so we drop the initial receiver.
        let (configs_synced_once, _) = watch::channel(false);

        Self {
            build_version: build_info.semver_version(),
            is_cc_active: false,
            announce_memory_limit: None,
            now,
            configs: Arc::new(configs),
            configs_synced_once: Arc::new(configs_synced_once),
            compaction_enabled: !compaction_disabled,
            compaction_process_requests: Arc::new(AtomicBool::new(true)),
            compaction_concurrency_limit: 5,
            compaction_queue_size: 20,
            compaction_yield_after_n_updates: 100_000,
            writer_lease_duration: 60 * Duration::from_secs(60),
            critical_downgrade_interval: Duration::from_secs(30),
            isolated_runtime_worker_threads: num_cpus::get(),
            // TODO: This doesn't work with the process orchestrator. Instead,
            // separate --log-prefix into --service-name and --enable-log-prefix
            // options, where the first is always provided and the second is
            // conditionally enabled by the process orchestrator.
            hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()),
        }
    }

    pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T) {
        let mut updates = ConfigUpdates::default();
        updates.add(cfg, val);
        updates.apply(self)
    }

    /// Applies the provided updates to this configuration.
    ///
    /// You should prefer calling this method over mutating `self.configs`
    /// directly, so that [`Self::configs_synced_once`] can be properly
    /// maintained.
    pub fn apply_from(&self, updates: &ConfigUpdates) {
        updates.apply(&self.configs);
        self.configs_synced_once.send_replace(true);
    }

    /// Resolves when `configs` has been synced at least once with an upstream
    /// source, i.e., via [`Self::apply_from`].
    ///
    /// If `configs` has already been synced once at the time the method is
    /// called, resolves immediately.
    ///
    /// Useful in conjunction with configuration parameters that cannot be
    /// dynamically updated once set (e.g., PubSub).
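    ///
    /// # Example
    ///
    /// A sketch of typical wiring (not from the upstream docs), assuming some
    /// other task feeds updates in via [`Self::apply_from`]:
    ///
    /// ```ignore
    /// let cfg = PersistConfig::new_for_tests();
    /// // Elsewhere: cfg.apply_from(&updates_from_upstream);
    /// // Don't read sync-once-only parameters (e.g. PubSub settings) until at
    /// // least one sync has been applied.
    /// cfg.configs_synced_once().await;
    /// ```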
    #[instrument(level = "info")]
    pub async fn configs_synced_once(&self) {
        self.configs_synced_once
            .subscribe()
            .wait_for(|synced| *synced)
            .await
            .expect("we have a borrow on sender so it cannot drop");
    }

    /// The maximum amount of work to do in the persist_source mfp_and_decode
    /// operator before yielding.
    pub fn storage_source_decode_fuel(&self) -> usize {
        STORAGE_SOURCE_DECODE_FUEL.get(self)
    }

    /// Overrides the value for "persist_reader_lease_duration".
    pub fn set_reader_lease_duration(&self, val: Duration) {
        self.set_config(&READER_LEASE_DURATION, val);
    }

    /// Overrides the value for "persist_rollup_threshold".
    pub fn set_rollup_threshold(&self, val: usize) {
        self.set_config(&ROLLUP_THRESHOLD, val);
    }

    /// Overrides the value for the "persist_next_listen_batch_retryer_*"
    /// configs.
    pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
        self.set_config(
            &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
            val.initial_backoff,
        );
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
    }

    pub fn disable_compaction(&self) {
        tracing::info!("Disabling Persist Compaction");
        self.compaction_process_requests
            .store(false, std::sync::atomic::Ordering::Relaxed);
    }

    pub fn enable_compaction(&self) {
        tracing::info!("Enabling Persist Compaction");
        self.compaction_process_requests
            .store(true, std::sync::atomic::Ordering::Relaxed);
    }

    /// Returns a new instance of [PersistConfig] for tests.
    pub fn new_for_tests() -> Self {
        use mz_build_info::DUMMY_BUILD_INFO;
        use mz_ore::now::SYSTEM_TIME;

        let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        cfg.hostname = "tests".into();
        cfg
    }
}

#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1024 * 1024;

/// Adds the full set of all persist [Config]s.
///
/// TODO(cfg): Consider replacing this with a static global registry powered by
/// something like the `ctor` or `inventory` crate. This would involve managing
/// the footgun of a Config being linked into one binary but not the other.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    mz_persist::cfg::all_dyn_configs(configs)
        .add(&crate::batch::BATCH_DELETE_ENABLED)
        .add(&crate::batch::BLOB_TARGET_SIZE)
        .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
        .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
        .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
        .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
        .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
        .add(&crate::batch::MAX_RUN_LEN)
        .add(&crate::batch::MAX_RUNS)
        .add(&BLOB_OPERATION_TIMEOUT)
        .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
        .add(&BLOB_CONNECT_TIMEOUT)
        .add(&BLOB_READ_TIMEOUT)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_SIZE)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_WAIT)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
        .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
        .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
        .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
        .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
        .add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
        .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
        .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
        .add(&COMPACTION_HEURISTIC_MIN_PARTS)
        .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
        .add(&COMPACTION_MEMORY_BOUND_BYTES)
        .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
        .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
        .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
        .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
        .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_WAIT)
        .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
        .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
        .add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
        .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
        .add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
        .add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
        .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
        .add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA)
        .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
        .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
        .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
        .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
        .add(&crate::internal::state::ROLLUP_THRESHOLD)
        .add(&crate::internal::state::ROLLUP_USE_ACTIVE_ROLLUP)
        .add(&crate::internal::state::GC_FALLBACK_THRESHOLD_MS)
        .add(&crate::internal::state::GC_USE_ACTIVE_GC)
        .add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
        .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
        .add(&crate::read::READER_LEASE_DURATION)
        .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
        .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
        .add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
        .add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
        .add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
        .add(&crate::rpc::PUBSUB_CONNECT_MAX_BACKOFF)
        .add(&crate::rpc::PUBSUB_CLIENT_SENDER_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_CLIENT_RECEIVER_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_SERVER_CONNECTION_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_STATE_CACHE_SHARD_REF_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_RECONNECT_BACKOFF)
        .add(&crate::stats::STATS_AUDIT_PERCENT)
        .add(&crate::stats::STATS_BUDGET_BYTES)
        .add(&crate::stats::STATS_COLLECTION_ENABLED)
        .add(&crate::stats::STATS_FILTER_ENABLED)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
        .add(&crate::fetch::PART_DECODE_FORMAT)
        .add(&crate::write::COMBINE_INLINE_WRITES)
}

impl PersistConfig {
    pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;

    pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
        self.set_config(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT, val);
    }
}

/// Sets the maximum size of the connection pool that is used by consensus.
///
/// Requires a restart of the process to take effect.
pub const CONSENSUS_CONNECTION_POOL_MAX_SIZE: Config<usize> = Config::new(
    "persist_consensus_connection_pool_max_size",
    50,
    "The maximum size the connection pool to Postgres/CRDB will grow to.",
);

/// Sets the maximum amount of time we'll wait to acquire a connection from
/// the connection pool.
///
/// Requires a restart of the process to take effect.
const CONSENSUS_CONNECTION_POOL_MAX_WAIT: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_max_wait",
    Duration::from_secs(60),
    "The amount of time we'll wait for a connection to become available.",
);

/// The minimum TTL of a connection to Postgres/CRDB before it is proactively
/// terminated. Connections are routinely culled to balance load against the
/// downstream database.
const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl",
    Duration::from_secs(300),
    "\
    The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
    proactively terminated",
);

/// The minimum time between TTLing connections to Postgres/CRDB. This delay is
/// used to stagger reconnections to avoid stampedes and high tail latencies.
/// This value should be much less than `consensus_connection_pool_ttl` so that
/// reconnections are biased towards terminating the oldest connections first. A
/// value of `consensus_connection_pool_ttl /
/// consensus_connection_pool_max_size` is likely a good place to start so that
/// all connections are rotated when the pool is fully used.
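/// With the defaults in this file, that works out to `300s / 50 = 6s`, which
/// matches the default stagger below.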
const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl_stagger",
    Duration::from_secs(6),
    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
);

/// The duration to wait for a Consensus Postgres/CRDB connection to be made
/// before retrying.
pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "crdb_connect_timeout",
    Duration::from_secs(5),
    "The time to connect to CockroachDB before timing out and retrying.",
);

/// The TCP user timeout for a Consensus Postgres/CRDB connection. Specifies the
/// amount of time that transmitted data may remain unacknowledged before the
/// TCP connection is forcibly closed.
pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
    "crdb_tcp_user_timeout",
    Duration::from_secs(30),
    "\
    The TCP timeout for connections to CockroachDB. Specifies the amount of \
    time that transmitted data may remain unacknowledged before the TCP \
    connection is forcibly closed.",
);

/// Migrate the txns code to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
    "persist_use_critical_since_txn",
    true,
    "Use the critical since (instead of the overall since) when initializing a subscribe.",
);

/// Migrate the catalog to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
    "persist_use_critical_since_catalog",
    false,
    "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
);

/// Migrate the persist source to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
    "persist_use_critical_since_source",
    false,
    "Use the critical since (instead of the overall since) in the Persist source.",
);

/// Migrate snapshots to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
    "persist_use_critical_since_snapshot",
    false,
    "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);

/// Migrate the persist source to use a process global txn cache.
pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
    "use_global_txn_cache_source",
    true,
    "Use the process global txn cache (instead of an operator local one) in the Persist source.",
);

/// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
/// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
/// calls on previous ones finishing.
pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
    "persist_batch_builder_max_outstanding_parts",
    2,
    "The number of writes a batch builder can have outstanding before we slow down the writer.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of inputs is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_inputs",
    8,
    "Don't skip compaction if we have more than this many hollow batches as input.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of batch parts is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_parts",
    8,
    "Don't skip compaction if we have more than this many parts as input.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of updates is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_updates",
    1024,
    "Don't skip compaction if we have more than this many updates as input.",
);

/// The upper bound on compaction's memory consumption. The value must be at
/// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
/// compaction to merge together more runs at once, providing greater
/// consolidation of updates, at the cost of greater memory usage.
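/// For example, with a (hypothetical, illustrative) 128MiB `blob_target_size`,
/// this bound must be at least 512MiB; the 1GiB default below leaves headroom
/// beyond that minimum.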
pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
    "persist_compaction_memory_bound_bytes",
    1024 * MiB,
    "Attempt to limit compaction to this amount of memory.",
);

/// The maximum number of concurrent blob deletes during garbage collection.
pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
    "persist_gc_blob_delete_concurrency_limit",
    32,
    "Limit the number of concurrent deletes GC can perform to this threshold.",
);

/// The # of diffs to initially scan when fetching the latest consensus state, to
/// determine which requests go down the fast vs slow path. Should be large enough
/// to fetch all live diffs in the steady-state, and small enough to query Consensus
/// at high volume. Steady-state usage should accommodate readers that require
/// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
///
/// This value ought to be defined in terms of `NEED_ROLLUP_THRESHOLD` to approximate
/// when we expect rollups to be written and therefore when old states will be truncated
/// by GC.
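/// With the default below, that works out to `30 * 128 = 3,840` diffs.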
pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
    "persist_state_versions_recent_live_diffs_limit",
    30 * 128,
    "Fetch this many diffs when fetching recent diffs.",
);

/// The maximum number of concurrent state fetches during usage computation.
pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
    "persist_usage_state_fetch_concurrency_limit",
    8,
    "Limit the concurrency of state fetches in the periodic Persist storage-usage calculation.",
);

impl PostgresClientKnobs for PersistConfig {
    fn connection_pool_max_size(&self) -> usize {
        CONSENSUS_CONNECTION_POOL_MAX_SIZE.get(self)
    }

    fn connection_pool_max_wait(&self) -> Option<Duration> {
        Some(CONSENSUS_CONNECTION_POOL_MAX_WAIT.get(self))
    }

    fn connection_pool_ttl(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL.get(self)
    }

    fn connection_pool_ttl_stagger(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
    }

    fn connect_timeout(&self) -> Duration {
        CRDB_CONNECT_TIMEOUT.get(self)
    }

    fn tcp_user_timeout(&self) -> Duration {
        CRDB_TCP_USER_TIMEOUT.get(self)
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
pub struct RetryParameters {
    pub fixed_sleep: Duration,
    pub initial_backoff: Duration,
    pub multiplier: u32,
    pub clamp: Duration,
}

impl RetryParameters {
    pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
        let seed = now
            .duration_since(UNIX_EPOCH)
            .map_or(0, |x| u64::from(x.subsec_nanos()));
        Retry {
            fixed_sleep: self.fixed_sleep,
            initial_backoff: self.initial_backoff,
            multiplier: self.multiplier,
            clamp_backoff: self.clamp,
            seed,
        }
    }
}
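
// A minimal sketch (not part of the upstream file) of how `RetryParameters`
// maps onto `mz_persist::retry::Retry`: fields are copied one-for-one and the
// jitter seed is derived from the sub-second nanos of the provided wall-clock
// time, so concurrently running processes retry with different jitter.
#[cfg(test)]
mod retry_parameters_sketch {
    use super::*;

    #[test]
    fn into_retry_copies_fields_and_seeds_from_subsec_nanos() {
        let params = RetryParameters {
            fixed_sleep: Duration::ZERO,
            initial_backoff: Duration::from_millis(100),
            multiplier: 2,
            clamp: Duration::from_secs(16),
        };
        // 42ns past the epoch => subsec_nanos() == 42.
        let retry = params.into_retry(UNIX_EPOCH + Duration::from_nanos(42));
        assert_eq!(retry.initial_backoff, Duration::from_millis(100));
        assert_eq!(retry.multiplier, 2);
        assert_eq!(retry.clamp_backoff, Duration::from_secs(16));
        assert_eq!(retry.seed, 42);
    }
}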

pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_timeout",
    Duration::from_secs(180),
    "Maximum time allowed for a network call, including retry attempts.",
);

pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_attempt_timeout",
    Duration::from_secs(90),
    "Maximum time allowed for a single network call.",
);

pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_connect_timeout",
    Duration::from_secs(7),
    "Maximum time to wait for a socket connection to be made.",
);

pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_read_timeout",
    Duration::from_secs(10),
    "Maximum time to wait to read the first byte of a response, including connection time.",
);

impl BlobKnobs for PersistConfig {
    fn operation_timeout(&self) -> Duration {
        BLOB_OPERATION_TIMEOUT.get(self)
    }

    fn operation_attempt_timeout(&self) -> Duration {
        BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
    }

    fn connect_timeout(&self) -> Duration {
        BLOB_CONNECT_TIMEOUT.get(self)
    }

    fn read_timeout(&self) -> Duration {
        BLOB_READ_TIMEOUT.get(self)
    }

    fn is_cc_active(&self) -> bool {
        self.is_cc_active
    }
}

pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> {
    check_data_version_with_lts_versions(code_version, data_version, LTS_VERSIONS)
}

// If persist gets some encoded ProtoState from the future (e.g. two versions of
// code are running simultaneously against the same shard), it might have a
// field that the current code doesn't know about. This would be silently
// discarded at proto decode time. Unknown Fields [1] are a tool we can use in
// the future to help deal with this, but in the short-term, it's best to keep
// the persist read-modify-CaS loop simple for as long as we can get away with
// it (i.e. until we have to offer the ability to do rollbacks).
//
// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns
//
// To detect the bad situation and disallow it, we tag every version of state
// written to consensus with the version of code used to encode it. Then at
// decode time, we're able to compare the current version against any we receive
// and assert as necessary.
//
// Initially we allow any from the past (permanent backward compatibility) and
// one minor version into the future (forward compatibility). This allows us to
// run two versions concurrently for rolling upgrades. We'll have to revisit
// this logic if/when we start using major versions other than 0.
//
// We could do the same for blob data, but it shouldn't be necessary. Any blob
// data we read is going to be because we fetched it using a pointer stored in
// some persist state. If we can handle the state, we can handle the blobs it
// references, too.
pub(crate) fn check_data_version_with_lts_versions(
    code_version: &Version,
    data_version: &Version,
    lts_versions: &[Version],
) -> Result<(), String> {
    // Allow upgrades specifically between consecutive LTS releases.
    let base_code_version = Version {
        patch: 0,
        ..code_version.clone()
    };
    let base_data_version = Version {
        patch: 0,
        ..data_version.clone()
    };
    if data_version >= code_version {
        for window in lts_versions.windows(2) {
            if base_code_version == window[0] && base_data_version <= window[1] {
                return Ok(());
            }
        }

        if let Some(last) = lts_versions.last() {
            if base_code_version == *last
                // kind of arbitrary, but just ensure we don't accidentally
                // upgrade too far (the previous check should ensure that a
                // new version won't take over from a too-old previous
                // version, but we want to make sure the other side also
                // doesn't get confused)
                && base_data_version
                    .minor
                    .saturating_sub(base_code_version.minor)
                    < 40
            {
                return Ok(());
            }
        }
    }

    // Allow one minor version of forward compatibility. We could avoid the
    // clone with some nested comparisons of the semver fields, but this code
    // isn't particularly performance sensitive and I find this impl easier to
    // reason about.
    let max_allowed_data_version = Version::new(
        code_version.major,
        code_version.minor.saturating_add(1),
        u64::MAX,
    );

    if &max_allowed_data_version < data_version {
        Err(format!(
            "{code_version} received persist state from the future {data_version}",
        ))
    } else {
        Ok(())
    }
}
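
// A minimal sketch (not part of the upstream file) of the compatibility rules
// enforced above: permanent backward compatibility, one minor version of
// forward compatibility, and a wider window between consecutive LTS releases.
// The version numbers used here are arbitrary examples, not real releases.
#[cfg(test)]
mod data_version_sketch {
    use super::*;

    #[test]
    fn forward_compat_window() {
        let code = Version::new(0, 99, 0);
        // Older data is always readable (permanent backward compatibility).
        assert!(check_data_version(&code, &Version::new(0, 98, 7)).is_ok());
        // One minor version into the future is allowed, at any patch level.
        assert!(check_data_version(&code, &Version::new(0, 100, 3)).is_ok());
        // Two minor versions into the future is rejected.
        assert!(check_data_version(&code, &Version::new(0, 101, 0)).is_err());
    }

    #[test]
    fn lts_to_lts_window() {
        // With two hypothetical consecutive LTS versions, code on the older
        // LTS accepts data written by the newer one.
        let lts = &[Version::new(0, 100, 0), Version::new(0, 120, 0)];
        let code = Version::new(0, 100, 1);
        let data = Version::new(0, 120, 2);
        assert!(check_data_version_with_lts_versions(&code, &data, lts).is_ok());
    }
}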