// mz_storage_client/src/storage_collections.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::iter;
16use std::num::NonZeroI64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::future::BoxFuture;
23use futures::stream::{BoxStream, FuturesUnordered};
24use futures::{Future, FutureExt, StreamExt};
25use itertools::Itertools;
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::NowFn;
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::{Opaque, SinceHandle};
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::codec_impls::UnitSchema;
40use mz_persist_types::txn::TxnsCodec;
41use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
42use mz_storage_types::StorageDiff;
43use mz_storage_types::configuration::StorageConfiguration;
44use mz_storage_types::connections::ConnectionContext;
45use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
46use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
47use mz_storage_types::errors::CollectionMissing;
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
52use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
53use mz_txn_wal::metrics::Metrics as TxnMetrics;
54use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
55use mz_txn_wal::txns::TxnsHandle;
56use timely::PartialOrder;
57use timely::progress::frontier::MutableAntichain;
58use timely::progress::{Antichain, ChangeBatch};
59use tokio::sync::{mpsc, oneshot};
60use tokio::time::MissedTickBehavior;
61use tracing::{debug, info, trace, warn};
62
63use crate::client::TimestamplessUpdateBuilder;
64use crate::controller::{
65    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
66};
67use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
68
69mod metrics;
70
/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// know yet about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    fn collection_frontiers(&self, id: GlobalId) -> Result<CollectionFrontiers, CollectionMissing> {
        // Default implementation in terms of the batched lookup; on success
        // the single requested id is guaranteed to be in the result.
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may
    /// need to block on the stats being available, but don't want to hold a reference
    /// to the controller for too long... so the outer future holds a reference to the
    /// controller but returns quickly, and the inner future is slow but does not
    /// reference the controller.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>>;

    /// Returns a snapshot of the contents of collection `id` at the largest readable `as_of`.
    /// The collection must consolidate to a set, i.e., the multiplicity of every row must be 1!
    ///
    /// # Errors
    ///
    /// - Returns `StorageError::InvalidUsage` if the collection is closed.
    /// - Propagates the error if the underlying `snapshot` call errors.
    ///
    /// # Panics
    ///
    /// Panics if the collection does not consolidate to a set at that `as_of`
    /// (i.e., if any row survives with a multiplicity other than `+1`). Only
    /// safe to call on collections whose producer guarantees set semantics;
    /// not safe on arbitrary user collections.
    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>>;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
    >;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide a Some if any of
    /// the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold>, CollectionMissing>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Returns the state of [`StorageCollections`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
316
317/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
318/// consolidated form.
319pub struct SnapshotCursor {
320    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
321    // least as long as the cursor itself, which holds part leases. Bundling them together!
322    pub _read_handle: ReadHandle<SourceData, (), Timestamp, StorageDiff>,
323    pub cursor: Cursor<SourceData, (), Timestamp, StorageDiff>,
324}
325
326impl SnapshotCursor {
327    pub async fn next(
328        &mut self,
329    ) -> Option<impl Iterator<Item = (SourceData, Timestamp, StorageDiff)> + Sized + '_> {
330        let iter = self.cursor.next().await?;
331        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
332    }
333}
334
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<Timestamp>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<Timestamp>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes
    /// the implied capability.
    pub read_capabilities: Antichain<Timestamp>,
}
356
/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently, in the background.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to affect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<Timestamp>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are
    /// not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<Timestamp>,

    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<Timestamp>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
417
418// Supporting methods for implementing [StorageCollections].
419//
420// Almost all internal methods that are the backing implementation for a trait
421// method have the `_inner` suffix.
422//
423// We follow a pattern where `_inner` methods get a mutable reference to the
424// shared collections state, and it's the public-facing method that locks the
425/// A boxed stream of source data with timestamps and diffs.
426type SourceDataStream = BoxStream<'static, (SourceData, Timestamp, StorageDiff)>;
427
428// state for the duration of its invocation. This allows calling other `_inner`
429// methods from within `_inner` methods.
430impl StorageCollectionsImpl {
431    /// Creates and returns a new [StorageCollections].
432    ///
433    /// Note that when creating a new [StorageCollections], you must also
434    /// reconcile it with the previous state using
435    /// [StorageCollections::initialize_state],
436    /// [StorageCollections::prepare_state], and
437    /// [StorageCollections::create_collections_for_bootstrap].
438    pub async fn new(
439        persist_location: PersistLocation,
440        persist_clients: Arc<PersistClientCache>,
441        metrics_registry: &MetricsRegistry,
442        _now: NowFn,
443        txns_metrics: Arc<TxnMetrics>,
444        envd_epoch: NonZeroI64,
445        read_only: bool,
446        connection_context: ConnectionContext,
447        txn: &dyn StorageTxn,
448    ) -> Self {
449        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
450
451        // This value must be already installed because we must ensure it's
452        // durably recorded before it is used, otherwise we risk leaking persist
453        // state.
454        let txns_id = txn
455            .get_txn_wal_shard()
456            .expect("must call prepare initialization before creating StorageCollections");
457
458        let txns_client = persist_clients
459            .open(persist_location.clone())
460            .await
461            .expect("location should be valid");
462
463        // We have to initialize, so that TxnsRead::start() below does not
464        // block.
465        let _txns_handle: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow> =
466            TxnsHandle::open(
467                Timestamp::MIN,
468                txns_client.clone(),
469                txns_client.dyncfgs().clone(),
470                Arc::clone(&txns_metrics),
471                txns_id,
472                Opaque::encode(&PersistEpoch::default()),
473            )
474            .await;
475
476        // For handing to the background task, for listening to upper updates.
477        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
478        let mut txns_write = txns_client
479            .open_writer(
480                txns_id,
481                Arc::new(txns_key_schema),
482                Arc::new(txns_val_schema),
483                Diagnostics {
484                    shard_name: "txns".to_owned(),
485                    handle_purpose: "commit txns".to_owned(),
486                },
487            )
488            .await
489            .expect("txns schema shouldn't change");
490
491        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
492
493        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
494        let finalizable_shards =
495            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
496        let finalized_shards =
497            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
498        let config = Arc::new(Mutex::new(StorageConfiguration::new(
499            connection_context,
500            mz_dyncfgs::all_dyncfgs(),
501        )));
502
503        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
504
505        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
506        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
507        let mut background_task = BackgroundTask {
508            config: Arc::clone(&config),
509            cmds_tx: cmd_tx.clone(),
510            cmds_rx: cmd_rx,
511            holds_rx,
512            collections: Arc::clone(&collections),
513            finalizable_shards: Arc::clone(&finalizable_shards),
514            shard_by_id: BTreeMap::new(),
515            since_handles: BTreeMap::new(),
516            txns_handle: Some(txns_write),
517            txns_shards: Default::default(),
518        };
519
520        let background_task =
521            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
522                background_task.run().await
523            });
524
525        let finalize_shards_task = mz_ore::task::spawn(
526            || "storage_collections::finalize_shards_task",
527            finalize_shards_task(FinalizeShardsTaskConfig {
528                envd_epoch: envd_epoch.clone(),
529                config: Arc::clone(&config),
530                metrics,
531                finalizable_shards: Arc::clone(&finalizable_shards),
532                finalized_shards: Arc::clone(&finalized_shards),
533                persist_location: persist_location.clone(),
534                persist: Arc::clone(&persist_clients),
535                read_only,
536            }),
537        );
538
539        Self {
540            finalizable_shards,
541            finalized_shards,
542            collections,
543            txns_read,
544            envd_epoch,
545            read_only,
546            config,
547            initial_txn_upper,
548            persist_location,
549            persist: persist_clients,
550            cmd_tx,
551            holds_tx,
552            _background_task: Arc::new(background_task.abort_on_drop()),
553            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
554        }
555    }
556
    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        SinceHandleWrapper,
    ) {
        // In read-only mode we must not acquire (and thereby fence) a critical
        // handle, so the since is held back with a leased read handle instead.
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // We're managing the data for this shard in read-write mode, which would fence out other
            // processes in read-only mode; it's safe to upgrade the metadata version.
            persist_client
                .upgrade_version::<SourceData, (), Timestamp, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. Its vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
619
620    /// Opens a write handle for the given `shard`.
621    async fn open_write_handle(
622        &self,
623        id: &GlobalId,
624        shard: ShardId,
625        relation_desc: RelationDesc,
626        persist_client: &PersistClient,
627    ) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
628        let diagnostics = Diagnostics {
629            shard_name: id.to_string(),
630            handle_purpose: format!("controller data for {}", id),
631        };
632
633        let write = persist_client
634            .open_writer(
635                shard,
636                Arc::new(relation_desc),
637                Arc::new(UnitSchema),
638                diagnostics.clone(),
639            )
640            .await
641            .expect("invalid persist usage");
642
643        write
644    }
645
    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<Timestamp>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), Timestamp, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        // Acquiring a critical handle fences out other processes, which is
        // exactly what read-only mode must never do.
        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(Timestamp::MIN),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Compare-and-set loop: retry until we either install our epoch as
            // the handle's opaque token, or observe a newer epoch and halt.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                // Ensure the current epoch is <= our epoch.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                    // Otherwise someone raced us between the `opaque()` read
                    // and the compare-and-downgrade; re-read and retry.
                } else {
                    // A process with a newer epoch has fenced us out; there is
                    // no way to recover, so halt and let orchestration restart.
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
725
726    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
727    /// for the given `shard`.
728    ///
729    /// `since` is an optional since that the read handle will be forwarded to
730    /// if it is less than its current since.
731    async fn open_leased_handle(
732        &self,
733        id: &GlobalId,
734        shard: ShardId,
735        relation_desc: RelationDesc,
736        since: Option<&Antichain<Timestamp>>,
737        persist_client: &PersistClient,
738    ) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
739        tracing::debug!(%id, ?since, "opening leased handle");
740
741        let diagnostics = Diagnostics {
742            shard_name: id.to_string(),
743            handle_purpose: format!("controller data for {}", id),
744        };
745
746        let use_critical_since = false;
747        let mut handle: ReadHandle<_, _, _, _> = persist_client
748            .open_leased_reader(
749                shard,
750                Arc::new(relation_desc),
751                Arc::new(UnitSchema),
752                diagnostics.clone(),
753                use_critical_since,
754            )
755            .await
756            .expect("invalid persist usage");
757
758        // Take the join of the handle's since and the provided `since`;
759        // this lets materialized views express the since at which their
760        // read handles "start."
761        let provided_since = match since {
762            Some(since) => since,
763            None => &Antichain::from_elem(Timestamp::MIN),
764        };
765        let since = handle.since().join(provided_since);
766
767        handle.downgrade_since(&since).await;
768
769        handle
770    }
771
772    fn register_handles(
773        &self,
774        id: GlobalId,
775        is_in_txns: bool,
776        since_handle: SinceHandleWrapper,
777        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
778    ) {
779        self.send(BackgroundCmd::Register {
780            id,
781            is_in_txns,
782            since_handle,
783            write_handle,
784        });
785    }
786
787    fn send(&self, cmd: BackgroundCmd) {
788        let _ = self.cmd_tx.send(cmd);
789    }
790
    /// Fetches [SnapshotStats] for the given collection at `as_of`, by asking
    /// the background task (which owns the persist handles) via a oneshot
    /// channel.
    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf,
    ) -> Result<SnapshotStats, StorageError> {
        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
        // caller of this one drives it to completion.
        //
        // We'd need to either share the critical handle somehow or maybe have
        // two instances around, one in the worker and one in the
        // StorageCollections.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        // The background task responds with a future that *we* drive to
        // completion, so the task itself is not blocked on the computation.
        rx.await.expect("BackgroundTask should be live").0.await
    }
806
    /// If this identified collection has a dependency, install a read hold on
    /// it.
    ///
    /// This is necessary to ensure that the dependency's since does not advance
    /// beyond its dependents'.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
        id: GlobalId,
    ) -> Result<(), StorageError> {
        // Grab the collection's dependency list (cloned, so we can later pass
        // `self_collections` mutably) and its implied capability.
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            // Unknown collection: nothing to hold back.
            _ => return Ok(()),
        };

        // Sanity-check the invariant that no dependency's since is already in
        // advance of this collection's since; log instead of panicking.
        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        // Install a read capability at our implied capability on every
        // dependency.
        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }
851
852    /// Returns the given collection's dependencies.
853    fn determine_collection_dependencies(
854        self_collections: &BTreeMap<GlobalId, CollectionState>,
855        source_id: GlobalId,
856        collection_desc: &CollectionDescription,
857    ) -> Result<Vec<GlobalId>, StorageError> {
858        let mut dependencies = Vec::new();
859
860        if let Some(id) = collection_desc.primary {
861            dependencies.push(id);
862        }
863
864        match &collection_desc.data_source {
865            DataSource::Introspection(_)
866            | DataSource::Webhook
867            | DataSource::Table
868            | DataSource::Progress
869            | DataSource::Other => (),
870            DataSource::IngestionExport {
871                ingestion_id,
872                data_config,
873                ..
874            } => {
875                // Ingestion exports depend on their primary source's remap
876                // collection, except when they use a CDCv2 envelope.
877                let source = self_collections
878                    .get(ingestion_id)
879                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
880                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
881                    panic!("SourceExport must refer to a primary source that already exists");
882                };
883
884                match data_config.envelope {
885                    SourceEnvelope::CdcV2 => (),
886                    _ => dependencies.push(*remap_collection_id),
887                }
888            }
889            // Ingestions depend on their remap collection.
890            DataSource::Ingestion(ingestion) => {
891                if ingestion.remap_collection_id != source_id {
892                    dependencies.push(ingestion.remap_collection_id);
893                }
894            }
895            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
896        }
897
898        Ok(dependencies)
899    }
900
    /// Install read capabilities on the given `storage_dependencies`.
    ///
    /// The installed capability is `read_capability`, expressed as a
    /// `ChangeBatch` of `+1` at each element of the frontier, applied to every
    /// dependency.
    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<Timestamp>,
    ) -> Result<(), StorageError> {
        // Express the new capability as a batch of +1 changes at each
        // frontier element.
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(*time, 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        // The same change batch is applied to every dependency.
        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }
966
    /// Returns a recent upper for the identified collection, based on the
    /// pubsub-maintained `shared_upper` of a freshly opened write handle.
    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<Timestamp>, StorageError> {
        let metadata = &self.collection_metadata(id)?;
        // NOTE(review): `open` is unwrapped here; presumably opening a cached
        // persist location cannot fail in practice — confirm.
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        // Duplicate part of open_data_handles here because we don't need the
        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        // NB: Opening a WriteHandle is cheap if it's never used in a
        // compare_and_append operation.
        let write = persist_client
            .open_writer::<SourceData, (), Timestamp, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }
993
994    async fn read_handle_for_snapshot(
995        persist: Arc<PersistClientCache>,
996        metadata: &CollectionMetadata,
997        id: GlobalId,
998    ) -> Result<ReadHandle<SourceData, (), Timestamp, StorageDiff>, StorageError> {
999        let persist_client = persist
1000            .open(metadata.persist_location.clone())
1001            .await
1002            .unwrap();
1003
1004        // We create a new read handle every time someone requests a snapshot
1005        // and then immediately expire it instead of keeping a read handle
1006        // permanently in our state to avoid having it heartbeat continually.
1007        // The assumption is that calls to snapshot are rare and therefore worth
1008        // it to always create a new handle.
1009        let read_handle = persist_client
1010            .open_leased_reader::<SourceData, (), _, _>(
1011                metadata.data_shard,
1012                Arc::new(metadata.relation_desc.clone()),
1013                Arc::new(UnitSchema),
1014                Diagnostics {
1015                    shard_name: id.to_string(),
1016                    handle_purpose: format!("snapshot {}", id),
1017                },
1018                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1019            )
1020            .await
1021            .expect("invalid persist usage");
1022        Ok(read_handle)
1023    }
1024
    /// Returns the contents of the identified collection at `as_of`, as a
    /// fully materialized `Vec` of rows and diffs.
    ///
    /// Returns `ReadBeforeSince` if the snapshot cannot be taken at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only collections registered to a txns shard go through txn-wal; the
        // txns shard must match the one the controller was given.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // We _are_ using txn-wal for tables. It advances the physical upper of the
                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
                    // unblocked.
                    //
                    // Consider the following scenario:
                    // - Table A is written to via txns at time 5
                    // - Tables other than A are written to via txns consuming timestamps up to 10
                    // - We'd like to read A at 7
                    // - The application process of A's txn has advanced the upper to 5+1, but we need
                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
                    // - This branch allows it to handle that advancing the physical upper of Table A to
                    //   10 (NB but only once we see it get past the write at 5!)
                    // - Then we can read it normally.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
                        // interpret the result
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1084
    /// Like [StorageCollectionsImpl::snapshot], but returns a stream of the
    /// snapshot's contents instead of a fully materialized `Vec`.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
        txns_read: &TxnsRead<Timestamp>,
    ) -> BoxFuture<'static, Result<SourceDataStream, StorageError>> {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // Only collections registered to a txns shard go through txn-wal; the
        // txns shard must match the one the controller was given.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    // See the scenario described in `Self::snapshot` for why
                    // the txn-wal path needs `update_gt` + `data_snapshot`.
                    txns_read.update_gt(as_of).await;
                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            // Map our stream, unwrapping Persist internal errors.
            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
    }
1131
    /// Applies the given read policies to the identified collections and
    /// propagates any resulting read-capability changes.
    ///
    /// Panics if any referenced collection is absent.
    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState>,
        policies: Vec<(GlobalId, ReadPolicy)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            // The capability the new policy implies, given the current write
            // frontier.
            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            // Only apply the new capability if it does not regress the
            // current implied capability. Record the net change (+new, -old)
            // for propagation; after the swap, `new_read_capability` holds the
            // *old* capability.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }
1178
1179    // This is not an associated function so that we can share it with the task
1180    // that updates the persist handles and also has a reference to the shared
1181    // collections state.
1182    fn update_read_capabilities_inner(
1183        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd>,
1184        collections: &mut BTreeMap<GlobalId, CollectionState>,
1185        updates: &mut BTreeMap<GlobalId, ChangeBatch<Timestamp>>,
1186    ) {
1187        // Location to record consequences that we need to act on.
1188        let mut collections_net = BTreeMap::new();
1189
1190        // We must not rely on any specific relative ordering of `GlobalId`s.
1191        // That said, it is reasonable to assume that collections generally have
1192        // greater IDs than their dependencies, so starting with the largest is
1193        // a useful optimization.
1194        while let Some(id) = updates.keys().rev().next().cloned() {
1195            let mut update = updates.remove(&id).unwrap();
1196
1197            if id.is_user() {
1198                trace!(id = ?id, update = ?update, "update_read_capabilities");
1199            }
1200
1201            let collection = if let Some(c) = collections.get_mut(&id) {
1202                c
1203            } else {
1204                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1205                if has_positive_updates {
1206                    panic!(
1207                        "reference to absent collection {id} but we have positive updates: {:?}",
1208                        update
1209                    );
1210                } else {
1211                    // Continue purely negative updates. Someone has probably
1212                    // already dropped this collection!
1213                    continue;
1214                }
1215            };
1216
1217            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1218            for (time, diff) in update.iter() {
1219                assert!(
1220                    collection.read_capabilities.count_for(time) + diff >= 0,
1221                    "update {:?} for collection {id} would lead to negative \
1222                        read capabilities, read capabilities before applying: {:?}",
1223                    update,
1224                    collection.read_capabilities
1225                );
1226
1227                if collection.read_capabilities.count_for(time) + diff > 0 {
1228                    assert!(
1229                        current_read_capabilities.less_equal(time),
1230                        "update {:?} for collection {id} is trying to \
1231                            install read capabilities before the current \
1232                            frontier of read capabilities, read capabilities before applying: {:?}",
1233                        update,
1234                        collection.read_capabilities
1235                    );
1236                }
1237            }
1238
1239            let changes = collection.read_capabilities.update_iter(update.drain());
1240            update.extend(changes);
1241
1242            if id.is_user() {
1243                trace!(
1244                %id,
1245                ?collection.storage_dependencies,
1246                ?update,
1247                "forwarding update to storage dependencies");
1248            }
1249
1250            for id in collection.storage_dependencies.iter() {
1251                updates
1252                    .entry(*id)
1253                    .or_insert_with(ChangeBatch::new)
1254                    .extend(update.iter().cloned());
1255            }
1256
1257            let (changes, frontier) = collections_net
1258                .entry(id)
1259                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1260
1261            changes.extend(update.drain());
1262            *frontier = collection.read_capabilities.frontier().to_owned();
1263        }
1264
1265        // Translate our net compute actions into downgrades of persist sinces.
1266        // The actual downgrades are performed by a Tokio task asynchronously.
1267        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1268        for (key, (mut changes, frontier)) in collections_net {
1269            if !changes.is_empty() {
1270                // If the collection has a "primary" collection, let that primary drive compaction.
1271                let collection = collections.get(&key).expect("must still exist");
1272                let should_emit_persist_compaction = collection.primary.is_none();
1273
1274                if frontier.is_empty() {
1275                    info!(id = %key, "removing collection state because the since advanced to []!");
1276                    collections.remove(&key).expect("must still exist");
1277                }
1278
1279                if should_emit_persist_compaction {
1280                    persist_compaction_commands.push((key, frontier));
1281                }
1282            }
1283        }
1284
1285        if !persist_compaction_commands.is_empty() {
1286            cmd_tx
1287                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1288                .expect("cannot fail to send");
1289        }
1290    }
1291
1292    /// Remove any shards that we know are finalized
1293    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1294        self.finalized_shards
1295            .lock()
1296            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1297    }
1298}
1299
1300// See comments on the above impl for StorageCollectionsImpl.
1301#[async_trait]
1302impl StorageCollections for StorageCollectionsImpl {
1303    async fn initialize_state(
1304        &self,
1305        txn: &mut (dyn StorageTxn + Send),
1306        init_ids: BTreeSet<GlobalId>,
1307    ) -> Result<(), StorageError> {
1308        let metadata = txn.get_collection_metadata();
1309        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1310
1311        // Determine which collections we do not yet have metadata for.
1312        let new_collections: BTreeSet<GlobalId> =
1313            init_ids.difference(&existing_metadata).cloned().collect();
1314
1315        self.prepare_state(
1316            txn,
1317            new_collections,
1318            BTreeSet::default(),
1319            BTreeMap::default(),
1320        )
1321        .await?;
1322
1323        // All shards that belong to collections dropped in the last epoch are
1324        // eligible for finalization.
1325        //
1326        // n.b. this introduces an unlikely race condition: if a collection is
1327        // dropped from the catalog, but the dataflow is still running on a
1328        // worker, assuming the shard is safe to finalize on reboot may cause
1329        // the cluster to panic.
1330        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1331
1332        info!(?unfinalized_shards, "initializing finalizable_shards");
1333
1334        self.finalizable_shards.lock().extend(unfinalized_shards);
1335
1336        Ok(())
1337    }
1338
1339    fn update_parameters(&self, config_params: StorageParameters) {
1340        // We serialize the dyncfg updates in StorageParameters, but configure
1341        // persist separately.
1342        config_params.dyncfg_updates.apply(self.persist.cfg());
1343
1344        self.config
1345            .lock()
1346            .expect("lock poisoned")
1347            .update(config_params);
1348    }
1349
1350    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1351        let collections = self.collections.lock().expect("lock poisoned");
1352
1353        collections
1354            .get(&id)
1355            .map(|c| c.collection_metadata.clone())
1356            .ok_or(CollectionMissing(id))
1357    }
1358
1359    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1360        let collections = self.collections.lock().expect("lock poisoned");
1361
1362        collections
1363            .iter()
1364            .filter(|(_id, c)| !c.is_dropped())
1365            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1366            .collect()
1367    }
1368
1369    fn collections_frontiers(
1370        &self,
1371        ids: Vec<GlobalId>,
1372    ) -> Result<Vec<CollectionFrontiers>, CollectionMissing> {
1373        if ids.is_empty() {
1374            return Ok(vec![]);
1375        }
1376
1377        let collections = self.collections.lock().expect("lock poisoned");
1378
1379        let res = ids
1380            .into_iter()
1381            .map(|id| {
1382                collections
1383                    .get(&id)
1384                    .map(|c| CollectionFrontiers {
1385                        id: id.clone(),
1386                        write_frontier: c.write_frontier.clone(),
1387                        implied_capability: c.implied_capability.clone(),
1388                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1389                    })
1390                    .ok_or(CollectionMissing(id))
1391            })
1392            .collect::<Result<Vec<_>, _>>()?;
1393
1394        Ok(res)
1395    }
1396
1397    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers> {
1398        let collections = self.collections.lock().expect("lock poisoned");
1399
1400        let res = collections
1401            .iter()
1402            .filter(|(_id, c)| !c.is_dropped())
1403            .map(|(id, c)| CollectionFrontiers {
1404                id: id.clone(),
1405                write_frontier: c.write_frontier.clone(),
1406                implied_capability: c.implied_capability.clone(),
1407                read_capabilities: c.read_capabilities.frontier().to_owned(),
1408            })
1409            .collect_vec();
1410
1411        res
1412    }
1413
    /// Returns statistics about the identified collection's contents at
    /// `as_of`, fetched via the background task.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let metadata = self.collection_metadata(id)?;

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let as_of = match metadata.txns_shard.as_ref() {
            // Not txn-wal backed: read the shard directly.
            None => SnapshotStatsAsOf::Direct(as_of),
            // Txn-wal backed: route the read through a txns data snapshot so
            // the physical upper is unblocked.
            Some(txns_id) => {
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                self.txns_read.update_gt(as_of).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of)
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }
1440
    /// Returns statistics about the parts backing the identified collection
    /// at `as_of`. The returned future performs the actual read, expiring the
    /// read handle when done.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError>> {
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txn-wal backed collections with a concrete `as_of`, go through
        // a txns data snapshot to unblock the read; otherwise read directly.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(*as_of).await;
                let data_snapshot = self.txns_read.data_snapshot(data_shard, *as_of).await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Expire eagerly so the handle doesn't keep heartbeating.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1492
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError>> {
        // Delegates to the inherent `snapshot` method (inherent methods take
        // precedence over trait methods), supplying our txn-wal read handle.
        self.snapshot(id, as_of, &self.txns_read)
    }
1500
    /// Returns the rows of the identified collection at the most recent
    /// readable timestamp, i.e. one step back from a recent upper.
    async fn snapshot_latest(&self, id: GlobalId) -> Result<Vec<Row>, StorageError> {
        let upper = self.recent_upper(id).await?;
        let res = match upper.as_option() {
            Some(f) if f > &Timestamp::MIN => {
                // The most recent complete timestamp is `upper - 1`.
                let as_of = f.step_back().expect("checked that f > &Timestamp::MIN");

                let snapshot = self.snapshot(id, as_of, &self.txns_read).await?;
                snapshot
                    .into_iter()
                    .map(|(row, diff)| {
                        // See the trait doc: `snapshot_latest` is only meant for collections that
                        // consolidate to a set.
                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
                        row
                    })
                    .collect()
            }
            Some(_min) => {
                // The collection must be empty!
                Vec::new()
            }
            // The collection is closed, we cannot determine a latest read
            // timestamp based on the upper.
            _ => {
                return Err(StorageError::InvalidUsage(
                    "collection closed, cannot determine a read timestamp based on the upper"
                        .to_string(),
                ));
            }
        };

        Ok(res)
    }
1534
1535    fn snapshot_cursor(
1536        &self,
1537        id: GlobalId,
1538        as_of: Timestamp,
1539    ) -> BoxFuture<'static, Result<SnapshotCursor, StorageError>> {
1540        let metadata = match self.collection_metadata(id) {
1541            Ok(metadata) => metadata.clone(),
1542            Err(e) => return async { Err(e.into()) }.boxed(),
1543        };
1544        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1545            // Ensure the txn's shard the controller has is the same that this
1546            // collection is registered to.
1547            assert_eq!(txns_id, self.txns_read.txns_id());
1548            self.txns_read.clone()
1549        });
1550        let persist = Arc::clone(&self.persist);
1551
1552        // See the comments in Self::snapshot for what's going on here.
1553        async move {
1554            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1555            let cursor = match txns_read {
1556                None => {
1557                    let cursor = handle
1558                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1559                        .await
1560                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1561                    SnapshotCursor {
1562                        _read_handle: handle,
1563                        cursor,
1564                    }
1565                }
1566                Some(txns_read) => {
1567                    txns_read.update_gt(as_of).await;
1568                    let data_snapshot = txns_read.data_snapshot(metadata.data_shard, as_of).await;
1569                    let cursor = data_snapshot
1570                        .snapshot_cursor(&mut handle, |_| true)
1571                        .await
1572                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1573                    SnapshotCursor {
1574                        _read_handle: handle,
1575                        cursor,
1576                    }
1577                }
1578            };
1579
1580            Ok(cursor)
1581        }
1582        .boxed()
1583    }
1584
    /// Returns a stream over the contents of collection `id` at time
    /// `as_of`.
    ///
    /// Delegates to the three-argument `snapshot_and_stream` helper (a
    /// distinct method of the same name — it takes the additional
    /// `TxnsRead` argument), so that txns-backed shards can be read
    /// correctly.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Timestamp,
    ) -> BoxFuture<
        'static,
        Result<BoxStream<'static, (SourceData, Timestamp, StorageDiff)>, StorageError>,
    > {
        // This resolves to the 3-argument method, not to this 2-argument
        // one, so there is no accidental recursion here.
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1595
1596    fn create_update_builder(
1597        &self,
1598        id: GlobalId,
1599    ) -> BoxFuture<
1600        'static,
1601        Result<TimestamplessUpdateBuilder<SourceData, (), StorageDiff>, StorageError>,
1602    > {
1603        let metadata = match self.collection_metadata(id) {
1604            Ok(m) => m,
1605            Err(e) => return Box::pin(async move { Err(e.into()) }),
1606        };
1607        let persist = Arc::clone(&self.persist);
1608
1609        async move {
1610            let persist_client = persist
1611                .open(metadata.persist_location.clone())
1612                .await
1613                .expect("invalid persist usage");
1614            let write_handle = persist_client
1615                .open_writer::<SourceData, (), Timestamp, StorageDiff>(
1616                    metadata.data_shard,
1617                    Arc::new(metadata.relation_desc.clone()),
1618                    Arc::new(UnitSchema),
1619                    Diagnostics {
1620                        shard_name: id.to_string(),
1621                        handle_purpose: format!("create write batch {}", id),
1622                    },
1623                )
1624                .await
1625                .expect("invalid persist usage");
1626            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1627
1628            Ok(builder)
1629        }
1630        .boxed()
1631    }
1632
1633    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError> {
1634        let collections = self.collections.lock().expect("lock poisoned");
1635
1636        if collections.contains_key(&id) {
1637            Ok(())
1638        } else {
1639            Err(StorageError::IdentifierMissing(id))
1640        }
1641    }
1642
    /// Stages catalog-state changes for collection creation/registration/
    /// removal in `txn`, without touching this instance's in-memory
    /// collection state.
    ///
    /// * `ids_to_add`: new collections; each gets a freshly minted
    ///   `ShardId` recorded in the transaction.
    /// * `ids_to_drop`: collections whose metadata is deleted; their shards
    ///   are queued as unfinalized, unless a primary still uses the shard.
    /// * `ids_to_register`: collections registered against pre-existing
    ///   shards.
    ///
    /// Also reconciles shards we have already finalized with the
    /// transaction's shard-finalization collection.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError> {
        // Mint a brand-new shard for each collection we are adding.
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata for any dropped collections.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        // Only finalize the shards of dropped collections that don't have a primary.
        // Otherwise the shard might still be in use by the primary.
        let mut dropped_shards = BTreeSet::new();
        {
            // Scope the lock: we only need it to inspect `primary` for each
            // dropped collection.
            let collections = self.collections.lock().expect("poisoned");
            for (id, shard) in dropped_mappings {
                let coll = collections.get(&id).expect("must exist");
                if coll.primary.is_none() {
                    dropped_shards.insert(shard);
                }
            }
        }
        txn.insert_unfinalized_shards(dropped_shards)?;

        // Reconcile any shards we've successfully finalized with the shard
        // finalization collection.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }
1682
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Creates and registers collection state for all of `collections`:
    /// enriches each description with metadata, opens persist handles
    /// (concurrently), installs the in-memory `CollectionState`, and
    /// installs read holds on dependencies.
    ///
    /// `register_ts` must be provided when any of the collections is a
    /// table; tables are presented as springing into existence at that
    /// timestamp.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError> {
        // Whether a collection is managed by txn-wal for our purposes. In
        // read-only mode, migrated collections are treated as not in txns.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. We must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            // After dedup, an adjacent pair with equal ids must have
            // differing descriptions: reject the id reuse.
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow the `&mut self` as immutable, as all the concurrent work to
        // be processed in this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError>| {
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // If this collection has a primary, the primary is responsible for downgrading
                    // the critical since and it would be an error if we did so here while opening
                    // the since handle.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[Timestamp::MIN]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts)),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError>((id, description, write, since_handle, metadata))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[Timestamp::MIN]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Determine the time dependence of the collection.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    // Only the epoch timeline follows wall-clock.
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                // Kafka, Postgres, MySql, and SQL Server sources all
                                // follow wall clock.
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                // Load generators not further specified.
                                LoadGenerator(_) => None,
                            }
                        }
                        IngestionExport { ingestion_id, .. } => {
                            // Exports inherit the time dependence of their
                            // ingestion, which was registered before them
                            // thanks to the dependency ordering above.
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        // Introspection, other, progress, table, and webhook sources follow wall clock.
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        // Materialized views, continual tasks, etc, aren't managed by storage.
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2028
    /// Evolves the persist schema of the table shard backing
    /// `existing_collection` to `new_desc`, and installs collection state
    /// for `new_collection` as the new "primary" version of the table.
    ///
    /// `expected_version` is the `RelationVersion` the caller believes is
    /// current; it is mapped 1:1 to a persist `SchemaId` and the schema
    /// change is compare-and-set against it.
    ///
    /// # Errors
    ///
    /// Returns an error when `existing_collection` is unknown, or when the
    /// schema evolution fails (expectation mismatch or incompatibility).
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError> {
        // Both versions of the table share the same backing data shard.
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            existing.collection_metadata.data_shard
        };

        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of this shard.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        // We map the Adapter's RelationVersion 1:1 with SchemaId.
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        // We only care whether the evolution succeeded; the returned
        // `SchemaId` itself is discarded.
        match schema_result {
            CaESchema::Ok(id) => id,
            // TODO(alter_table): If we get an expected mismatch we should retry.
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Once the new schema is registered we can open new data handles.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // TODO(alter_table): Do we need to advance the since of the table to match the time this
        // new version was registered with txn-wal?

        // Great! Our new schema is registered with Persist, now we need to update our internal
        // data structures.
        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            // Update the existing collection so we know it's a "projection" of this new one.
            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A higher level should already be asserting this, but let's make sure.
            assert_none!(existing.primary);

            // The existing version of the table will depend on the new version.
            existing.primary = Some(new_collection);
            existing.storage_dependencies.push(new_collection);

            // Copy over the frontiers from the previous version.
            // The new table starts with two holds - the implied capability, and the hold from
            // the previous version - both at the previous version's read frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Determine the relevant read capabilities on the new collection.
            //
            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
            // here, but that only installed a ReadHold on the new collection for the implied
            // capability of the existing collection. This would cause runtime panics because it
            // would eventually result in negative read capabilities.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (*t, 1)));

            // Note: The new collection is now the "primary collection".
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: new_desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                None,
                existing.time_dependence.clone(),
                existing.ingestion_remap_collection_id.clone(),
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            // Add a record of the new collection.
            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        // TODO(alter_table): Support changes to sources.
        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2181
2182    fn drop_collections_unvalidated(
2183        &self,
2184        storage_metadata: &StorageMetadata,
2185        identifiers: Vec<GlobalId>,
2186    ) {
2187        debug!(?identifiers, "drop_collections_unvalidated");
2188
2189        let mut self_collections = self.collections.lock().expect("lock poisoned");
2190
2191        // Policies that advance the since to the empty antichain. We do still
2192        // honor outstanding read holds, and collections will only be dropped
2193        // once those are removed as well.
2194        //
2195        // We don't explicitly remove read capabilities! Downgrading the
2196        // frontier of the source to `[]` (the empty Antichain), will propagate
2197        // to the storage dependencies.
2198        let mut finalized_policies = Vec::new();
2199
2200        for id in identifiers {
2201            // Make sure it's still there, might already have been deleted.
2202            let Some(collection) = self_collections.get(&id) else {
2203                continue;
2204            };
2205
2206            // Unless the collection has a primary, its shard must have been previously removed
2207            // by `StorageCollections::prepare_state`.
2208            if collection.primary.is_none() {
2209                let metadata = storage_metadata.get_collection_shard(id);
2210                mz_ore::soft_assert_or_log!(
2211                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2212                    "dropping {id}, but drop was not synchronized with storage \
2213                     controller via `prepare_state`"
2214                );
2215            }
2216
2217            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2218        }
2219
2220        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2221
2222        drop(self_collections);
2223
2224        self.synchronize_finalized_shards(storage_metadata);
2225    }
2226
2227    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) {
2228        let mut collections = self.collections.lock().expect("lock poisoned");
2229
2230        if tracing::enabled!(tracing::Level::TRACE) {
2231            let user_capabilities = collections
2232                .iter_mut()
2233                .filter(|(id, _c)| id.is_user())
2234                .map(|(id, c)| {
2235                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2236                    (*id, c.implied_capability.clone(), updates)
2237                })
2238                .collect_vec();
2239
2240            trace!(?policies, ?user_capabilities, "set_read_policies");
2241        }
2242
2243        self.set_read_policies_inner(&mut collections, policies);
2244
2245        if tracing::enabled!(tracing::Level::TRACE) {
2246            let user_capabilities = collections
2247                .iter_mut()
2248                .filter(|(id, _c)| id.is_user())
2249                .map(|(id, c)| {
2250                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2251                    (*id, c.implied_capability.clone(), updates)
2252                })
2253                .collect_vec();
2254
2255            trace!(?user_capabilities, "after! set_read_policies");
2256        }
2257    }
2258
2259    fn acquire_read_holds(
2260        &self,
2261        desired_holds: Vec<GlobalId>,
2262    ) -> Result<Vec<ReadHold>, CollectionMissing> {
2263        if desired_holds.is_empty() {
2264            return Ok(vec![]);
2265        }
2266
2267        let mut collections = self.collections.lock().expect("lock poisoned");
2268
2269        let mut advanced_holds = Vec::new();
2270        // We advance the holds by our current since frontier. Can't acquire
2271        // holds for times that have been compacted away!
2272        //
2273        // NOTE: We acquire read holds at the earliest possible time rather than
2274        // at the implied capability. This is so that, for example, adapter can
2275        // acquire a read hold to hold back the frontier, giving the COMPUTE
2276        // controller a chance to also acquire a read hold at that early
2277        // frontier. If/when we change the interplay between adapter and COMPUTE
2278        // to pass around ReadHold tokens, we might tighten this up and instead
2279        // acquire read holds at the implied capability.
2280        for id in desired_holds.iter() {
2281            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2282            let since = collection.read_capabilities.frontier().to_owned();
2283            advanced_holds.push((*id, since));
2284        }
2285
2286        let mut updates = advanced_holds
2287            .iter()
2288            .map(|(id, hold)| {
2289                let mut changes = ChangeBatch::new();
2290                changes.extend(hold.iter().map(|time| (*time, 1)));
2291                (*id, changes)
2292            })
2293            .collect::<BTreeMap<_, _>>();
2294
2295        StorageCollectionsImpl::update_read_capabilities_inner(
2296            &self.cmd_tx,
2297            &mut collections,
2298            &mut updates,
2299        );
2300
2301        let acquired_holds = advanced_holds
2302            .into_iter()
2303            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2304            .collect_vec();
2305
2306        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2307
2308        Ok(acquired_holds)
2309    }
2310
2311    /// Determine time dependence information for the object.
2312    fn determine_time_dependence(
2313        &self,
2314        id: GlobalId,
2315    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2316        use TimeDependenceError::CollectionMissing;
2317        let collections = self.collections.lock().expect("lock poisoned");
2318        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2319        Ok(state.time_dependence.clone())
2320    }
2321
2322    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2323        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2324        let Self {
2325            envd_epoch,
2326            read_only,
2327            finalizable_shards,
2328            finalized_shards,
2329            collections,
2330            txns_read: _,
2331            config,
2332            initial_txn_upper,
2333            persist_location,
2334            persist: _,
2335            cmd_tx: _,
2336            holds_tx: _,
2337            _background_task: _,
2338            _finalize_shards_task: _,
2339        } = self;
2340
2341        let finalizable_shards: Vec<_> = finalizable_shards
2342            .lock()
2343            .iter()
2344            .map(ToString::to_string)
2345            .collect();
2346        let finalized_shards: Vec<_> = finalized_shards
2347            .lock()
2348            .iter()
2349            .map(ToString::to_string)
2350            .collect();
2351        let collections: BTreeMap<_, _> = collections
2352            .lock()
2353            .expect("poisoned")
2354            .iter()
2355            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2356            .collect();
2357        let config = format!("{:?}", config.lock().expect("poisoned"));
2358
2359        Ok(serde_json::json!({
2360            "envd_epoch": envd_epoch,
2361            "read_only": read_only,
2362            "finalizable_shards": finalizable_shards,
2363            "finalized_shards": finalized_shards,
2364            "collections": collections,
2365            "config": config,
2366            "initial_txn_upper": initial_txn_upper,
2367            "persist_location": format!("{persist_location:?}"),
2368        }))
2369    }
2370}
2371
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper {
    /// A critical since handle; used in read-write mode.
    Critical(SinceHandle<SourceData, (), Timestamp, StorageDiff>),
    /// A leased read handle; used in read-only mode.
    Leased(ReadHandle<SourceData, (), Timestamp, StorageDiff>),
}
2383
2384impl SinceHandleWrapper {
2385    pub fn since(&self) -> &Antichain<Timestamp> {
2386        match self {
2387            Self::Critical(handle) => handle.since(),
2388            Self::Leased(handle) => handle.since(),
2389        }
2390    }
2391
2392    pub fn opaque(&self) -> PersistEpoch {
2393        match self {
2394            Self::Critical(handle) => handle.opaque().decode(),
2395            Self::Leased(_handle) => {
2396                // The opaque is expected to be used with
2397                // `compare_and_downgrade_since`, and the leased handle doesn't
2398                // have a notion of an opaque. We pretend here and in
2399                // `compare_and_downgrade_since`.
2400                PersistEpoch(None)
2401            }
2402        }
2403    }
2404
2405    pub async fn compare_and_downgrade_since(
2406        &mut self,
2407        expected: &PersistEpoch,
2408        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
2409    ) -> Result<Antichain<Timestamp>, PersistEpoch> {
2410        match self {
2411            Self::Critical(handle) => handle
2412                .compare_and_downgrade_since(
2413                    &Opaque::encode(expected),
2414                    (&Opaque::encode(opaque), since),
2415                )
2416                .await
2417                .map_err(|e| e.decode()),
2418            Self::Leased(handle) => {
2419                assert_none!(opaque.0);
2420
2421                handle.downgrade_since(since).await;
2422
2423                Ok(since.clone())
2424            }
2425        }
2426    }
2427
2428    pub async fn maybe_compare_and_downgrade_since(
2429        &mut self,
2430        expected: &PersistEpoch,
2431        (opaque, since): (&PersistEpoch, &Antichain<Timestamp>),
2432    ) -> Option<Result<Antichain<Timestamp>, PersistEpoch>> {
2433        match self {
2434            Self::Critical(handle) => handle
2435                .maybe_compare_and_downgrade_since(
2436                    &Opaque::encode(expected),
2437                    (&Opaque::encode(opaque), since),
2438                )
2439                .await
2440                .map(|r| r.map_err(|o| o.decode())),
2441            Self::Leased(handle) => {
2442                assert_none!(opaque.0);
2443
2444                handle.maybe_downgrade_since(since).await;
2445
2446                Some(Ok(since.clone()))
2447            }
2448        }
2449    }
2450
2451    pub fn snapshot_stats(
2452        &self,
2453        id: GlobalId,
2454        as_of: Option<Antichain<Timestamp>>,
2455    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
2456        match self {
2457            Self::Critical(handle) => {
2458                let res = handle
2459                    .snapshot_stats(as_of)
2460                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2461                Box::pin(res)
2462            }
2463            Self::Leased(handle) => {
2464                let res = handle
2465                    .snapshot_stats(as_of)
2466                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2467                Box::pin(res)
2468            }
2469        }
2470    }
2471
2472    pub fn snapshot_stats_from_txn(
2473        &self,
2474        id: GlobalId,
2475        data_snapshot: DataSnapshot<Timestamp>,
2476    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError>> {
2477        match self {
2478            Self::Critical(handle) => Box::pin(
2479                data_snapshot
2480                    .snapshot_stats_from_critical(handle)
2481                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2482            ),
2483            Self::Leased(handle) => Box::pin(
2484                data_snapshot
2485                    .snapshot_stats_from_leased(handle)
2486                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2487            ),
2488        }
2489    }
2490}
2491
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState {
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    primary: Option<GlobalId>,

    /// Description of how this collection's frontier follows time.
    time_dependence: Option<TimeDependence>,
    /// The ID of the source remap/progress collection, if this is an ingestion.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<Timestamp>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,

    /// Metadata describing where and how this collection's data is stored.
    pub collection_metadata: CollectionMetadata,
}
2531
2532impl CollectionState {
2533    /// Creates a new collection state, with an initial read policy valid from
2534    /// `since`.
2535    pub fn new(
2536        primary: Option<GlobalId>,
2537        time_dependence: Option<TimeDependence>,
2538        ingestion_remap_collection_id: Option<GlobalId>,
2539        since: Antichain<Timestamp>,
2540        write_frontier: Antichain<Timestamp>,
2541        storage_dependencies: Vec<GlobalId>,
2542        metadata: CollectionMetadata,
2543    ) -> Self {
2544        let mut read_capabilities = MutableAntichain::new();
2545        read_capabilities.update_iter(since.iter().map(|time| (*time, 1)));
2546        Self {
2547            primary,
2548            time_dependence,
2549            ingestion_remap_collection_id,
2550            read_capabilities,
2551            implied_capability: since.clone(),
2552            read_policy: ReadPolicy::NoPolicy {
2553                initial_since: since,
2554            },
2555            storage_dependencies,
2556            write_frontier,
2557            collection_metadata: metadata,
2558        }
2559    }
2560
2561    /// Returns whether the collection was dropped.
2562    pub fn is_dropped(&self) -> bool {
2563        self.read_capabilities.is_empty()
2564    }
2565}
2566
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask {
    /// Storage configuration, shared with the owning [StorageCollectionsImpl].
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender side of our own command channel, passed along when updating
    /// read capabilities (see `update_read_capabilities_inner`).
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd>,
    /// Receiver for commands sent by the [StorageCollectionsImpl] side.
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd>,
    /// Receiver for read-hold changes reported by [ReadHold]s handed out via
    /// `acquire_read_holds`.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<Timestamp>)>,
    /// Shards queued for finalization; populated when a collection's since
    /// advances to the empty frontier (see `downgrade_sinces`).
    finalizable_shards: Arc<ShardIdSet>,
    /// Per-collection state, shared with the owning [StorageCollectionsImpl].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles for all registered collections, keyed by collection ID.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper>,
    /// Write handle for the txns shard, if any; taken by `run` to drive upper
    /// updates for all txns-backed collections.
    txns_handle: Option<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    /// IDs of collections whose data lives in the txns shard.
    txns_shards: BTreeSet<GlobalId>,
}
2587
/// Commands sent from [StorageCollectionsImpl] to the [BackgroundTask].
#[derive(Debug)]
enum BackgroundCmd {
    /// Register persist handles for a collection so the task can track its
    /// upper and downgrade its since.
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        since_handle: SinceHandleWrapper,
    },
    /// Downgrade the sinces of the given collections to the given frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<Timestamp>)>),
    /// Request snapshot statistics for the given collection at the given
    /// as-of; the resulting future is sent back via the oneshot.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf,
        oneshot::Sender<SnapshotStatsRes>,
    ),
}
2603
/// A newtype wrapper to hang a Debug impl off of.
pub(crate) struct SnapshotStatsRes(BoxFuture<'static, Result<SnapshotStats, StorageError>>);

impl Debug for SnapshotStatsRes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped future is not `Debug`, so render an opaque placeholder.
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}
2612
impl BackgroundTask {
    /// Runs the task's main loop until the command channel is closed.
    ///
    /// The loop multiplexes over: upper updates from the txns shard (fanned
    /// out to all txns-backed collections), upper updates from individual
    /// shards, commands from `cmds_rx`, and read-hold changes from `holds_rx`.
    async fn run(&mut self) {
        // Futures that fetch the recent upper from all other shards.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), Timestamp, StorageDiff>,
                                Antichain<Timestamp>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Returns a future that waits until `handle`'s upper has advanced
        // past `prev_upper`, then yields the handle back with the new upper.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<Timestamp>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard, if present, gets its own dedicated future; its
        // upper applies to every collection in `self.txns_shards`.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // Fan the txns-shard upper out to every txns-backed
                    // collection.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // Still current, so process the update and enqueue
                            // again!
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Be polite and expire the write handle. This can
                            // happen when we get an upper update for a write
                            // handle that has since been replaced via Update.
                            handle.expire().await;
                        }
                    }
                    // NOTE(review): when the mapping is gone (the collection
                    // was dropped concurrently), the handle is dropped here
                    // without an explicit `expire()` call.
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        // We're done!
                        break;
                    };

                    // Drain all commands so we can merge `DowngradeSince` requests. Without this
                    // optimization, downgrading sinces could fall behind in the face of a large
                    // amount of storage collections.
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register{
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                // Txns-backed collections get their uppers from
                                // the shared txns-shard future; everything else
                                // gets its own upper future.
                                if is_in_txns {
                                    self.txns_shards.insert(id);
                                } else {
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                // Merge requests for the same collection by
                                // joining their target frontiers.
                                for (id, new) in cmds {
                                    downgrades.entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                // NB: The requested as_of could be arbitrarily far
                                // in the future. So, in order to avoid blocking
                                // this loop until it's available and the
                                // `snapshot_stats` call resolves, instead return
                                // the future to the caller and await it there.
                                let res = match self.since_handles.get(&id) {
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                // It's fine if the listener hung up.
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Drain the channel and merge changes for the same
                    // collection before applying them under the lock.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }

    /// Records new write frontiers and downgrades each collection's implied
    /// capability according to its read policy.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<Timestamp>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // Only ever advance the recorded write frontier.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            // Only ever advance the implied capability. If the new capability
            // equals the old one, the +1/-1 updates below cancel out and the
            // resulting empty batch is filtered out.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (*time, 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (*time, -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }

    /// Downgrades persist sinces for the given collections, concurrently, and
    /// tears down tracking for any collection whose since has advanced to the
    /// empty frontier.
    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<Timestamp>>) {
        // Process all persist calls concurrently.
        let mut futures = Vec::with_capacity(cmds.len());
        for (id, new_since) in cmds {
            // We need to take the since handles here, to satisfy the borrow checker.
            // We make sure to always put them back below.
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
                // This can happen when someone concurrently drops a collection.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                // NOTE(review): `opaque()` already returns an owned value, so
                // this `.clone()` is redundant.
                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
                    // A shard's since reaching the empty frontier is a prereq for
                    // being able to finalize a shard, so the final downgrade should
                    // never be rate-limited.
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                // The opaque did not match: another process took over the
                // critical handle, so we have been fenced out and must halt.
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                None => None,
            };

            // Put the handle back; see the comment in the loop above.
            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                // We're not responsible for writes to tables, so we also don't
                // de-register them from the txn system. Whoever is responsible
                // will remove them. We only make sure to remove the table from
                // our tracking.
                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                         persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                         because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
}
2953
/// Arguments for [`finalize_shards_task`], bundled into a single struct so
/// they can be handed to the spawned task as one value.
struct FinalizeShardsTaskConfig {
    /// This environment's epoch, used as the opaque value when opening
    /// critical since handles during finalization.
    envd_epoch: NonZeroI64,
    /// Shared storage configuration; consulted each tick for the
    /// `finalize_shards` parameter and related dyncfgs.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Metrics recording finalization attempts and outcomes.
    metrics: StorageCollectionsMetrics,
    /// Shards that are eligible for finalization.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards that have been successfully finalized.
    finalized_shards: Arc<ShardIdSet>,
    /// Location of the persist instance holding the shards.
    persist_location: PersistLocation,
    /// Cache used to open a persist client at `persist_location`.
    persist: Arc<PersistClientCache>,
    /// Whether this process is in read-only mode; finalization is disabled
    /// entirely in that case.
    read_only: bool,
}
2964
/// Background task that periodically attempts to finalize shards whose
/// collections have been dropped.
///
/// Every 5 seconds, takes a snapshot of `finalizable_shards` and tries to
/// finalize each one (up to 10 concurrently). Successfully finalized shards
/// are moved from `finalizable_shards` to `finalized_shards`; failures are
/// left in place to be retried on a later tick. The task returns immediately
/// in read-only mode and is a per-tick no-op while the `finalize_shards`
/// configuration parameter is off.
async fn finalize_shards_task(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) {
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    // Delay (rather than burst) if a tick is missed, e.g. after a slow pass.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            // We hold the lock for as short as possible and pull our cloned set
            // of shards.
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        // Open a persist client to delete unused shards.
        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        // Reborrow as shared references so the `async move` closures below
        // capture references instead of moving the values themselves.
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        futures::stream::iter(current_finalizable_shards.clone())
            // Each element resolves to `Some(shard_id)` on success (or if the
            // shard was already finalized) and `None` on failure.
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        // TODO: thread the global ID into the shard finalization WAL
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // We only use the writer to advance the upper, so using a dummy schema is
                        // fine.
                        let mut write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let our_opaque = Opaque::encode(&epoch);
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                Timestamp,
                                StorageDiff,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    our_opaque.clone(),
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_opaque = since_handle.opaque().clone();
                            // Pick which opaque to present in the
                            // compare-and-downgrade below.
                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
                            {
                                // We're newer, but it's fine to use the
                                // handle's old epoch to try and downgrade.
                                handle_opaque
                            } else {
                                // Good luck, buddy! The downgrade below will
                                // not succeed. There's a process with a newer
                                // epoch out there and someone at some juncture
                                // will fence out this process.
                                // TODO: consider applying the downgrade no matter what!
                                our_opaque
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                return Ok(());
                            }
                            // Not available now, so finalization is broken.
                            // since_handle.expire().await;
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), Timestamp, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // Rather than error, just leave this shard as
                            // one to finalize later.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Poll each future for each collection concurrently, maximum of 10
            // at a time.
            // TODO(benesch): the concurrency here should be configurable
            // via LaunchDarkly.
            .buffer_unordered(10)
            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
            // The closure passed to `for_each` must remain fast or we risk
            // starving the finalization futures of calls to `poll`.
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // We make successfully finalized shards available for
                        // removal from the finalization WAL one by one, so that
                        // a handful of stuck shards don't prevent us from
                        // removing the shards that have made progress. The
                        // overhead of repeatedly acquiring and releasing the
                        // locks is negligible.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}
3162
/// The as-of frontier of a snapshot-stats request, tagged with how the
/// underlying shard's upper advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<Timestamp>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<Timestamp>),
}
3172
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    /// Exercises `BackgroundCmd::SnapshotStats` end to end against an
    /// in-memory persist instance: unknown ids error, requests past the upper
    /// block until the shard advances, and resolved stats reflect the number
    /// of updates written.
    //
    // NOTE: This is a free function in a test module, not a method, so it
    // must not take a `self` receiver (`self` is only legal in `impl` blocks,
    // and the test harness calls test functions without a receiver).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because now_or_never consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a `SnapshotStats` command for `id` at `as_of` to the background
    /// task and awaits the resulting stats future.
    async fn snapshot_stats(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd>,
        id: GlobalId,
        as_of: Antichain<Timestamp>,
    ) -> Result<SnapshotStats, StorageError> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl BackgroundTask {
        /// Builds a `BackgroundTask` with empty/dummy state for tests,
        /// returning the command sender alongside the task.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}