mz_persist_client/cfg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! The tunable knobs for persist.

use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use mz_build_info::BuildInfo;
use mz_dyncfg::{Config, ConfigDefault, ConfigSet, ConfigUpdates};
use mz_ore::instrument;
use mz_ore::now::NowFn;
use mz_persist::cfg::BlobKnobs;
use mz_persist::retry::Retry;
use mz_postgres_client::PostgresClientKnobs;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use crate::async_runtime;
use crate::internal::machine::{
    NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
    NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
};
use crate::internal::state::ROLLUP_THRESHOLD;
use crate::operators::STORAGE_SOURCE_DECODE_FUEL;
use crate::read::READER_LEASE_DURATION;

// Ignores the patch version
const SELF_MANAGED_VERSIONS: &[Version; 2] = &[
    // 25.1
    Version::new(0, 130, 0),
    // 25.2
    Version::new(0, 147, 0),
];

/// The tunable knobs for persist.
///
/// Tuning inputs:
/// - A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in
///   fewer entries in consensus state. Before we have compaction and/or
///   incremental state, it is already growing without bound, so this is a
///   concern. OTOH, for any "reasonable" size (> 100MiB?) of blob_target_size,
///   it seems we'd end up with a pretty tremendous amount of data in the shard
///   before this became a real issue.
/// - A larger blob_target_size will result in fewer s3 operations, which are
///   charged per operation. (Hmm, maybe not if we're charged per call in a
///   multipart op. The S3Blob impl already chunks things at 8MiB.)
/// - A smaller blob_target_size will result in more even memory usage in
///   readers.
/// - A larger batch_builder_max_outstanding_parts increases throughput (to a
///   point).
/// - A smaller batch_builder_max_outstanding_parts provides a bound on the
///   amount of memory used by a writer.
/// - A larger compaction_heuristic_min_inputs means state size is larger.
/// - A smaller compaction_heuristic_min_inputs means more compactions happen
///   (higher write amp).
/// - A larger compaction_heuristic_min_updates means more consolidations are
///   discovered while reading a snapshot (higher read amp and higher space
///   amp).
/// - A smaller compaction_heuristic_min_updates means more compactions happen
///   (higher write amp).
///
/// Tuning logic:
/// - blob_target_size was initially selected to be an exact multiple of 8MiB
///   (the s3 multipart size) that was in the same neighborhood as our initial
///   max throughput (~250MiB).
/// - batch_builder_max_outstanding_parts was initially selected to be as small
///   as possible without harming pipelining. 0 means no pipelining, 1 is full
///   pipelining as long as generating data takes less time than writing to s3
///   (hopefully a fair assumption), 2 is a little extra slop on top of 1.
/// - compaction_heuristic_min_inputs was set by running the open-loop benchmark
///   with batches of size 10,240 bytes (selected to be small but such that the
///   overhead of our columnar encoding format was less than 10%) and manually
///   increased until the write amp stopped going down. This becomes much less
///   important once we have incremental state. The initial value is a
///   placeholder and should be revisited at some point.
/// - compaction_heuristic_min_updates was set via a thought experiment. This is
///   an `O(n*log(n))` upper bound on the number of unconsolidated updates that
///   would be consolidated if we compacted as the in-mem Spine does. The
///   initial value is a placeholder and should be revisited at some point.
///
/// TODO: Move these tuning notes into SessionVar descriptions once we have
/// SystemVars for most of these.
//
// TODO: The configs left here don't react dynamically to changes. Move as many
// of them to DynamicConfig as possible.
#[derive(Debug, Clone)]
pub struct PersistConfig {
    /// Info about which version of the code is running.
    pub build_version: Version,
    /// An opaque string describing the host of this persist client.
    /// Stored in state and used for debugging.
    pub hostname: String,
    /// Whether this persist instance is running in a "cc" sized cluster.
    pub is_cc_active: bool,
    /// Memory limit of the process, if known.
    pub announce_memory_limit: Option<usize>,
    /// A clock to use for all leasing and other non-debugging use.
    pub now: NowFn,
    /// Persist [Config]s that can change value dynamically within the lifetime
    /// of a process.
    ///
    /// TODO(cfg): Entirely replace dynamic with this.
    pub configs: Arc<ConfigSet>,
    /// Indicates whether `configs` has been synced at least once with an
    /// upstream source.
    configs_synced_once: Arc<watch::Sender<bool>>,
    /// Whether to physically and logically compact batches in blob storage.
    pub compaction_enabled: bool,
    /// Whether the `Compactor` will process compaction requests, or drop them on the floor.
    pub compaction_process_requests: Arc<AtomicBool>,
    /// In Compactor::compact_and_apply_background, the maximum number of concurrent
    /// compaction requests that can execute for a given shard.
    pub compaction_concurrency_limit: usize,
    /// In Compactor::compact_and_apply_background, the maximum number of pending
    /// compaction requests to queue.
    pub compaction_queue_size: usize,
    /// In Compactor::compact_and_apply_background, how many updates to encode or
    /// decode before voluntarily yielding the task.
    pub compaction_yield_after_n_updates: usize,
    /// Length of time after a writer's last operation after which the writer
    /// may be expired.
    pub writer_lease_duration: Duration,
    /// Length of time between critical handles' calls to downgrade their `since`.
    pub critical_downgrade_interval: Duration,
    /// Number of worker threads to create for the [`crate::IsolatedRuntime`], defaults to the
    /// number of CPUs.
    pub isolated_runtime_worker_threads: usize,
}
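
// Illustrative sketch (not part of the original file): applying the tuning
// notes above at runtime. The dedicated setters defined further down (e.g.
// `set_rollup_threshold`, `set_reader_lease_duration`) route through the
// dynamic `ConfigSet`, so the values below (chosen only for illustration)
// take effect without a restart:
//
//     let cfg = PersistConfig::new_for_tests();
//     cfg.set_rollup_threshold(256);
//     cfg.set_reader_lease_duration(Duration::from_secs(120));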

// Impl Deref to ConfigSet for convenience of accessing the dynamic configs.
impl std::ops::Deref for PersistConfig {
    type Target = ConfigSet;
    fn deref(&self) -> &Self::Target {
        &self.configs
    }
}
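
// Sketch (assumption, not in the original file): because of the Deref impl
// above, a `&PersistConfig` coerces to `&ConfigSet`, so dynamic configs can be
// read directly off the config handle:
//
//     let cfg = PersistConfig::new_for_tests();
//     let target_size = crate::batch::BLOB_TARGET_SIZE.get(&cfg);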

impl PersistConfig {
    /// Returns a new instance of [PersistConfig] with default tuning and
    /// default ConfigSet.
    pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
        Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
    }

    /// Returns a new instance of [PersistConfig] with default tuning and the
    /// specified ConfigSet.
    pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
        // Escape hatch in case we need to disable compaction.
        let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");

        // We create receivers on demand, so we drop the initial receiver.
        let (configs_synced_once, _) = watch::channel(false);

        Self {
            build_version: build_info.semver_version(),
            is_cc_active: false,
            announce_memory_limit: None,
            now,
            configs: Arc::new(configs),
            configs_synced_once: Arc::new(configs_synced_once),
            compaction_enabled: !compaction_disabled,
            compaction_process_requests: Arc::new(AtomicBool::new(true)),
            compaction_concurrency_limit: 5,
            compaction_queue_size: 20,
            compaction_yield_after_n_updates: 100_000,
            writer_lease_duration: 60 * Duration::from_secs(60),
            critical_downgrade_interval: Duration::from_secs(30),
            isolated_runtime_worker_threads: num_cpus::get(),
            // TODO: This doesn't work with the process orchestrator. Instead,
            // separate --log-prefix into --service-name and --enable-log-prefix
            // options, where the first is always provided and the second is
            // conditionally enabled by the process orchestrator.
            hostname: {
                use std::fmt::Write;
                let mut name = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned());
                write!(&mut name, " {}", build_info.version)
                    .expect("writing to string should not fail");
                name
            },
        }
    }

    pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T) {
        let mut updates = ConfigUpdates::default();
        updates.add(cfg, val);
        updates.apply(self)
    }

    /// Applies the provided updates to this configuration.
    ///
    /// You should prefer calling this method over mutating `self.configs`
    /// directly, so that [`Self::configs_synced_once`] can be properly
    /// maintained.
    pub fn apply_from(&self, updates: &ConfigUpdates) {
        updates.apply(&self.configs);
        self.configs_synced_once.send_replace(true);
    }

    /// Resolves when `configs` has been synced at least once with an upstream
    /// source, i.e., via [`Self::apply_from`].
    ///
    /// If `configs` has already been synced once at the time the method is
    /// called, resolves immediately.
    ///
    /// Useful in conjunction with configuration parameters that cannot be
    /// dynamically updated once set (e.g., PubSub).
    #[instrument(level = "info")]
    pub async fn configs_synced_once(&self) {
        self.configs_synced_once
            .subscribe()
            .wait_for(|synced| *synced)
            .await
            .expect("we have a borrow on sender so it cannot drop");
    }
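
    // Hypothetical usage sketch (assumption, not in the original file): a
    // config-sync task pushes updates from an upstream source via `apply_from`,
    // which also flips the synced flag, while startup code that reads
    // sync-once parameters (e.g. PubSub) awaits the first sync:
    //
    //     cfg.apply_from(&updates);          // apply + mark as synced once
    //     cfg.configs_synced_once().await;   // resolves once the first sync lands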

    /// The maximum amount of work to do in the persist_source mfp_and_decode
    /// operator before yielding.
    pub fn storage_source_decode_fuel(&self) -> usize {
        STORAGE_SOURCE_DECODE_FUEL.get(self)
    }

    /// Overrides the value for "persist_reader_lease_duration".
    pub fn set_reader_lease_duration(&self, val: Duration) {
        self.set_config(&READER_LEASE_DURATION, val);
    }

    /// Overrides the value for "persist_rollup_threshold".
    pub fn set_rollup_threshold(&self, val: usize) {
        self.set_config(&ROLLUP_THRESHOLD, val);
    }

    /// Overrides the value for the "persist_next_listen_batch_retryer_*"
    /// configs.
    pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
        self.set_config(
            &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
            val.initial_backoff,
        );
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
    }

    pub fn disable_compaction(&self) {
        tracing::info!("Disabling Persist Compaction");
        self.compaction_process_requests
            .store(false, std::sync::atomic::Ordering::Relaxed);
    }

    pub fn enable_compaction(&self) {
        tracing::info!("Enabling Persist Compaction");
        self.compaction_process_requests
            .store(true, std::sync::atomic::Ordering::Relaxed);
    }

    /// Returns a new instance of [PersistConfig] for tests.
    pub fn new_for_tests() -> Self {
        use mz_build_info::DUMMY_BUILD_INFO;
        use mz_ore::now::SYSTEM_TIME;

        let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        cfg.hostname = "tests".into();
        cfg.isolated_runtime_worker_threads = async_runtime::TEST_THREADS;
        cfg
    }
}

#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1024 * 1024;

/// Adds the full set of all persist [Config]s.
///
/// TODO(cfg): Consider replacing this with a static global registry powered by
/// something like the `ctor` or `inventory` crate. This would involve managing
/// the footgun of a Config being linked into one binary but not the other.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    mz_persist::cfg::all_dyn_configs(configs)
        .add(&crate::batch::BATCH_DELETE_ENABLED)
        .add(&crate::batch::BLOB_TARGET_SIZE)
        .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
        .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
        .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
        .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
        .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
        .add(&crate::batch::MAX_RUN_LEN)
        .add(&crate::batch::MAX_RUNS)
        .add(&BLOB_OPERATION_TIMEOUT)
        .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
        .add(&BLOB_CONNECT_TIMEOUT)
        .add(&BLOB_READ_TIMEOUT)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_SIZE)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_MAX_WAIT)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
        .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
        .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
        .add(&crate::cfg::CRDB_KEEPALIVES_IDLE)
        .add(&crate::cfg::CRDB_KEEPALIVES_INTERVAL)
        .add(&crate::cfg::CRDB_KEEPALIVES_RETRIES)
        .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
        .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
        .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
        .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
        .add(&COMPACTION_HEURISTIC_MIN_PARTS)
        .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
        .add(&COMPACTION_MEMORY_BOUND_BYTES)
        .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
        .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
        .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
        .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
        .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_WAIT)
        .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
        .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
        .add(&crate::fetch::VALIDATE_PART_BOUNDS_ON_READ)
        .add(&crate::fetch::OPTIMIZE_IGNORED_DATA_FETCH)
        .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
        .add(&crate::internal::cache::BLOB_CACHE_SCALE_WITH_THREADS)
        .add(&crate::internal::cache::BLOB_CACHE_SCALE_FACTOR_BYTES)
        .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
        .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
        .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
        .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
        .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
        .add(&crate::internal::state::ROLLUP_THRESHOLD)
        .add(&crate::internal::state::ROLLUP_USE_ACTIVE_ROLLUP)
        .add(&crate::internal::state::GC_FALLBACK_THRESHOLD_MS)
        .add(&crate::internal::state::GC_USE_ACTIVE_GC)
        .add(&crate::internal::state::GC_MIN_VERSIONS)
        .add(&crate::internal::state::GC_MAX_VERSIONS)
        .add(&crate::internal::state::ROLLUP_FALLBACK_THRESHOLD_MS)
        .add(&crate::internal::state::ENABLE_INCREMENTAL_COMPACTION)
        .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
        .add(&crate::read::READER_LEASE_DURATION)
        .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
        .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
        .add(&crate::rpc::PUBSUB_SAME_PROCESS_DELEGATE_ENABLED)
        .add(&crate::rpc::PUBSUB_CONNECT_ATTEMPT_TIMEOUT)
        .add(&crate::rpc::PUBSUB_REQUEST_TIMEOUT)
        .add(&crate::rpc::PUBSUB_CONNECT_MAX_BACKOFF)
        .add(&crate::rpc::PUBSUB_CLIENT_SENDER_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_CLIENT_RECEIVER_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_SERVER_CONNECTION_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_STATE_CACHE_SHARD_REF_CHANNEL_SIZE)
        .add(&crate::rpc::PUBSUB_RECONNECT_BACKOFF)
        .add(&crate::stats::STATS_AUDIT_PERCENT)
        .add(&crate::stats::STATS_AUDIT_PANIC)
        .add(&crate::stats::STATS_BUDGET_BYTES)
        .add(&crate::stats::STATS_COLLECTION_ENABLED)
        .add(&crate::stats::STATS_FILTER_ENABLED)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
        .add(&crate::fetch::PART_DECODE_FORMAT)
        .add(&crate::write::COMBINE_INLINE_WRITES)
        .add(&crate::write::VALIDATE_PART_BOUNDS_ON_WRITE)
}
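
// Sketch (assumption, not in the original file): registering a new knob follows
// the same pattern as the entries above — define a `Config` constant and chain
// one more `.add(...)` in `all_dyncfgs` so it participates in dynamic updates.
// The name below is hypothetical, not a real config:
//
//     pub const MY_EXAMPLE_KNOB: Config<bool> = Config::new(
//         "persist_my_example_knob",
//         false,
//         "A hypothetical knob used only to illustrate registration.",
//     );
//     // ...and in all_dyncfgs: `.add(&MY_EXAMPLE_KNOB)`.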

impl PersistConfig {
    pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;

    pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
        self.set_config(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT, val);
    }
}

/// Sets the maximum size of the connection pool that is used by consensus.
///
/// Requires a restart of the process to take effect.
pub const CONSENSUS_CONNECTION_POOL_MAX_SIZE: Config<usize> = Config::new(
    "persist_consensus_connection_pool_max_size",
    50,
    "The maximum size the connection pool to Postgres/CRDB will grow to.",
);

/// Sets the maximum amount of time we'll wait to acquire a connection from
/// the connection pool.
///
/// Requires a restart of the process to take effect.
const CONSENSUS_CONNECTION_POOL_MAX_WAIT: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_max_wait",
    Duration::from_secs(60),
    "The amount of time we'll wait for a connection to become available.",
);

/// The minimum TTL of a connection to Postgres/CRDB before it is proactively
/// terminated. Connections are routinely culled to balance load against the
/// downstream database.
const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl",
    Duration::from_secs(300),
    "\
    The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
    proactively terminated",
);

/// The minimum time between TTLing connections to Postgres/CRDB. This delay is
/// used to stagger reconnections to avoid stampedes and high tail latencies.
/// This value should be much less than `consensus_connection_pool_ttl` so that
/// reconnections are biased towards terminating the oldest connections first. A
/// value of `consensus_connection_pool_ttl /
/// consensus_connection_pool_max_size` is likely a good place to start so that
/// all connections are rotated when the pool is fully used.
const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl_stagger",
    Duration::from_secs(6),
    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
);
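
// Worked example (derived from the defaults above, not in the original file):
// with `persist_consensus_connection_pool_ttl` = 300s and
// `persist_consensus_connection_pool_max_size` = 50, the suggested starting
// point of ttl / max_size = 300s / 50 = 6s matches the default stagger, so a
// fully-used pool rotates every connection once per TTL period.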

/// The duration to wait for a Consensus Postgres/CRDB connection to be made
/// before retrying.
pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "crdb_connect_timeout",
    Duration::from_secs(5),
    "The time to connect to CockroachDB before timing out and retrying.",
);

/// The TCP user timeout for a Consensus Postgres/CRDB connection. Specifies the
/// amount of time that transmitted data may remain unacknowledged before the
/// TCP connection is forcibly closed.
pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
    "crdb_tcp_user_timeout",
    Duration::from_secs(30),
    "\
    The TCP timeout for connections to CockroachDB. Specifies the amount of \
    time that transmitted data may remain unacknowledged before the TCP \
    connection is forcibly closed.",
);

pub const CRDB_KEEPALIVES_IDLE: Config<Duration> = Config::new(
    "crdb_keepalives_idle",
    Duration::from_secs(10),
    "\
    The amount of idle time before a TCP keepalive packet is sent on CRDB \
    connections.",
);

pub const CRDB_KEEPALIVES_INTERVAL: Config<Duration> = Config::new(
    "crdb_keepalives_interval",
    Duration::from_secs(5),
    "The time interval between TCP keepalive probes on CRDB connections.",
);

pub const CRDB_KEEPALIVES_RETRIES: Config<u32> = Config::new(
    "crdb_keepalives_retries",
    5,
    "\
    The maximum number of TCP keepalive probes that will be sent before \
    dropping a CRDB connection.",
);

/// Migrate the txns code to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
    "persist_use_critical_since_txn",
    true,
    "Use the critical since (instead of the overall since) when initializing a subscribe.",
);

/// Migrate the catalog to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
    "persist_use_critical_since_catalog",
    false,
    "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
);

/// Migrate the persist source to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
    "persist_use_critical_since_source",
    false,
    "Use the critical since (instead of the overall since) in the Persist source.",
);

/// Migrate snapshots to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
    "persist_use_critical_since_snapshot",
    false,
    "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);

/// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
/// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
/// calls on previous ones finishing.
pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
    "persist_batch_builder_max_outstanding_parts",
    2,
    "The number of writes a batch builder can have outstanding before we slow down the writer.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of inputs is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_inputs",
    8,
    "Don't skip compaction if we have more than this many hollow batches as input.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of batch parts is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_parts",
    8,
    "Don't skip compaction if we have more than this many parts as input.",
);

/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
/// if the number of updates is at least this many. Compaction is performed
/// if any of the heuristic criteria are met (they are OR'd).
pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
    "persist_compaction_heuristic_min_updates",
    1024,
    "Don't skip compaction if we have more than this many updates as input.",
);

/// The upper bound on compaction's memory consumption. The value must be at
/// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
/// compaction to merge together more runs at once, providing greater
/// consolidation of updates, at the cost of greater memory usage.
pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
    "persist_compaction_memory_bound_bytes",
    1024 * MiB,
    "Attempt to limit compaction to this amount of memory.",
);

/// The maximum number of concurrent blob deletes during garbage collection.
pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
    "persist_gc_blob_delete_concurrency_limit",
    32,
    "Limit the number of concurrent deletes GC can perform to this threshold.",
);

/// The # of diffs to initially scan when fetching the latest consensus state, to
/// determine which requests go down the fast vs slow path. Should be large enough
/// to fetch all live diffs in the steady-state, and small enough to query Consensus
/// at high volume. Steady-state usage should accommodate readers that require
/// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
///
/// This value ought to be defined in terms of `NEED_ROLLUP_THRESHOLD` to approximate
/// when we expect rollups to be written and therefore when old states will be truncated
/// by GC.
pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
    "persist_state_versions_recent_live_diffs_limit",
    30 * 128,
    "Fetch this many diffs when fetching recent diffs.",
);

/// The maximum number of concurrent state fetches during usage computation.
pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
    "persist_usage_state_fetch_concurrency_limit",
    8,
    "Limit the concurrency of fetching in the periodic Persist-storage-usage calculation.",
);

impl PostgresClientKnobs for PersistConfig {
    fn connection_pool_max_size(&self) -> usize {
        CONSENSUS_CONNECTION_POOL_MAX_SIZE.get(self)
    }

    fn connection_pool_max_wait(&self) -> Option<Duration> {
        Some(CONSENSUS_CONNECTION_POOL_MAX_WAIT.get(self))
    }

    fn connection_pool_ttl(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL.get(self)
    }

    fn connection_pool_ttl_stagger(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
    }

    fn connect_timeout(&self) -> Duration {
        CRDB_CONNECT_TIMEOUT.get(self)
    }

    fn tcp_user_timeout(&self) -> Duration {
        CRDB_TCP_USER_TIMEOUT.get(self)
    }

    fn keepalives_idle(&self) -> Duration {
        CRDB_KEEPALIVES_IDLE.get(self)
    }

    fn keepalives_interval(&self) -> Duration {
        CRDB_KEEPALIVES_INTERVAL.get(self)
    }

    fn keepalives_retries(&self) -> u32 {
        CRDB_KEEPALIVES_RETRIES.get(self)
    }
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
pub struct RetryParameters {
    pub fixed_sleep: Duration,
    pub initial_backoff: Duration,
    pub multiplier: u32,
    pub clamp: Duration,
}

impl RetryParameters {
    pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
        let seed = now
            .duration_since(UNIX_EPOCH)
            .map_or(0, |x| u64::from(x.subsec_nanos()));
        Retry {
            fixed_sleep: self.fixed_sleep,
            initial_backoff: self.initial_backoff,
            multiplier: self.multiplier,
            clamp_backoff: self.clamp,
            seed,
        }
    }
}
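
// Sketch (assumption, not in the original file): converting tuned retry
// parameters into an `mz_persist::retry::Retry`, with jitter seeded from the
// sub-second part of the current wall-clock time. The values are illustrative:
//
//     let params = RetryParameters {
//         fixed_sleep: Duration::ZERO,
//         initial_backoff: Duration::from_millis(4),
//         multiplier: 2,
//         clamp: Duration::from_secs(16),
//     };
//     let retry = params.into_retry(SystemTime::now());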

pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_timeout",
    Duration::from_secs(180),
    "Maximum time allowed for a network call, including retry attempts.",
);

pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_operation_attempt_timeout",
    Duration::from_secs(90),
    "Maximum time allowed for a single network call.",
);

pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_connect_timeout",
    Duration::from_secs(7),
    "Maximum time to wait for a socket connection to be made.",
);

pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
    "persist_blob_read_timeout",
    Duration::from_secs(10),
    "Maximum time to wait to read the first byte of a response, including connection time.",
);

impl BlobKnobs for PersistConfig {
    fn operation_timeout(&self) -> Duration {
        BLOB_OPERATION_TIMEOUT.get(self)
    }

    fn operation_attempt_timeout(&self) -> Duration {
        BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
    }

    fn connect_timeout(&self) -> Duration {
        BLOB_CONNECT_TIMEOUT.get(self)
    }

    fn read_timeout(&self) -> Duration {
        BLOB_READ_TIMEOUT.get(self)
    }

    fn is_cc_active(&self) -> bool {
        self.is_cc_active
    }
}

/// If persist gets some encoded ProtoState from the future (e.g. two versions of
/// code are running simultaneously against the same shard), it might have a
/// field that the current code doesn't know about. This would be silently
/// discarded at proto decode time: our Proto library can't handle unknown fields
/// [1], and even if it could, old versions of the code might not be able to
/// respect the semantics of the new fields.
///
/// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns
///
/// To detect the bad situation and disallow it, we tag every version of state
/// written to consensus with the version of code it's compatible with. Then at
/// decode time, we're able to compare the current version against any we receive
/// and assert as necessary. The current version is typically the version of code
/// used to write the state, but it may be lower when code is intentionally emulating
/// an older version during e.g. a graceful upgrade process.
///
/// We could do the same for blob data, but it shouldn't be necessary. Any blob
/// data we read is going to be because we fetched it using a pointer stored in
/// some persist state. If we can handle the state, we can handle the blobs it
/// references, too.
pub fn code_can_read_data(code_version: &Version, data_version: &Version) -> bool {
    // For now, Persist can read arbitrarily old state data.
    // We expect to add a floor to this in future versions.
    code_version.cmp_precedence(data_version).is_ge()
}
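
// Illustrative (assumption, not in the original file): under semver precedence,
// current code can read state written by the same or any older version, but
// never state written by a newer one:
//
//     assert!(code_can_read_data(&Version::new(0, 147, 1), &Version::new(0, 130, 0)));
//     assert!(!code_can_read_data(&Version::new(0, 130, 0), &Version::new(0, 147, 0)));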

/// Can the given version of the code generate data that older versions can understand?
/// Imagine the case of e.g. garbage collection after a version upgrade... we may need to read old
/// diffs to be able to find blobs to delete, even if we no longer have code to generate data in
/// that format.
pub fn code_can_write_data(code_version: &Version, data_version: &Version) -> bool {
    if !code_can_read_data(code_version, data_version) {
        return false;
    }

    if code_version.major == 0 && code_version.minor <= SELF_MANAGED_VERSIONS[1].minor {
        // This code was added well after the last ad-hoc version was released,
        // so we don't strictly model compatibility with earlier releases.
        true
    } else if code_version.major == 0 {
        // Self-managed versions 25.2+ must be upgradeable from 25.1+.
        SELF_MANAGED_VERSIONS[0]
            .cmp_precedence(data_version)
            .is_le()
    } else if code_version.major <= 26 {
        // Versions 26.x must be upgradeable from the last pre-1.0 release.
        SELF_MANAGED_VERSIONS[1]
            .cmp_precedence(data_version)
            .is_le()
    } else {
        // Otherwise, the data must be from the _previous_ major version at the earliest.
        code_version.major - 1 <= data_version.major
    }
}
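
// Illustrative examples (not part of the original file): a small test sketch
// exercising the version-compatibility rules above with assumed example
// versions, using only items defined in this module.
#[cfg(test)]
mod version_compat_examples {
    use super::*;

    #[test]
    fn code_can_write_data_examples() {
        // Pre-25.2 code (0.x with minor <= 147): compatibility with older data
        // is not strictly modeled, so writing is allowed.
        assert!(code_can_write_data(
            &Version::new(0, 140, 0),
            &Version::new(0, 1, 0)
        ));
        // Later 0.x code may only write data readable by self-managed 25.1
        // (0.130.0) or newer.
        assert!(code_can_write_data(
            &Version::new(0, 150, 0),
            &Version::new(0, 130, 0)
        ));
        assert!(!code_can_write_data(
            &Version::new(0, 150, 0),
            &Version::new(0, 129, 0)
        ));
        // Writing is never allowed for data from a newer version than the code.
        assert!(!code_can_write_data(
            &Version::new(0, 130, 0),
            &Version::new(0, 147, 0)
        ));
    }
}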