1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! The tunable knobs for persist.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use mz_build_info::BuildInfo;
use mz_dyncfg::{Config, ConfigSet, ConfigType, ConfigUpdates};
use mz_ore::instrument;
use mz_ore::now::NowFn;
use mz_persist::cfg::BlobKnobs;
use mz_persist::retry::Retry;
use mz_postgres_client::PostgresClientKnobs;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use crate::internal::machine::{
    NEXT_LISTEN_BATCH_RETRYER_CLAMP, NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
    NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER,
};
use crate::internal::state::ROLLUP_THRESHOLD;
use crate::operators::{
    PERSIST_SINK_MINIMUM_BATCH_UPDATES, STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES,
    STORAGE_SOURCE_DECODE_FUEL,
};
use crate::project::OPTIMIZE_IGNORED_DATA_DECODE;
use crate::read::READER_LEASE_DURATION;

/// The tunable knobs for persist.
///
/// Tuning inputs:
/// - A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in
///   fewer entries in consensus state. Before we have compaction and/or
///   incremental state, it is already growing without bound, so this is a
///   concern. OTOH, for any "reasonable" size (> 100MiB?) of blob_target_size,
///   it seems we'd end up with a pretty tremendous amount of data in the shard
///   before this became a real issue.
/// - A larger blob_target_size will results in fewer s3 operations, which are
///   charged per operation. (Hmm, maybe not if we're charged per call in a
///   multipart op. The S3Blob impl already chunks things at 8MiB.)
/// - A smaller blob_target_size will result in more even memory usage in
///   readers.
/// - A larger batch_builder_max_outstanding_parts increases throughput (to a
///   point).
/// - A smaller batch_builder_max_outstanding_parts provides a bound on the
///   amount of memory used by a writer.
/// - A larger compaction_heuristic_min_inputs means state size is larger.
/// - A smaller compaction_heuristic_min_inputs means more compactions happen
///   (higher write amp).
/// - A larger compaction_heuristic_min_updates means more consolidations are
///   discovered while reading a snapshot (higher read amp and higher space
///   amp).
/// - A smaller compaction_heuristic_min_updates means more compactions happen
///   (higher write amp).
///
/// Tuning logic:
/// - blob_target_size was initially selected to be an exact multiple of 8MiB
///   (the s3 multipart size) that was in the same neighborhood as our initial
///   max throughput (~250MiB).
/// - batch_builder_max_outstanding_parts was initially selected to be as small
///   as possible without harming pipelining. 0 means no pipelining, 1 is full
///   pipelining as long as generating data takes less time than writing to s3
///   (hopefully a fair assumption), 2 is a little extra slop on top of 1.
/// - compaction_heuristic_min_inputs was set by running the open-loop benchmark
///   with batches of size 10,240 bytes (selected to be small but such that the
///   overhead of our columnar encoding format was less than 10%) and manually
///   increased until the write amp stopped going down. This becomes much less
///   important once we have incremental state. The initial value is a
///   placeholder and should be revisited at some point.
/// - compaction_heuristic_min_updates was set via a thought experiment. This is
///   an `O(n*log(n))` upper bound on the number of unconsolidated updates that
///   would be consolidated if we compacted as the in-mem Spine does. The
///   initial value is a placeholder and should be revisited at some point.
///
/// TODO: Move these tuning notes into SessionVar descriptions once we have
/// SystemVars for most of these.
//
// TODO: The configs left here don't react dynamically to changes. Move as many
// of them to DynamicConfig as possible.
#[derive(Debug, Clone)]
pub struct PersistConfig {
    /// Info about which version of the code is running.
    pub build_version: Version,
    /// Hostname of this persist user. Stored in state and used for debugging.
    pub hostname: String,
    /// Whether this persist instance is running in a "cc" sized cluster.
    pub is_cc_active: bool,
    /// Memory limit of the process, if known.
    pub announce_memory_limit: Option<usize>,
    /// A clock to use for all leasing and other non-debugging use.
    pub now: NowFn,
    /// Persist [Config]s that can change value dynamically within the lifetime
    /// of a process.
    ///
    /// TODO(cfg): Entirely replace dynamic with this.
    pub configs: Arc<ConfigSet>,
    /// Indicates whether `configs` has been synced at least once with an
    /// upstream source.
    configs_synced_once: Arc<watch::Sender<bool>>,
    /// Configurations that can be dynamically updated.
    pub dynamic: Arc<DynamicConfig>,
    /// Whether to physically and logically compact batches in blob storage.
    pub compaction_enabled: bool,
    /// In Compactor::compact_and_apply_background, the maximum number of concurrent
    /// compaction requests that can execute for a given shard.
    pub compaction_concurrency_limit: usize,
    /// In Compactor::compact_and_apply_background, the maximum number of pending
    /// compaction requests to queue.
    pub compaction_queue_size: usize,
    /// In Compactor::compact_and_apply_background, how many updates to encode or
    /// decode before voluntarily yielding the task.
    pub compaction_yield_after_n_updates: usize,
    /// The maximum size of the connection pool to Postgres/CRDB when performing
    /// consensus reads and writes.
    pub consensus_connection_pool_max_size: usize,
    /// The maximum time to wait when attempting to obtain a connection from the pool.
    pub consensus_connection_pool_max_wait: Option<Duration>,
    /// Length of time after a writer's last operation after which the writer
    /// may be expired.
    pub writer_lease_duration: Duration,
    /// Length of time between critical handles' calls to downgrade since
    pub critical_downgrade_interval: Duration,
    /// Timeout per connection attempt to Persist PubSub service.
    pub pubsub_connect_attempt_timeout: Duration,
    /// Timeout per request attempt to Persist PubSub service.
    pub pubsub_request_timeout: Duration,
    /// Maximum backoff when retrying connection establishment to Persist PubSub service.
    pub pubsub_connect_max_backoff: Duration,
    /// Size of channel used to buffer send messages to PubSub service.
    pub pubsub_client_sender_channel_size: usize,
    /// Size of channel used to buffer received messages from PubSub service.
    pub pubsub_client_receiver_channel_size: usize,
    /// Size of channel used per connection to buffer broadcasted messages from PubSub server.
    pub pubsub_server_connection_channel_size: usize,
    /// Size of channel used by the state cache to broadcast shard state references.
    pub pubsub_state_cache_shard_ref_channel_size: usize,
    /// Backoff after an established connection to Persist PubSub service fails.
    pub pubsub_reconnect_backoff: Duration,
    /// Number of worker threads to create for the [`crate::IsolatedRuntime`], defaults to the
    /// number of threads.
    pub isolated_runtime_worker_threads: usize,
}

// Impl Deref to ConfigSet for convenience of accessing the dynamic configs.
impl std::ops::Deref for PersistConfig {
    type Target = ConfigSet;
    fn deref(&self) -> &Self::Target {
        &self.configs
    }
}

impl PersistConfig {
    /// Returns a new instance of [PersistConfig] with default tuning and
    /// default ConfigSet.
    pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self {
        Self::new(build_info, now, all_dyncfgs(ConfigSet::default()))
    }

    /// Returns a new instance of [PersistConfig] with default tuning and the
    /// specified ConfigSet.
    pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self {
        // Escape hatch in case we need to disable compaction.
        let compaction_disabled = mz_ore::env::is_var_truthy("MZ_PERSIST_COMPACTION_DISABLED");

        // We create receivers on demand, so we drop the initial receiver.
        let (configs_synced_once, _) = watch::channel(false);

        Self {
            build_version: build_info.semver_version(),
            is_cc_active: false,
            announce_memory_limit: None,
            now,
            configs: Arc::new(configs),
            configs_synced_once: Arc::new(configs_synced_once),
            dynamic: Arc::new(DynamicConfig {
                batch_builder_max_outstanding_parts: AtomicUsize::new(2),
                compaction_heuristic_min_inputs: AtomicUsize::new(8),
                compaction_heuristic_min_parts: AtomicUsize::new(8),
                compaction_heuristic_min_updates: AtomicUsize::new(1024),
                compaction_memory_bound_bytes: AtomicUsize::new(1024 * MiB),
                gc_blob_delete_concurrency_limit: AtomicUsize::new(32),
                state_versions_recent_live_diffs_limit: AtomicUsize::new(
                    30 * ROLLUP_THRESHOLD.default(),
                ),
                usage_state_fetch_concurrency_limit: AtomicUsize::new(8),
            }),
            compaction_enabled: !compaction_disabled,
            compaction_concurrency_limit: 5,
            compaction_queue_size: 20,
            compaction_yield_after_n_updates: 100_000,
            consensus_connection_pool_max_size: 50,
            consensus_connection_pool_max_wait: Some(Duration::from_secs(60)),
            writer_lease_duration: 60 * Duration::from_secs(60),
            critical_downgrade_interval: Duration::from_secs(30),
            pubsub_connect_attempt_timeout: Duration::from_secs(5),
            pubsub_request_timeout: Duration::from_secs(5),
            pubsub_connect_max_backoff: Duration::from_secs(60),
            pubsub_client_sender_channel_size: 25,
            pubsub_client_receiver_channel_size: 25,
            pubsub_server_connection_channel_size: 25,
            pubsub_state_cache_shard_ref_channel_size: 25,
            pubsub_reconnect_backoff: Duration::from_secs(5),
            isolated_runtime_worker_threads: num_cpus::get(),
            // TODO: This doesn't work with the process orchestrator. Instead,
            // separate --log-prefix into --service-name and --enable-log-prefix
            // options, where the first is always provided and the second is
            // conditionally enabled by the process orchestrator.
            hostname: std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_owned()),
        }
    }

    pub(crate) fn set_config<T: ConfigType>(&self, cfg: &Config<T>, val: T) {
        let mut updates = ConfigUpdates::default();
        updates.add(cfg, val);
        updates.apply(self)
    }

    /// Applies the provided updates to this configuration.
    ///
    /// You should prefer calling this method over mutating `self.configs`
    /// directly, so that [`Self::configs_synced_once`] can be properly
    /// maintained.
    pub fn apply_from(&self, updates: &ConfigUpdates) {
        updates.apply(&self.configs);
        self.configs_synced_once.send_replace(true);
    }

    /// Resolves when `configs` has been synced at least once with an upstream
    /// source, i.e., via [`Self::apply_from`].
    ///
    /// If `configs` has already been synced once at the time the method is
    /// called, resolves immediately.
    ///
    /// Useful in conjunction with configuration parameters that cannot be
    /// dynamically updated once set (e.g., PubSub).
    #[instrument(level = "info")]
    pub async fn configs_synced_once(&self) {
        self.configs_synced_once
            .subscribe()
            .wait_for(|synced| *synced)
            .await
            .expect("we have a borrow on sender so it cannot drop");
    }

    /// The minimum number of updates that justify writing out a batch in `persist_sink`'s
    /// `write_batches` operator. (If there are fewer than this minimum number of updates,
    /// they'll be forwarded on to `append_batch` to be combined and written there.)
    pub fn sink_minimum_batch_updates(&self) -> usize {
        PERSIST_SINK_MINIMUM_BATCH_UPDATES.get(self)
    }

    /// The same as `Self::sink_minimum_batch_updates`, but
    /// for storage `persist_sink`'s.
    pub fn storage_sink_minimum_batch_updates(&self) -> usize {
        STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES.get(self)
    }

    /// The maximum amount of work to do in the persist_source mfp_and_decode
    /// operator before yielding.
    pub fn storage_source_decode_fuel(&self) -> usize {
        STORAGE_SOURCE_DECODE_FUEL.get(self)
    }

    /// CYA to allow opt-out of a performance optimization to skip decoding
    /// ignored data.
    pub fn optimize_ignored_data_decode(&self) -> bool {
        OPTIMIZE_IGNORED_DATA_DECODE.get(self)
    }

    /// Overrides the value for "persist_reader_lease_duration".
    pub fn set_reader_lease_duration(&self, val: Duration) {
        self.set_config(&READER_LEASE_DURATION, val);
    }

    /// Overrides the value for "persist_rollup_threshold".
    pub fn set_rollup_threshold(&self, val: usize) {
        self.set_config(&ROLLUP_THRESHOLD, val);
    }

    /// Overrides the value for the "persist_next_listen_batch_retryer_*"
    /// configs.
    pub fn set_next_listen_batch_retryer(&self, val: RetryParameters) {
        self.set_config(
            &NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF,
            val.initial_backoff,
        );
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER, val.multiplier);
        self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
    }

    /// Returns a new instance of [PersistConfig] for tests.
    pub fn new_for_tests() -> Self {
        use mz_build_info::DUMMY_BUILD_INFO;
        use mz_ore::now::SYSTEM_TIME;

        let mut cfg = Self::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        cfg.hostname = "tests".into();
        cfg
    }
}

#[allow(non_upper_case_globals)]
pub(crate) const MiB: usize = 1024 * 1024;

/// Adds the full set of all persist [Config]s.
///
/// TODO(cfg): Consider replacing this with a static global registry powered by
/// something like the `ctor` or `inventory` crate. This would involve managing
/// the footgun of a Config being linked into one binary but not the other.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
    mz_persist::cfg::all_dyn_configs(configs)
        .add(&crate::batch::BATCH_DELETE_ENABLED)
        .add(&crate::batch::BATCH_COLUMNAR_FORMAT)
        .add(&crate::batch::BATCH_COLUMNAR_FORMAT_PERCENT)
        .add(&crate::batch::BATCH_RECORD_PART_FORMAT)
        .add(&crate::batch::BLOB_TARGET_SIZE)
        .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES)
        .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
        .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
        .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
        .add(&crate::batch::RECORD_RUN_META)
        .add(&crate::batch::STRUCTURED_ORDER)
        .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
        .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
        .add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
        .add(&crate::cfg::CRDB_TCP_USER_TIMEOUT)
        .add(&crate::cfg::USE_CRITICAL_SINCE_TXN)
        .add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
        .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
        .add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
        .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
        .add(&crate::fetch::FETCH_SEMAPHORE_COST_ADJUSTMENT)
        .add(&crate::fetch::FETCH_SEMAPHORE_PERMIT_ADJUSTMENT)
        .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
        .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
        .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
        .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
        .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_CLAMP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF)
        .add(&crate::internal::machine::NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER)
        .add(&crate::internal::machine::RECORD_COMPACTIONS)
        .add(&crate::internal::state::ROLLUP_THRESHOLD)
        .add(&crate::internal::state::WRITE_DIFFS_SUM)
        .add(&crate::internal::apply::ROUNDTRIP_SPINE)
        .add(&crate::operators::PERSIST_SINK_MINIMUM_BATCH_UPDATES)
        .add(&crate::operators::STORAGE_PERSIST_SINK_MINIMUM_BATCH_UPDATES)
        .add(&crate::operators::STORAGE_SOURCE_DECODE_FUEL)
        .add(&crate::project::OPTIMIZE_IGNORED_DATA_DECODE)
        .add(&crate::project::OPTIMIZE_IGNORED_DATA_FETCH)
        .add(&crate::read::READER_LEASE_DURATION)
        .add(&crate::rpc::PUBSUB_CLIENT_ENABLED)
        .add(&crate::rpc::PUBSUB_PUSH_DIFF_ENABLED)
        .add(&crate::stats::STATS_AUDIT_PERCENT)
        .add(&crate::stats::STATS_BUDGET_BYTES)
        .add(&crate::stats::STATS_COLLECTION_ENABLED)
        .add(&crate::stats::STATS_FILTER_ENABLED)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_EQUALS)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
        .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
        .add(&crate::fetch::PART_DECODE_FORMAT)
        .add(&crate::DANGEROUS_ENABLE_SCHEMA_EVOLUTION)
}

impl PersistConfig {
    pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;

    // TODO: Get rid of this in favor of using dyncfgs at the
    // relevant callsites.
    pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
        self.dynamic
            .state_versions_recent_live_diffs_limit
            .store(val, DynamicConfig::STORE_ORDERING);
    }
}

/// The minimum TTL of a connection to Postgres/CRDB before it is proactively
/// terminated. Connections are routinely culled to balance load against the
/// downstream database.
const CONSENSUS_CONNECTION_POOL_TTL: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl",
    Duration::from_secs(300),
    "\
    The minimum TTL of a Consensus connection to Postgres/CRDB before it is \
    proactively terminated",
);

/// The minimum time between TTLing connections to Postgres/CRDB. This delay is
/// used to stagger reconnections to avoid stampedes and high tail latencies.
/// This value should be much less than `consensus_connection_pool_ttl` so that
/// reconnections are biased towards terminating the oldest connections first. A
/// value of `consensus_connection_pool_ttl /
/// consensus_connection_pool_max_size` is likely a good place to start so that
/// all connections are rotated when the pool is fully used.
const CONSENSUS_CONNECTION_POOL_TTL_STAGGER: Config<Duration> = Config::new(
    "persist_consensus_connection_pool_ttl_stagger",
    Duration::from_secs(6),
    "The minimum time between TTLing Consensus connections to Postgres/CRDB.",
);

/// The duration to wait for a Consensus Postgres/CRDB connection to be made
/// before retrying.
pub const CRDB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
    "crdb_connect_timeout",
    Duration::from_secs(5),
    "The time to connect to CockroachDB before timing out and retrying.",
);

/// The TCP user timeout for a Consensus Postgres/CRDB connection. Specifies the
/// amount of time that transmitted data may remain unacknowledged before the
/// TCP connection is forcibly closed.
pub const CRDB_TCP_USER_TIMEOUT: Config<Duration> = Config::new(
    "crdb_tcp_user_timeout",
    Duration::from_secs(30),
    "\
    The TCP timeout for connections to CockroachDB. Specifies the amount of \
    time that transmitted data may remain unacknowledged before the TCP \
    connection is forcibly closed.",
);

/// Migrate the txns code to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_TXN: Config<bool> = Config::new(
    "persist_use_critical_since_txn",
    true,
    "Use the critical since (instead of the overall since) when initializing a subscribe.",
);

/// Migrate the catalog to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_CATALOG: Config<bool> = Config::new(
    "persist_use_critical_since_catalog",
    false,
    "Use the critical since (instead of the overall since) for the Persist-backed catalog.",
);

/// Migrate the persist source to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SOURCE: Config<bool> = Config::new(
    "persist_use_critical_since_source",
    false,
    "Use the critical since (instead of the overall since) in the Persist source.",
);

/// Migrate snapshots to use the critical since when opening a new read handle.
pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
    "persist_use_critical_since_snapshot",
    false,
    "Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);

/// Migrate the persist source to use a process global txn cache.
pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
    "use_global_txn_cache_source",
    true,
    "Use the process global txn cache (instead of an operator local one) in the Persist source.",
);

impl PostgresClientKnobs for PersistConfig {
    fn connection_pool_max_size(&self) -> usize {
        self.consensus_connection_pool_max_size
    }

    fn connection_pool_max_wait(&self) -> Option<Duration> {
        self.consensus_connection_pool_max_wait
    }

    fn connection_pool_ttl(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL.get(self)
    }

    fn connection_pool_ttl_stagger(&self) -> Duration {
        CONSENSUS_CONNECTION_POOL_TTL_STAGGER.get(self)
    }

    fn connect_timeout(&self) -> Duration {
        CRDB_CONNECT_TIMEOUT.get(self)
    }

    fn tcp_user_timeout(&self) -> Duration {
        CRDB_TCP_USER_TIMEOUT.get(self)
    }
}

/// Persist configurations that can be dynamically updated.
///
/// Persist is expected to react to each of these such that updating the value
/// returned by the function takes effect in persist (i.e. no caching it). This
/// should happen "as promptly as reasonably possible" where that's defined by
/// the tradeoffs of complexity vs promptness. For example, we might use a
/// consistent version of `BLOB_TARGET_SIZE` for the entirety of a single
/// compaction call. However, it should _never_ require a process restart for an
/// update of these to take effect.
///
/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
/// serialized into a [mz_dyncfg::ConfigUpdates]. In environmentd, these are applied
/// directly via [mz_dyncfg::ConfigUpdates::apply] to the [PersistConfig] in
/// [crate::cache::PersistClientCache]. There is one `PersistClientCache` per
/// process, and every `PersistConfig` shares the same `Arc<DynamicConfig>`, so
/// this affects all [DynamicConfig] usage in the process. The
/// `ConfigUpdates` is also sent via the compute and storage command
/// streams, which then apply it to all computed/storaged/clusterd processes as
/// well.
#[derive(Debug)]
pub struct DynamicConfig {
    batch_builder_max_outstanding_parts: AtomicUsize,
    compaction_heuristic_min_inputs: AtomicUsize,
    compaction_heuristic_min_parts: AtomicUsize,
    compaction_heuristic_min_updates: AtomicUsize,
    compaction_memory_bound_bytes: AtomicUsize,
    gc_blob_delete_concurrency_limit: AtomicUsize,
    state_versions_recent_live_diffs_limit: AtomicUsize,
    usage_state_fetch_concurrency_limit: AtomicUsize,
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
pub struct RetryParameters {
    pub fixed_sleep: Duration,
    pub initial_backoff: Duration,
    pub multiplier: u32,
    pub clamp: Duration,
}

impl RetryParameters {
    pub(crate) fn into_retry(self, now: SystemTime) -> Retry {
        let seed = now
            .duration_since(UNIX_EPOCH)
            .map_or(0, |x| u64::from(x.subsec_nanos()));
        Retry {
            fixed_sleep: self.fixed_sleep,
            initial_backoff: self.initial_backoff,
            multiplier: self.multiplier,
            clamp_backoff: self.clamp,
            seed,
        }
    }
}

impl DynamicConfig {
    // TODO: Decide if we can relax these.
    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
    const STORE_ORDERING: Ordering = Ordering::SeqCst;

    /// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
    /// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
    /// calls on previous ones finishing.
    pub fn batch_builder_max_outstanding_parts(&self) -> usize {
        self.batch_builder_max_outstanding_parts
            .load(Self::LOAD_ORDERING)
    }

    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
    /// if the number of inputs is at least this many. Compaction is performed
    /// if any of the heuristic criteria are met (they are OR'd).
    pub fn compaction_heuristic_min_inputs(&self) -> usize {
        self.compaction_heuristic_min_inputs
            .load(Self::LOAD_ORDERING)
    }

    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
    /// if the number of batch parts is at least this many. Compaction is performed
    /// if any of the heuristic criteria are met (they are OR'd).
    pub fn compaction_heuristic_min_parts(&self) -> usize {
        self.compaction_heuristic_min_parts
            .load(Self::LOAD_ORDERING)
    }

    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
    /// if the number of updates is at least this many. Compaction is performed
    /// if any of the heuristic criteria are met (they are OR'd).
    pub fn compaction_heuristic_min_updates(&self) -> usize {
        self.compaction_heuristic_min_updates
            .load(Self::LOAD_ORDERING)
    }

    /// The upper bound on compaction's memory consumption. The value must be at
    /// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
    /// compaction to merge together more runs at once, providing greater
    /// consolidation of updates, at the cost of greater memory usage.
    pub fn compaction_memory_bound_bytes(&self) -> usize {
        self.compaction_memory_bound_bytes.load(Self::LOAD_ORDERING)
    }

    /// The maximum number of concurrent blob deletes during garbage collection.
    pub fn gc_blob_delete_concurrency_limit(&self) -> usize {
        self.gc_blob_delete_concurrency_limit
            .load(Self::LOAD_ORDERING)
    }

    /// The # of diffs to initially scan when fetching the latest consensus state, to
    /// determine which requests go down the fast vs slow path. Should be large enough
    /// to fetch all live diffs in the steady-state, and small enough to query Consensus
    /// at high volume. Steady-state usage should accommodate readers that require
    /// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
    ///
    /// This value ought to be defined in terms of `NEED_ROLLUP_THRESHOLD` to approximate
    /// when we expect rollups to be written and therefore when old states will be truncated
    /// by GC.
    pub fn state_versions_recent_live_diffs_limit(&self) -> usize {
        self.state_versions_recent_live_diffs_limit
            .load(Self::LOAD_ORDERING)
    }

    /// The maximum number of concurrent state fetches during usage computation.
    pub fn usage_state_fetch_concurrency_limit(&self) -> usize {
        self.usage_state_fetch_concurrency_limit
            .load(Self::LOAD_ORDERING)
    }

    // TODO: Get rid of these in favor of using dyncfgs at the
    // relevant callsites.
    #[cfg(test)]
    pub fn set_batch_builder_max_outstanding_parts(&self, val: usize) {
        self.batch_builder_max_outstanding_parts
            .store(val, Self::LOAD_ORDERING);
    }
    pub fn set_compaction_memory_bound_bytes(&self, val: usize) {
        self.compaction_memory_bound_bytes
            .store(val, Self::LOAD_ORDERING);
    }
}

// TODO: Replace with dynamic values when PersistConfig is integrated with LD
impl BlobKnobs for PersistConfig {
    fn operation_timeout(&self) -> Duration {
        Duration::from_secs(180)
    }

    fn operation_attempt_timeout(&self) -> Duration {
        Duration::from_secs(90)
    }

    fn connect_timeout(&self) -> Duration {
        Duration::from_secs(7)
    }

    fn read_timeout(&self) -> Duration {
        Duration::from_secs(10)
    }

    fn is_cc_active(&self) -> bool {
        self.is_cc_active
    }
}

// If persist gets some encoded ProtoState from the future (e.g. two versions of
// code are running simultaneously against the same shard), it might have a
// field that the current code doesn't know about. This would be silently
// discarded at proto decode time. Unknown Fields [1] are a tool we can use in
// the future to help deal with this, but in the short-term, it's best to keep
// the persist read-modify-CaS loop simple for as long as we can get away with
// it (i.e. until we have to offer the ability to do rollbacks).
//
// [1]: https://developers.google.com/protocol-buffers/docs/proto3#unknowns
//
// To detect the bad situation and disallow it, we tag every version of state
// written to consensus with the version of code used to encode it. Then at
// decode time, we're able to compare the current version against any we receive
// and assert as necessary.
//
// Initially we allow any from the past (permanent backward compatibility) and
// one minor version into the future (forward compatibility). This allows us to
// run two versions concurrently for rolling upgrades. We'll have to revisit
// this logic if/when we start using major versions other than 0.
//
// We could do the same for blob data, but it shouldn't be necessary. Any blob
// data we read is going to be because we fetched it using a pointer stored in
// some persist state. If we can handle the state, we can handle the blobs it
// references, too.
pub fn check_data_version(code_version: &Version, data_version: &Version) -> Result<(), String> {
    // Allow one minor version of forward compatibility. We could avoid the
    // clone with some nested comparisons of the semver fields, but this code
    // isn't particularly performance sensitive and I find this impl easier to
    // reason about.
    let max_allowed_data_version = Version::new(
        code_version.major,
        code_version.minor.saturating_add(1),
        u64::MAX,
    );

    if &max_allowed_data_version < data_version {
        Err(format!(
            "{code_version} received persist state from the future {data_version}",
        ))
    } else {
        Ok(())
    }
}