mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25use mz_ore::collections::CollectionExt;
26use mz_ore::metrics::MetricsRegistry;
27use mz_ore::now::{EpochMillis, NowFn};
28use mz_ore::task::AbortOnDropHandle;
29use mz_ore::{assert_none, instrument, soft_assert_or_log};
30use mz_persist_client::cache::PersistClientCache;
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
32use mz_persist_client::critical::SinceHandle;
33use mz_persist_client::read::{Cursor, ReadHandle};
34use mz_persist_client::schema::CaESchema;
35use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
38use mz_persist_types::Codec64;
39use mz_persist_types::codec_impls::UnitSchema;
40use mz_persist_types::txn::TxnsCodec;
41use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
42use mz_storage_types::StorageDiff;
43use mz_storage_types::configuration::StorageConfiguration;
44use mz_storage_types::connections::ConnectionContext;
45use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
46use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
47use mz_storage_types::errors::CollectionMissing;
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
52use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
53use mz_txn_wal::metrics::Metrics as TxnMetrics;
54use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
55use mz_txn_wal::txns::TxnsHandle;
56use timely::PartialOrder;
57use timely::order::TotalOrder;
58use timely::progress::frontier::MutableAntichain;
59use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
60use tokio::sync::{mpsc, oneshot};
61use tokio::time::MissedTickBehavior;
62use tracing::{debug, info, trace, warn};
63
64use crate::client::TimestamplessUpdateBuilder;
65use crate::controller::{
66    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
67};
68use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
69
70mod metrics;
71
72/// An abstraction for keeping track of storage collections and managing access
73/// to them.
74///
75/// Responsibilities:
76///
77/// - Keeps a critical persist handle for holding the since of collections
78///   where it need to be.
79///
80/// - Drives the since forward based on the upper of a collection and a
81///   [ReadPolicy].
82///
83/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
84/// advancing while it needs to be read at a specific time.
85#[async_trait]
86pub trait StorageCollections: Debug + Sync {
87    type Timestamp: TimelyTimestamp;
88
89    /// On boot, reconcile this [StorageCollections] with outside state. We get
90    /// a [StorageTxn] where we can record any durable state that we need.
91    ///
92    /// We get `init_ids`, which tells us about all collections that currently
93    /// exist, so that we can record durable state for those that _we_ don't
94    /// know yet about.
95    async fn initialize_state(
96        &self,
97        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
98        init_ids: BTreeSet<GlobalId>,
99    ) -> Result<(), StorageError<Self::Timestamp>>;
100
101    /// Update storage configuration with new parameters.
102    fn update_parameters(&self, config_params: StorageParameters);
103
104    /// Returns the [CollectionMetadata] of the collection identified by `id`.
105    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;
106
107    /// Acquire an iterator over [CollectionMetadata] for all active
108    /// collections.
109    ///
110    /// A collection is "active" when it has a non empty frontier of read
111    /// capabilties.
112    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
113
114    /// Returns the frontiers of the identified collection.
115    fn collection_frontiers(
116        &self,
117        id: GlobalId,
118    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
119        let frontiers = self
120            .collections_frontiers(vec![id])?
121            .expect_element(|| "known to exist");
122
123        Ok(frontiers)
124    }
125
126    /// Atomically gets and returns the frontiers of all the identified
127    /// collections.
128    fn collections_frontiers(
129        &self,
130        id: Vec<GlobalId>,
131    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;
132
133    /// Atomically gets and returns the frontiers of all active collections.
134    ///
135    /// A collection is "active" when it has a non-empty frontier of read
136    /// capabilities.
137    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;
138
139    /// Checks whether a collection exists under the given `GlobalId`. Returns
140    /// an error if the collection does not exist.
141    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
142
143    /// Returns aggregate statistics about the contents of the local input named
144    /// `id` at `as_of`.
145    async fn snapshot_stats(
146        &self,
147        id: GlobalId,
148        as_of: Antichain<Self::Timestamp>,
149    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;
150
151    /// Returns aggregate statistics about the contents of the local input named
152    /// `id` at `as_of`.
153    ///
154    /// Note that this async function itself returns a future. We may
155    /// need to block on the stats being available, but don't want to hold a reference
156    /// to the controller for too long... so the outer future holds a reference to the
157    /// controller but returns quickly, and the inner future is slow but does not
158    /// reference the controller.
159    async fn snapshot_parts_stats(
160        &self,
161        id: GlobalId,
162        as_of: Antichain<Self::Timestamp>,
163    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;
164
165    /// Returns a snapshot of the contents of collection `id` at `as_of`.
166    fn snapshot(
167        &self,
168        id: GlobalId,
169        as_of: Self::Timestamp,
170    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;
171
172    /// Returns a snapshot of the contents of collection `id` at the largest
173    /// readable `as_of`.
174    async fn snapshot_latest(
175        &self,
176        id: GlobalId,
177    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;
178
179    /// Returns a snapshot of the contents of collection `id` at `as_of`.
180    fn snapshot_cursor(
181        &self,
182        id: GlobalId,
183        as_of: Self::Timestamp,
184    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
185    where
186        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;
187
188    /// Generates a snapshot of the contents of collection `id` at `as_of` and
189    /// streams out all of the updates in bounded memory.
190    ///
191    /// The output is __not__ consolidated.
192    fn snapshot_and_stream(
193        &self,
194        id: GlobalId,
195        as_of: Self::Timestamp,
196    ) -> BoxFuture<
197        'static,
198        Result<
199            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
200            StorageError<Self::Timestamp>,
201        >,
202    >;
203
204    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
205    /// updates for the provided [`GlobalId`].
206    fn create_update_builder(
207        &self,
208        id: GlobalId,
209    ) -> BoxFuture<
210        'static,
211        Result<
212            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
213            StorageError<Self::Timestamp>,
214        >,
215    >
216    where
217        Self::Timestamp: Lattice + Codec64;
218
219    /// Update the given [`StorageTxn`] with the appropriate metadata given the
220    /// IDs to add and drop.
221    ///
222    /// The data modified in the `StorageTxn` must be made available in all
223    /// subsequent calls that require [`StorageMetadata`] as a parameter.
224    async fn prepare_state(
225        &self,
226        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
227        ids_to_add: BTreeSet<GlobalId>,
228        ids_to_drop: BTreeSet<GlobalId>,
229        ids_to_register: BTreeMap<GlobalId, ShardId>,
230    ) -> Result<(), StorageError<Self::Timestamp>>;
231
232    /// Create the collections described by the individual
233    /// [CollectionDescriptions](CollectionDescription).
234    ///
235    /// Each command carries the source id, the source description, and any
236    /// associated metadata needed to ingest the particular source.
237    ///
238    /// This command installs collection state for the indicated sources, and
239    /// they are now valid to use in queries at times beyond the initial `since`
240    /// frontiers. Each collection also acquires a read capability at this
241    /// frontier, which will need to be repeatedly downgraded with
242    /// `allow_compaction()` to permit compaction.
243    ///
244    /// This method is NOT idempotent; It can fail between processing of
245    /// different collections and leave the [StorageCollections] in an
246    /// inconsistent state. It is almost always wrong to do anything but abort
247    /// the process on `Err`.
248    ///
249    /// The `register_ts` is used as the initial timestamp that tables are
250    /// available for reads. (We might later give non-tables the same treatment,
251    /// but hold off on that initially.) Callers must provide a Some if any of
252    /// the collections is a table. A None may be given if none of the
253    /// collections are a table (i.e. all materialized views, sources, etc).
254    ///
255    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
256    /// from the txn-wal sub-system.
257    async fn create_collections_for_bootstrap(
258        &self,
259        storage_metadata: &StorageMetadata,
260        register_ts: Option<Self::Timestamp>,
261        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
262        migrated_storage_collections: &BTreeSet<GlobalId>,
263    ) -> Result<(), StorageError<Self::Timestamp>>;
264
265    /// Updates the [`RelationDesc`] for the specified table.
266    async fn alter_table_desc(
267        &self,
268        existing_collection: GlobalId,
269        new_collection: GlobalId,
270        new_desc: RelationDesc,
271        expected_version: RelationVersion,
272    ) -> Result<(), StorageError<Self::Timestamp>>;
273
274    /// Drops the read capability for the sources and allows their resources to
275    /// be reclaimed.
276    ///
277    /// TODO(jkosh44): This method does not validate the provided identifiers.
278    /// Currently when the controller starts/restarts it has no durable state.
279    /// That means that it has no way of remembering any past commands sent. In
280    /// the future we plan on persisting state for the controller so that it is
281    /// aware of past commands. Therefore this method is for dropping sources
282    /// that we know to have been previously created, but have been forgotten by
283    /// the controller due to a restart. Once command history becomes durable we
284    /// can remove this method and use the normal `drop_sources`.
285    fn drop_collections_unvalidated(
286        &self,
287        storage_metadata: &StorageMetadata,
288        identifiers: Vec<GlobalId>,
289    );
290
291    /// Assigns a read policy to specific identifiers.
292    ///
293    /// The policies are assigned in the order presented, and repeated
294    /// identifiers should conclude with the last policy. Changing a policy will
295    /// immediately downgrade the read capability if appropriate, but it will
296    /// not "recover" the read capability if the prior capability is already
297    /// ahead of it.
298    ///
299    /// This [StorageCollections] may include its own overrides on these
300    /// policies.
301    ///
302    /// Identifiers not present in `policies` retain their existing read
303    /// policies.
304    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);
305
306    /// Acquires and returns the earliest possible read holds for the specified
307    /// collections.
308    fn acquire_read_holds(
309        &self,
310        desired_holds: Vec<GlobalId>,
311    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;
312
313    /// Get the time dependence for a storage collection. Returns no value if unknown or if
314    /// the object isn't managed by storage.
315    fn determine_time_dependence(
316        &self,
317        id: GlobalId,
318    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
319
320    /// Returns the state of [`StorageCollections`] formatted as JSON.
321    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
322}
323
324/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
325/// consolidated form.
326pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
327    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
328    // least as long as the cursor itself, which holds part leases. Bundling them together!
329    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
330    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
331}
332
333impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
334    pub async fn next(
335        &mut self,
336    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
337        let iter = self.cursor.next().await?;
338        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
339    }
340}
341
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<T>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes
    /// the implied capability.
    pub read_capabilities: Antichain<T>,
}
363
364/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
365/// background task for doing work concurrently, in the background.
366#[derive(Debug, Clone)]
367pub struct StorageCollectionsImpl<
368    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
369> {
370    /// The fencing token for this instance of [StorageCollections], and really
371    /// all of the controllers and Coordinator.
372    envd_epoch: NonZeroI64,
373
374    /// Whether or not this [StorageCollections] is in read-only mode.
375    ///
376    /// When in read-only mode, we are not allowed to affect changes to external
377    /// systems, including, for example, acquiring and downgrading critical
378    /// [SinceHandles](SinceHandle)
379    read_only: bool,
380
381    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
382    /// been persisted by the caller of [StorageCollections::prepare_state].
383    finalizable_shards: Arc<ShardIdSet>,
384
385    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
386    /// shards here until we are given a chance to let our callers know that
387    /// these have been finalized, for example via
388    /// [StorageCollections::prepare_state].
389    finalized_shards: Arc<ShardIdSet>,
390
391    /// Collections maintained by this [StorageCollections].
392    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
393
394    /// A shared TxnsCache running in a task and communicated with over a channel.
395    txns_read: TxnsRead<T>,
396
397    /// Storage configuration parameters.
398    config: Arc<Mutex<StorageConfiguration>>,
399
400    /// The upper of the txn shard as it was when we booted. We forward the
401    /// upper of created/registered tables to make sure that their uppers are
402    /// not less than the initially known txn upper.
403    ///
404    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
405    /// existing indexes when bootstrapping, where tables that have an upper
406    /// that is less than the initially known txn upper can lead to indexes that
407    /// cannot hydrate in read-only mode.
408    initial_txn_upper: Antichain<T>,
409
410    /// The persist location where all storage collections are being written to
411    persist_location: PersistLocation,
412
413    /// A persist client used to write to storage collections
414    persist: Arc<PersistClientCache>,
415
416    /// For sending commands to our internal task.
417    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
418
419    /// For sending updates about read holds to our internal task.
420    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
421
422    /// Handles to tasks we own, making sure they're dropped when we are.
423    _background_task: Arc<AbortOnDropHandle<()>>,
424    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
425}
426
427// Supporting methods for implementing [StorageCollections].
428//
429// Almost all internal methods that are the backing implementation for a trait
430// method have the `_inner` suffix.
431//
432// We follow a pattern where `_inner` methods get a mutable reference to the
433// shared collections state, and it's the public-facing method that locks the
434// state for the duration of its invocation. This allows calling other `_inner`
435// methods from within `_inner` methods.
436impl<T> StorageCollectionsImpl<T>
437where
438    T: TimelyTimestamp
439        + Lattice
440        + Codec64
441        + From<EpochMillis>
442        + TimestampManipulation
443        + Into<mz_repr::Timestamp>
444        + Sync,
445{
446    /// Creates and returns a new [StorageCollections].
447    ///
448    /// Note that when creating a new [StorageCollections], you must also
449    /// reconcile it with the previous state using
450    /// [StorageCollections::initialize_state],
451    /// [StorageCollections::prepare_state], and
452    /// [StorageCollections::create_collections_for_bootstrap].
453    pub async fn new(
454        persist_location: PersistLocation,
455        persist_clients: Arc<PersistClientCache>,
456        metrics_registry: &MetricsRegistry,
457        _now: NowFn,
458        txns_metrics: Arc<TxnMetrics>,
459        envd_epoch: NonZeroI64,
460        read_only: bool,
461        connection_context: ConnectionContext,
462        txn: &dyn StorageTxn<T>,
463    ) -> Self {
464        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
465
466        // This value must be already installed because we must ensure it's
467        // durably recorded before it is used, otherwise we risk leaking persist
468        // state.
469        let txns_id = txn
470            .get_txn_wal_shard()
471            .expect("must call prepare initialization before creating StorageCollections");
472
473        let txns_client = persist_clients
474            .open(persist_location.clone())
475            .await
476            .expect("location should be valid");
477
478        // We have to initialize, so that TxnsRead::start() below does not
479        // block.
480        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
481            TxnsHandle::open(
482                T::minimum(),
483                txns_client.clone(),
484                txns_client.dyncfgs().clone(),
485                Arc::clone(&txns_metrics),
486                txns_id,
487            )
488            .await;
489
490        // For handing to the background task, for listening to upper updates.
491        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
492        let mut txns_write = txns_client
493            .open_writer(
494                txns_id,
495                Arc::new(txns_key_schema),
496                Arc::new(txns_val_schema),
497                Diagnostics {
498                    shard_name: "txns".to_owned(),
499                    handle_purpose: "commit txns".to_owned(),
500                },
501            )
502            .await
503            .expect("txns schema shouldn't change");
504
505        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
506
507        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
508        let finalizable_shards =
509            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
510        let finalized_shards =
511            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
512        let config = Arc::new(Mutex::new(StorageConfiguration::new(
513            connection_context,
514            mz_dyncfgs::all_dyncfgs(),
515        )));
516
517        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
518
519        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
520        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
521        let mut background_task = BackgroundTask {
522            config: Arc::clone(&config),
523            cmds_tx: cmd_tx.clone(),
524            cmds_rx: cmd_rx,
525            holds_rx,
526            collections: Arc::clone(&collections),
527            finalizable_shards: Arc::clone(&finalizable_shards),
528            shard_by_id: BTreeMap::new(),
529            since_handles: BTreeMap::new(),
530            txns_handle: Some(txns_write),
531            txns_shards: Default::default(),
532        };
533
534        let background_task =
535            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
536                background_task.run().await
537            });
538
539        let finalize_shards_task = mz_ore::task::spawn(
540            || "storage_collections::finalize_shards_task",
541            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
542                envd_epoch: envd_epoch.clone(),
543                config: Arc::clone(&config),
544                metrics,
545                finalizable_shards: Arc::clone(&finalizable_shards),
546                finalized_shards: Arc::clone(&finalized_shards),
547                persist_location: persist_location.clone(),
548                persist: Arc::clone(&persist_clients),
549                read_only,
550            }),
551        );
552
553        Self {
554            finalizable_shards,
555            finalized_shards,
556            collections,
557            txns_read,
558            envd_epoch,
559            read_only,
560            config,
561            initial_txn_upper,
562            persist_location,
563            persist: persist_clients,
564            cmd_tx,
565            holds_tx,
566            _background_task: Arc::new(background_task.abort_on_drop()),
567            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
568        }
569    }
570
571    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
572    ///
573    /// `since` is an optional since that the read handle will be forwarded to
574    /// if it is less than its current since.
575    ///
576    /// This will `halt!` the process if we cannot successfully acquire a
577    /// critical handle with our current epoch.
578    async fn open_data_handles(
579        &self,
580        id: &GlobalId,
581        shard: ShardId,
582        since: Option<&Antichain<T>>,
583        relation_desc: RelationDesc,
584        persist_client: &PersistClient,
585    ) -> (
586        WriteHandle<SourceData, (), T, StorageDiff>,
587        SinceHandleWrapper<T>,
588    ) {
589        let since_handle = if self.read_only {
590            let read_handle = self
591                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
592                .await;
593            SinceHandleWrapper::Leased(read_handle)
594        } else {
595            // We're managing the data for this shard in read-write mode, which would fence out other
596            // processes in read-only mode; it's safe to upgrade the metadata version.
597            persist_client
598                .upgrade_version::<SourceData, (), T, StorageDiff>(
599                    shard,
600                    Diagnostics {
601                        shard_name: id.to_string(),
602                        handle_purpose: format!("controller data for {}", id),
603                    },
604                )
605                .await
606                .expect("invalid persist usage");
607
608            let since_handle = self
609                .open_critical_handle(id, shard, since, persist_client)
610                .await;
611
612            SinceHandleWrapper::Critical(since_handle)
613        };
614
615        let mut write_handle = self
616            .open_write_handle(id, shard, relation_desc, persist_client)
617            .await;
618
619        // N.B.
620        // Fetch the most recent upper for the write handle. Otherwise, this may
621        // be behind the since of the since handle. Its vital this happens AFTER
622        // we create the since handle as it needs to be linearized with that
623        // operation. It may be true that creating the write handle after the
624        // since handle already ensures this, but we do this out of an abundance
625        // of caution.
626        //
627        // Note that this returns the upper, but also sets it on the handle to
628        // be fetched later.
629        write_handle.fetch_recent_upper().await;
630
631        (write_handle, since_handle)
632    }
633
634    /// Opens a write handle for the given `shard`.
635    async fn open_write_handle(
636        &self,
637        id: &GlobalId,
638        shard: ShardId,
639        relation_desc: RelationDesc,
640        persist_client: &PersistClient,
641    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
642        let diagnostics = Diagnostics {
643            shard_name: id.to_string(),
644            handle_purpose: format!("controller data for {}", id),
645        };
646
647        let write = persist_client
648            .open_writer(
649                shard,
650                Arc::new(relation_desc),
651                Arc::new(UnitSchema),
652                diagnostics.clone(),
653            )
654            .await
655            .expect("invalid persist usage");
656
657        write
658    }
659
660    /// Opens a critical since handle for the given `shard`.
661    ///
662    /// `since` is an optional since that the read handle will be forwarded to
663    /// if it is less than its current since.
664    ///
665    /// This will `halt!` the process if we cannot successfully acquire a
666    /// critical handle with our current epoch.
667    async fn open_critical_handle(
668        &self,
669        id: &GlobalId,
670        shard: ShardId,
671        since: Option<&Antichain<T>>,
672        persist_client: &PersistClient,
673    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
674        tracing::debug!(%id, ?since, "opening critical handle");
675
676        assert!(
677            !self.read_only,
678            "attempting to open critical SinceHandle in read-only mode"
679        );
680
681        let diagnostics = Diagnostics {
682            shard_name: id.to_string(),
683            handle_purpose: format!("controller data for {}", id),
684        };
685
686        // Construct the handle in a separate block to ensure all error paths
687        // are diverging
688        let since_handle = {
689            // This block's aim is to ensure the handle is in terms of our epoch
690            // by the time we return it.
691            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
692                .open_critical_since(
693                    shard,
694                    PersistClient::CONTROLLER_CRITICAL_SINCE,
695                    diagnostics.clone(),
696                )
697                .await
698                .expect("invalid persist usage");
699
700            // Take the join of the handle's since and the provided `since`;
701            // this lets materialized views express the since at which their
702            // read handles "start."
703            let provided_since = match since {
704                Some(since) => since,
705                None => &Antichain::from_elem(T::minimum()),
706            };
707            let since = handle.since().join(provided_since);
708
709            let our_epoch = self.envd_epoch;
710
711            loop {
712                let current_epoch: PersistEpoch = handle.opaque().clone();
713
714                // Ensure the current epoch is <= our epoch.
715                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);
716
717                if unchecked_success {
718                    // Update the handle's state so that it is in terms of our
719                    // epoch.
720                    let checked_success = handle
721                        .compare_and_downgrade_since(
722                            &current_epoch,
723                            (&PersistEpoch::from(our_epoch), &since),
724                        )
725                        .await
726                        .is_ok();
727                    if checked_success {
728                        break handle;
729                    }
730                } else {
731                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
732                }
733            }
734        };
735
736        since_handle
737    }
738
739    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
740    /// for the given `shard`.
741    ///
742    /// `since` is an optional since that the read handle will be forwarded to
743    /// if it is less than its current since.
744    async fn open_leased_handle(
745        &self,
746        id: &GlobalId,
747        shard: ShardId,
748        relation_desc: RelationDesc,
749        since: Option<&Antichain<T>>,
750        persist_client: &PersistClient,
751    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
752        tracing::debug!(%id, ?since, "opening leased handle");
753
754        let diagnostics = Diagnostics {
755            shard_name: id.to_string(),
756            handle_purpose: format!("controller data for {}", id),
757        };
758
759        let use_critical_since = false;
760        let mut handle: ReadHandle<_, _, _, _> = persist_client
761            .open_leased_reader(
762                shard,
763                Arc::new(relation_desc),
764                Arc::new(UnitSchema),
765                diagnostics.clone(),
766                use_critical_since,
767            )
768            .await
769            .expect("invalid persist usage");
770
771        // Take the join of the handle's since and the provided `since`;
772        // this lets materialized views express the since at which their
773        // read handles "start."
774        let provided_since = match since {
775            Some(since) => since,
776            None => &Antichain::from_elem(T::minimum()),
777        };
778        let since = handle.since().join(provided_since);
779
780        handle.downgrade_since(&since).await;
781
782        handle
783    }
784
785    fn register_handles(
786        &self,
787        id: GlobalId,
788        is_in_txns: bool,
789        since_handle: SinceHandleWrapper<T>,
790        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
791    ) {
792        self.send(BackgroundCmd::Register {
793            id,
794            is_in_txns,
795            since_handle,
796            write_handle,
797        });
798    }
799
800    fn send(&self, cmd: BackgroundCmd<T>) {
801        let _ = self.cmd_tx.send(cmd);
802    }
803
804    async fn snapshot_stats_inner(
805        &self,
806        id: GlobalId,
807        as_of: SnapshotStatsAsOf<T>,
808    ) -> Result<SnapshotStats, StorageError<T>> {
809        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
810        // caller of this one drives it to completion.
811        //
812        // We'd need to either share the critical handle somehow or maybe have
813        // two instances around, one in the worker and one in the
814        // StorageCollections.
815        let (tx, rx) = oneshot::channel();
816        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
817        rx.await.expect("BackgroundTask should be live").0.await
818    }
819
820    /// If this identified collection has a dependency, install a read hold on
821    /// it.
822    ///
823    /// This is necessary to ensure that the dependency's since does not advance
824    /// beyond its dependents'.
825    fn install_collection_dependency_read_holds_inner(
826        &self,
827        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
828        id: GlobalId,
829    ) -> Result<(), StorageError<T>> {
830        let (deps, collection_implied_capability) = match self_collections.get(&id) {
831            Some(CollectionState {
832                storage_dependencies: deps,
833                implied_capability,
834                ..
835            }) => (deps.clone(), implied_capability),
836            _ => return Ok(()),
837        };
838
839        for dep in deps.iter() {
840            let dep_collection = self_collections
841                .get(dep)
842                .ok_or(StorageError::IdentifierMissing(id))?;
843
844            mz_ore::soft_assert_or_log!(
845                PartialOrder::less_equal(
846                    &dep_collection.implied_capability,
847                    collection_implied_capability
848                ),
849                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
850                dep_collection.implied_capability,
851                collection_implied_capability,
852            );
853        }
854
855        self.install_read_capabilities_inner(
856            self_collections,
857            id,
858            &deps,
859            collection_implied_capability.clone(),
860        )?;
861
862        Ok(())
863    }
864
865    /// Returns the given collection's dependencies.
866    fn determine_collection_dependencies(
867        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
868        source_id: GlobalId,
869        collection_desc: &CollectionDescription<T>,
870    ) -> Result<Vec<GlobalId>, StorageError<T>> {
871        let mut dependencies = Vec::new();
872
873        if let Some(id) = collection_desc.primary {
874            dependencies.push(id);
875        }
876
877        match &collection_desc.data_source {
878            DataSource::Introspection(_)
879            | DataSource::Webhook
880            | DataSource::Table
881            | DataSource::Progress
882            | DataSource::Other => (),
883            DataSource::IngestionExport {
884                ingestion_id,
885                data_config,
886                ..
887            } => {
888                // Ingestion exports depend on their primary source's remap
889                // collection, except when they use a CDCv2 envelope.
890                let source = self_collections
891                    .get(ingestion_id)
892                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
893                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
894                    panic!("SourceExport must refer to a primary source that already exists");
895                };
896
897                match data_config.envelope {
898                    SourceEnvelope::CdcV2 => (),
899                    _ => dependencies.push(*remap_collection_id),
900                }
901            }
902            // Ingestions depend on their remap collection.
903            DataSource::Ingestion(ingestion) => {
904                if ingestion.remap_collection_id != source_id {
905                    dependencies.push(ingestion.remap_collection_id);
906                }
907            }
908            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
909        }
910
911        Ok(dependencies)
912    }
913
914    /// Install read capabilities on the given `storage_dependencies`.
915    #[instrument(level = "debug")]
916    fn install_read_capabilities_inner(
917        &self,
918        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
919        from_id: GlobalId,
920        storage_dependencies: &[GlobalId],
921        read_capability: Antichain<T>,
922    ) -> Result<(), StorageError<T>> {
923        let mut changes = ChangeBatch::new();
924        for time in read_capability.iter() {
925            changes.update(time.clone(), 1);
926        }
927
928        if tracing::span_enabled!(tracing::Level::TRACE) {
929            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
930            let user_capabilities = self_collections
931                .iter_mut()
932                .filter(|(id, _c)| id.is_user())
933                .map(|(id, c)| {
934                    let updates = c.read_capabilities.updates().cloned().collect_vec();
935                    (*id, c.implied_capability.clone(), updates)
936                })
937                .collect_vec();
938
939            trace!(
940                %from_id,
941                ?storage_dependencies,
942                ?read_capability,
943                ?user_capabilities,
944                "install_read_capabilities_inner");
945        }
946
947        let mut storage_read_updates = storage_dependencies
948            .iter()
949            .map(|id| (*id, changes.clone()))
950            .collect();
951
952        StorageCollectionsImpl::update_read_capabilities_inner(
953            &self.cmd_tx,
954            self_collections,
955            &mut storage_read_updates,
956        );
957
958        if tracing::span_enabled!(tracing::Level::TRACE) {
959            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
960            let user_capabilities = self_collections
961                .iter_mut()
962                .filter(|(id, _c)| id.is_user())
963                .map(|(id, c)| {
964                    let updates = c.read_capabilities.updates().cloned().collect_vec();
965                    (*id, c.implied_capability.clone(), updates)
966                })
967                .collect_vec();
968
969            trace!(
970                %from_id,
971                ?storage_dependencies,
972                ?read_capability,
973                ?user_capabilities,
974                "after install_read_capabilities_inner!");
975        }
976
977        Ok(())
978    }
979
980    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
981        let metadata = &self.collection_metadata(id)?;
982        let persist_client = self
983            .persist
984            .open(metadata.persist_location.clone())
985            .await
986            .unwrap();
987        // Duplicate part of open_data_handles here because we don't need the
988        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
989        let diagnostics = Diagnostics {
990            shard_name: id.to_string(),
991            handle_purpose: format!("controller data for {}", id),
992        };
993        // NB: Opening a WriteHandle is cheap if it's never used in a
994        // compare_and_append operation.
995        let write = persist_client
996            .open_writer::<SourceData, (), T, StorageDiff>(
997                metadata.data_shard,
998                Arc::new(metadata.relation_desc.clone()),
999                Arc::new(UnitSchema),
1000                diagnostics.clone(),
1001            )
1002            .await
1003            .expect("invalid persist usage");
1004        Ok(write.shared_upper())
1005    }
1006
1007    async fn read_handle_for_snapshot(
1008        persist: Arc<PersistClientCache>,
1009        metadata: &CollectionMetadata,
1010        id: GlobalId,
1011    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1012        let persist_client = persist
1013            .open(metadata.persist_location.clone())
1014            .await
1015            .unwrap();
1016
1017        // We create a new read handle every time someone requests a snapshot
1018        // and then immediately expire it instead of keeping a read handle
1019        // permanently in our state to avoid having it heartbeat continually.
1020        // The assumption is that calls to snapshot are rare and therefore worth
1021        // it to always create a new handle.
1022        let read_handle = persist_client
1023            .open_leased_reader::<SourceData, (), _, _>(
1024                metadata.data_shard,
1025                Arc::new(metadata.relation_desc.clone()),
1026                Arc::new(UnitSchema),
1027                Diagnostics {
1028                    shard_name: id.to_string(),
1029                    handle_purpose: format!("snapshot {}", id),
1030                },
1031                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1032            )
1033            .await
1034            .expect("invalid persist usage");
1035        Ok(read_handle)
1036    }
1037
1038    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1039    // where the as_of frontier might have multiple elements. In the current form the mutually
1040    // incomparable updates will be accumulated together to a state of the collection that never
1041    // actually existed. We should include the original time in the updates advanced by the as_of
1042    // frontier in the result and let the caller decide what to do with the information.
1043    fn snapshot(
1044        &self,
1045        id: GlobalId,
1046        as_of: T,
1047        txns_read: &TxnsRead<T>,
1048    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1049    where
1050        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1051    {
1052        let metadata = match self.collection_metadata(id) {
1053            Ok(metadata) => metadata.clone(),
1054            Err(e) => return async { Err(e.into()) }.boxed(),
1055        };
1056        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1057            assert_eq!(txns_id, txns_read.txns_id());
1058            txns_read.clone()
1059        });
1060        let persist = Arc::clone(&self.persist);
1061        async move {
1062            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1063            let contents = match txns_read {
1064                None => {
1065                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1066                    read_handle
1067                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1068                        .await
1069                }
1070                Some(txns_read) => {
1071                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1072                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1073                    // unblocked.
1074                    //
1075                    // Consider the following scenario:
1076                    // - Table A is written to via txns at time 5
1077                    // - Tables other than A are written to via txns consuming timestamps up to 10
1078                    // - We'd like to read A at 7
1079                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1080                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
1081                    // - This branch allows it to handle that advancing the physical upper of Table A to
1082                    //   10 (NB but only once we see it get past the write at 5!)
1083                    // - Then we can read it normally.
1084                    txns_read.update_gt(as_of.clone()).await;
1085                    let data_snapshot = txns_read
1086                        .data_snapshot(metadata.data_shard, as_of.clone())
1087                        .await;
1088                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1089                }
1090            };
1091            match contents {
1092                Ok(contents) => {
1093                    let mut snapshot = Vec::with_capacity(contents.len());
1094                    for ((data, _), _, diff) in contents {
1095                        // TODO(petrosagg): We should accumulate the errors too and let the user
1096                        // interpret the result
1097                        let row = data.0?;
1098                        snapshot.push((row, diff));
1099                    }
1100                    Ok(snapshot)
1101                }
1102                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1103            }
1104        }
1105        .boxed()
1106    }
1107
1108    fn snapshot_and_stream(
1109        &self,
1110        id: GlobalId,
1111        as_of: T,
1112        txns_read: &TxnsRead<T>,
1113    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1114    {
1115        use futures::stream::StreamExt;
1116
1117        let metadata = match self.collection_metadata(id) {
1118            Ok(metadata) => metadata.clone(),
1119            Err(e) => return async { Err(e.into()) }.boxed(),
1120        };
1121        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1122            assert_eq!(txns_id, txns_read.txns_id());
1123            txns_read.clone()
1124        });
1125        let persist = Arc::clone(&self.persist);
1126
1127        async move {
1128            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1129            let stream = match txns_read {
1130                None => {
1131                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1132                    read_handle
1133                        .snapshot_and_stream(Antichain::from_elem(as_of))
1134                        .await
1135                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1136                        .boxed()
1137                }
1138                Some(txns_read) => {
1139                    txns_read.update_gt(as_of.clone()).await;
1140                    let data_snapshot = txns_read
1141                        .data_snapshot(metadata.data_shard, as_of.clone())
1142                        .await;
1143                    data_snapshot
1144                        .snapshot_and_stream(&mut read_handle)
1145                        .await
1146                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1147                        .boxed()
1148                }
1149            };
1150
1151            // Map our stream, unwrapping Persist internal errors.
1152            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
1153            Ok(stream)
1154        }
1155        .boxed()
1156    }
1157
1158    fn set_read_policies_inner(
1159        &self,
1160        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1161        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1162    ) {
1163        trace!("set_read_policies: {:?}", policies);
1164
1165        let mut read_capability_changes = BTreeMap::default();
1166
1167        for (id, policy) in policies.into_iter() {
1168            let collection = match collections.get_mut(&id) {
1169                Some(c) => c,
1170                None => {
1171                    panic!("Reference to absent collection {id}");
1172                }
1173            };
1174
1175            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1176
1177            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1178                let mut update = ChangeBatch::new();
1179                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1180                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1181                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1182                if !update.is_empty() {
1183                    read_capability_changes.insert(id, update);
1184                }
1185            }
1186
1187            collection.read_policy = policy;
1188        }
1189
1190        for (id, changes) in read_capability_changes.iter() {
1191            if id.is_user() {
1192                trace!(%id, ?changes, "in set_read_policies, capability changes");
1193            }
1194        }
1195
1196        if !read_capability_changes.is_empty() {
1197            StorageCollectionsImpl::update_read_capabilities_inner(
1198                &self.cmd_tx,
1199                collections,
1200                &mut read_capability_changes,
1201            );
1202        }
1203    }
1204
1205    // This is not an associated function so that we can share it with the task
1206    // that updates the persist handles and also has a reference to the shared
1207    // collections state.
1208    fn update_read_capabilities_inner(
1209        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1210        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1211        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1212    ) {
1213        // Location to record consequences that we need to act on.
1214        let mut collections_net = BTreeMap::new();
1215
1216        // We must not rely on any specific relative ordering of `GlobalId`s.
1217        // That said, it is reasonable to assume that collections generally have
1218        // greater IDs than their dependencies, so starting with the largest is
1219        // a useful optimization.
1220        while let Some(id) = updates.keys().rev().next().cloned() {
1221            let mut update = updates.remove(&id).unwrap();
1222
1223            if id.is_user() {
1224                trace!(id = ?id, update = ?update, "update_read_capabilities");
1225            }
1226
1227            let collection = if let Some(c) = collections.get_mut(&id) {
1228                c
1229            } else {
1230                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1231                if has_positive_updates {
1232                    panic!(
1233                        "reference to absent collection {id} but we have positive updates: {:?}",
1234                        update
1235                    );
1236                } else {
1237                    // Continue purely negative updates. Someone has probably
1238                    // already dropped this collection!
1239                    continue;
1240                }
1241            };
1242
1243            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1244            for (time, diff) in update.iter() {
1245                assert!(
1246                    collection.read_capabilities.count_for(time) + diff >= 0,
1247                    "update {:?} for collection {id} would lead to negative \
1248                        read capabilities, read capabilities before applying: {:?}",
1249                    update,
1250                    collection.read_capabilities
1251                );
1252
1253                if collection.read_capabilities.count_for(time) + diff > 0 {
1254                    assert!(
1255                        current_read_capabilities.less_equal(time),
1256                        "update {:?} for collection {id} is trying to \
1257                            install read capabilities before the current \
1258                            frontier of read capabilities, read capabilities before applying: {:?}",
1259                        update,
1260                        collection.read_capabilities
1261                    );
1262                }
1263            }
1264
1265            let changes = collection.read_capabilities.update_iter(update.drain());
1266            update.extend(changes);
1267
1268            if id.is_user() {
1269                trace!(
1270                %id,
1271                ?collection.storage_dependencies,
1272                ?update,
1273                "forwarding update to storage dependencies");
1274            }
1275
1276            for id in collection.storage_dependencies.iter() {
1277                updates
1278                    .entry(*id)
1279                    .or_insert_with(ChangeBatch::new)
1280                    .extend(update.iter().cloned());
1281            }
1282
1283            let (changes, frontier) = collections_net
1284                .entry(id)
1285                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1286
1287            changes.extend(update.drain());
1288            *frontier = collection.read_capabilities.frontier().to_owned();
1289        }
1290
1291        // Translate our net compute actions into downgrades of persist sinces.
1292        // The actual downgrades are performed by a Tokio task asynchronously.
1293        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1294        for (key, (mut changes, frontier)) in collections_net {
1295            if !changes.is_empty() {
1296                // If the collection has a "primary" collection, let that primary drive compaction.
1297                let collection = collections.get(&key).expect("must still exist");
1298                let should_emit_persist_compaction = collection.primary.is_none();
1299
1300                if frontier.is_empty() {
1301                    info!(id = %key, "removing collection state because the since advanced to []!");
1302                    collections.remove(&key).expect("must still exist");
1303                }
1304
1305                if should_emit_persist_compaction {
1306                    persist_compaction_commands.push((key, frontier));
1307                }
1308            }
1309        }
1310
1311        if !persist_compaction_commands.is_empty() {
1312            cmd_tx
1313                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1314                .expect("cannot fail to send");
1315        }
1316    }
1317
1318    /// Remove any shards that we know are finalized
1319    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1320        self.finalized_shards
1321            .lock()
1322            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1323    }
1324}
1325
1326// See comments on the above impl for StorageCollectionsImpl.
1327#[async_trait]
1328impl<T> StorageCollections for StorageCollectionsImpl<T>
1329where
1330    T: TimelyTimestamp
1331        + Lattice
1332        + Codec64
1333        + From<EpochMillis>
1334        + TimestampManipulation
1335        + Into<mz_repr::Timestamp>
1336        + Sync,
1337{
1338    type Timestamp = T;
1339
1340    async fn initialize_state(
1341        &self,
1342        txn: &mut (dyn StorageTxn<T> + Send),
1343        init_ids: BTreeSet<GlobalId>,
1344    ) -> Result<(), StorageError<T>> {
1345        let metadata = txn.get_collection_metadata();
1346        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1347
1348        // Determine which collections we do not yet have metadata for.
1349        let new_collections: BTreeSet<GlobalId> =
1350            init_ids.difference(&existing_metadata).cloned().collect();
1351
1352        self.prepare_state(
1353            txn,
1354            new_collections,
1355            BTreeSet::default(),
1356            BTreeMap::default(),
1357        )
1358        .await?;
1359
1360        // All shards that belong to collections dropped in the last epoch are
1361        // eligible for finalization.
1362        //
1363        // n.b. this introduces an unlikely race condition: if a collection is
1364        // dropped from the catalog, but the dataflow is still running on a
1365        // worker, assuming the shard is safe to finalize on reboot may cause
1366        // the cluster to panic.
1367        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1368
1369        info!(?unfinalized_shards, "initializing finalizable_shards");
1370
1371        self.finalizable_shards.lock().extend(unfinalized_shards);
1372
1373        Ok(())
1374    }
1375
1376    fn update_parameters(&self, config_params: StorageParameters) {
1377        // We serialize the dyncfg updates in StorageParameters, but configure
1378        // persist separately.
1379        config_params.dyncfg_updates.apply(self.persist.cfg());
1380
1381        self.config
1382            .lock()
1383            .expect("lock poisoned")
1384            .update(config_params);
1385    }
1386
1387    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1388        let collections = self.collections.lock().expect("lock poisoned");
1389
1390        collections
1391            .get(&id)
1392            .map(|c| c.collection_metadata.clone())
1393            .ok_or(CollectionMissing(id))
1394    }
1395
1396    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1397        let collections = self.collections.lock().expect("lock poisoned");
1398
1399        collections
1400            .iter()
1401            .filter(|(_id, c)| !c.is_dropped())
1402            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1403            .collect()
1404    }
1405
1406    fn collections_frontiers(
1407        &self,
1408        ids: Vec<GlobalId>,
1409    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1410        if ids.is_empty() {
1411            return Ok(vec![]);
1412        }
1413
1414        let collections = self.collections.lock().expect("lock poisoned");
1415
1416        let res = ids
1417            .into_iter()
1418            .map(|id| {
1419                collections
1420                    .get(&id)
1421                    .map(|c| CollectionFrontiers {
1422                        id: id.clone(),
1423                        write_frontier: c.write_frontier.clone(),
1424                        implied_capability: c.implied_capability.clone(),
1425                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1426                    })
1427                    .ok_or(CollectionMissing(id))
1428            })
1429            .collect::<Result<Vec<_>, _>>()?;
1430
1431        Ok(res)
1432    }
1433
1434    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1435        let collections = self.collections.lock().expect("lock poisoned");
1436
1437        let res = collections
1438            .iter()
1439            .filter(|(_id, c)| !c.is_dropped())
1440            .map(|(id, c)| CollectionFrontiers {
1441                id: id.clone(),
1442                write_frontier: c.write_frontier.clone(),
1443                implied_capability: c.implied_capability.clone(),
1444                read_capabilities: c.read_capabilities.frontier().to_owned(),
1445            })
1446            .collect_vec();
1447
1448        res
1449    }
1450
1451    async fn snapshot_stats(
1452        &self,
1453        id: GlobalId,
1454        as_of: Antichain<Self::Timestamp>,
1455    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1456        let metadata = self.collection_metadata(id)?;
1457
1458        // See the comments in StorageController::snapshot for what's going on
1459        // here.
1460        let as_of = match metadata.txns_shard.as_ref() {
1461            None => SnapshotStatsAsOf::Direct(as_of),
1462            Some(txns_id) => {
1463                assert_eq!(txns_id, self.txns_read.txns_id());
1464                let as_of = as_of
1465                    .into_option()
1466                    .expect("cannot read as_of the empty antichain");
1467                self.txns_read.update_gt(as_of.clone()).await;
1468                let data_snapshot = self
1469                    .txns_read
1470                    .data_snapshot(metadata.data_shard, as_of.clone())
1471                    .await;
1472                SnapshotStatsAsOf::Txns(data_snapshot)
1473            }
1474        };
1475        self.snapshot_stats_inner(id, as_of).await
1476    }
1477
1478    async fn snapshot_parts_stats(
1479        &self,
1480        id: GlobalId,
1481        as_of: Antichain<Self::Timestamp>,
1482    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1483        let metadata = {
1484            let self_collections = self.collections.lock().expect("lock poisoned");
1485
1486            let collection_metadata = self_collections
1487                .get(&id)
1488                .ok_or(StorageError::IdentifierMissing(id))
1489                .map(|c| c.collection_metadata.clone());
1490
1491            match collection_metadata {
1492                Ok(m) => m,
1493                Err(e) => return Box::pin(async move { Err(e) }),
1494            }
1495        };
1496
1497        // See the comments in StorageController::snapshot for what's going on
1498        // here.
1499        let persist = Arc::clone(&self.persist);
1500        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1501
1502        let data_snapshot = match (metadata, as_of.as_option()) {
1503            (
1504                CollectionMetadata {
1505                    txns_shard: Some(txns_id),
1506                    data_shard,
1507                    ..
1508                },
1509                Some(as_of),
1510            ) => {
1511                assert_eq!(txns_id, *self.txns_read.txns_id());
1512                self.txns_read.update_gt(as_of.clone()).await;
1513                let data_snapshot = self
1514                    .txns_read
1515                    .data_snapshot(data_shard, as_of.clone())
1516                    .await;
1517                Some(data_snapshot)
1518            }
1519            _ => None,
1520        };
1521
1522        Box::pin(async move {
1523            let read_handle = read_handle?;
1524            let result = match data_snapshot {
1525                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1526                None => read_handle.snapshot_parts_stats(as_of).await,
1527            };
1528            read_handle.expire().await;
1529            result.map_err(|_| StorageError::ReadBeforeSince(id))
1530        })
1531    }
1532
1533    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1534    // where the as_of frontier might have multiple elements. In the current form the mutually
1535    // incomparable updates will be accumulated together to a state of the collection that never
1536    // actually existed. We should include the original time in the updates advanced by the as_of
1537    // frontier in the result and let the caller decide what to do with the information.
1538    fn snapshot(
1539        &self,
1540        id: GlobalId,
1541        as_of: Self::Timestamp,
1542    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1543        self.snapshot(id, as_of, &self.txns_read)
1544    }
1545
1546    async fn snapshot_latest(
1547        &self,
1548        id: GlobalId,
1549    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1550        let upper = self.recent_upper(id).await?;
1551        let res = match upper.as_option() {
1552            Some(f) if f > &T::minimum() => {
1553                let as_of = f.step_back().unwrap();
1554
1555                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1556                snapshot
1557                    .into_iter()
1558                    .map(|(row, diff)| {
1559                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1560                        row
1561                    })
1562                    .collect()
1563            }
1564            Some(_min) => {
1565                // The collection must be empty!
1566                Vec::new()
1567            }
1568            // The collection is closed, we cannot determine a latest read
1569            // timestamp based on the upper.
1570            _ => {
1571                return Err(StorageError::InvalidUsage(
1572                    "collection closed, cannot determine a read timestamp based on the upper"
1573                        .to_string(),
1574                ));
1575            }
1576        };
1577
1578        Ok(res)
1579    }
1580
1581    fn snapshot_cursor(
1582        &self,
1583        id: GlobalId,
1584        as_of: Self::Timestamp,
1585    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1586    where
1587        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1588    {
1589        let metadata = match self.collection_metadata(id) {
1590            Ok(metadata) => metadata.clone(),
1591            Err(e) => return async { Err(e.into()) }.boxed(),
1592        };
1593        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1594            // Ensure the txn's shard the controller has is the same that this
1595            // collection is registered to.
1596            assert_eq!(txns_id, self.txns_read.txns_id());
1597            self.txns_read.clone()
1598        });
1599        let persist = Arc::clone(&self.persist);
1600
1601        // See the comments in Self::snapshot for what's going on here.
1602        async move {
1603            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1604            let cursor = match txns_read {
1605                None => {
1606                    let cursor = handle
1607                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1608                        .await
1609                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1610                    SnapshotCursor {
1611                        _read_handle: handle,
1612                        cursor,
1613                    }
1614                }
1615                Some(txns_read) => {
1616                    txns_read.update_gt(as_of.clone()).await;
1617                    let data_snapshot = txns_read
1618                        .data_snapshot(metadata.data_shard, as_of.clone())
1619                        .await;
1620                    let cursor = data_snapshot
1621                        .snapshot_cursor(&mut handle, |_| true)
1622                        .await
1623                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1624                    SnapshotCursor {
1625                        _read_handle: handle,
1626                        cursor,
1627                    }
1628                }
1629            };
1630
1631            Ok(cursor)
1632        }
1633        .boxed()
1634    }
1635
1636    fn snapshot_and_stream(
1637        &self,
1638        id: GlobalId,
1639        as_of: Self::Timestamp,
1640    ) -> BoxFuture<
1641        'static,
1642        Result<
1643            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1644            StorageError<Self::Timestamp>,
1645        >,
1646    >
1647    where
1648        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1649    {
1650        self.snapshot_and_stream(id, as_of, &self.txns_read)
1651    }
1652
1653    fn create_update_builder(
1654        &self,
1655        id: GlobalId,
1656    ) -> BoxFuture<
1657        'static,
1658        Result<
1659            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1660            StorageError<Self::Timestamp>,
1661        >,
1662    > {
1663        let metadata = match self.collection_metadata(id) {
1664            Ok(m) => m,
1665            Err(e) => return Box::pin(async move { Err(e.into()) }),
1666        };
1667        let persist = Arc::clone(&self.persist);
1668
1669        async move {
1670            let persist_client = persist
1671                .open(metadata.persist_location.clone())
1672                .await
1673                .expect("invalid persist usage");
1674            let write_handle = persist_client
1675                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1676                    metadata.data_shard,
1677                    Arc::new(metadata.relation_desc.clone()),
1678                    Arc::new(UnitSchema),
1679                    Diagnostics {
1680                        shard_name: id.to_string(),
1681                        handle_purpose: format!("create write batch {}", id),
1682                    },
1683                )
1684                .await
1685                .expect("invalid persist usage");
1686            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1687
1688            Ok(builder)
1689        }
1690        .boxed()
1691    }
1692
1693    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1694        let collections = self.collections.lock().expect("lock poisoned");
1695
1696        if collections.contains_key(&id) {
1697            Ok(())
1698        } else {
1699            Err(StorageError::IdentifierMissing(id))
1700        }
1701    }
1702
1703    async fn prepare_state(
1704        &self,
1705        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1706        ids_to_add: BTreeSet<GlobalId>,
1707        ids_to_drop: BTreeSet<GlobalId>,
1708        ids_to_register: BTreeMap<GlobalId, ShardId>,
1709    ) -> Result<(), StorageError<T>> {
1710        txn.insert_collection_metadata(
1711            ids_to_add
1712                .into_iter()
1713                .map(|id| (id, ShardId::new()))
1714                .collect(),
1715        )?;
1716        txn.insert_collection_metadata(ids_to_register)?;
1717
1718        // Delete the metadata for any dropped collections.
1719        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1720
1721        let dropped_shards = dropped_mappings
1722            .into_iter()
1723            .map(|(_id, shard)| shard)
1724            .collect();
1725
1726        txn.insert_unfinalized_shards(dropped_shards)?;
1727
1728        // Reconcile any shards we've successfully finalized with the shard
1729        // finalization collection.
1730        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1731        txn.mark_shards_as_finalized(finalized_shards);
1732
1733        Ok(())
1734    }
1735
1736    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
1737    // a method/move individual parts to their own methods.
1738    #[instrument(level = "debug")]
1739    async fn create_collections_for_bootstrap(
1740        &self,
1741        storage_metadata: &StorageMetadata,
1742        register_ts: Option<Self::Timestamp>,
1743        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
1744        migrated_storage_collections: &BTreeSet<GlobalId>,
1745    ) -> Result<(), StorageError<Self::Timestamp>> {
1746        let is_in_txns = |id, metadata: &CollectionMetadata| {
1747            metadata.txns_shard.is_some()
1748                && !(self.read_only && migrated_storage_collections.contains(&id))
1749        };
1750
1751        // Validate first, to avoid corrupting state.
1752        // 1. create a dropped identifier, or
1753        // 2. create an existing identifier with a new description.
1754        // Make sure to check for errors within `ingestions` as well.
1755        collections.sort_by_key(|(id, _)| *id);
1756        collections.dedup();
1757        for pos in 1..collections.len() {
1758            if collections[pos - 1].0 == collections[pos].0 {
1759                return Err(StorageError::CollectionIdReused(collections[pos].0));
1760            }
1761        }
1762
1763        // We first enrich each collection description with some additional
1764        // metadata...
1765        let enriched_with_metadata = collections
1766            .into_iter()
1767            .map(|(id, description)| {
1768                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
1769
1770                // If the shard is being managed by txn-wal (initially,
1771                // tables), then we need to pass along the shard id for the txns
1772                // shard to dataflow rendering.
1773                let txns_shard = description
1774                    .data_source
1775                    .in_txns()
1776                    .then(|| *self.txns_read.txns_id());
1777
1778                let metadata = CollectionMetadata {
1779                    persist_location: self.persist_location.clone(),
1780                    data_shard,
1781                    relation_desc: description.desc.clone(),
1782                    txns_shard,
1783                };
1784
1785                Ok((id, description, metadata))
1786            })
1787            .collect_vec();
1788
1789        // So that we can open `SinceHandle`s for each collections concurrently.
1790        let persist_client = self
1791            .persist
1792            .open(self.persist_location.clone())
1793            .await
1794            .unwrap();
1795        let persist_client = &persist_client;
1796        // Reborrow the `&mut self` as immutable, as all the concurrent work to
1797        // be processed in this stream cannot all have exclusive access.
1798        use futures::stream::{StreamExt, TryStreamExt};
1799        let this = &*self;
1800        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
1801            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
1802                let register_ts = register_ts.clone();
1803                async move {
1804                    let (id, description, metadata) = data?;
1805
1806                    // should be replaced with real introspection
1807                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
1808                    // but for now, it's helpful to have this mapping written down
1809                    // somewhere
1810                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);
1811
1812                    // If this collection has a primary, the primary is responsible for downgrading
1813                    // the critical since and it would be an error if we did so here while opening
1814                    // the since handle.
1815                    let since = if description.primary.is_some() {
1816                        None
1817                    } else {
1818                        description.since.as_ref()
1819                    };
1820
1821                    let (write, mut since_handle) = this
1822                        .open_data_handles(
1823                            &id,
1824                            metadata.data_shard,
1825                            since,
1826                            metadata.relation_desc.clone(),
1827                            persist_client,
1828                        )
1829                        .await;
1830
1831                    // Present tables as springing into existence at the register_ts
1832                    // by advancing the since. Otherwise, we could end up in a
1833                    // situation where a table with a long compaction window appears
1834                    // to exist before the environment (and this the table) existed.
1835                    //
1836                    // We could potentially also do the same thing for other
1837                    // sources, in particular storage's internal sources and perhaps
1838                    // others, but leave them for now.
1839                    match description.data_source {
1840                        DataSource::Introspection(_)
1841                        | DataSource::IngestionExport { .. }
1842                        | DataSource::Webhook
1843                        | DataSource::Ingestion(_)
1844                        | DataSource::Progress
1845                        | DataSource::Other => {}
1846                        DataSource::Sink { .. } => {}
1847                        DataSource::Table => {
1848                            let register_ts = register_ts.expect(
1849                                "caller should have provided a register_ts when creating a table",
1850                            );
1851                            if since_handle.since().elements() == &[T::minimum()]
1852                                && !migrated_storage_collections.contains(&id)
1853                            {
1854                                debug!("advancing {} to initial since of {:?}", id, register_ts);
1855                                let token = since_handle.opaque();
1856                                let _ = since_handle
1857                                    .compare_and_downgrade_since(
1858                                        &token,
1859                                        (&token, &Antichain::from_elem(register_ts.clone())),
1860                                    )
1861                                    .await;
1862                            }
1863                        }
1864                    }
1865
1866                    Ok::<_, StorageError<Self::Timestamp>>((
1867                        id,
1868                        description,
1869                        write,
1870                        since_handle,
1871                        metadata,
1872                    ))
1873                }
1874            })
1875            // Poll each future for each collection concurrently, maximum of 50 at a time.
1876            .buffer_unordered(50)
1877            // HERE BE DRAGONS:
1878            //
1879            // There are at least 2 subtleties in using `FuturesUnordered`
1880            // (which `buffer_unordered` uses underneath:
1881            // - One is captured here
1882            //   <https://github.com/rust-lang/futures-rs/issues/2387>
1883            // - And the other is deadlocking if processing an OUTPUT of a
1884            //   `FuturesUnordered` stream attempts to obtain an async mutex that
1885            //   is also obtained in the futures being polled.
1886            //
1887            // Both of these could potentially be issues in all usages of
1888            // `buffer_unordered` in this method, so we stick the standard
1889            // advice: only use `try_collect` or `collect`!
1890            .try_collect()
1891            .await?;
1892
1893        // Reorder in dependency order.
1894        #[derive(Ord, PartialOrd, Eq, PartialEq)]
1895        enum DependencyOrder {
1896            /// Tables should always be registered first, and large ids before small ones.
1897            Table(Reverse<GlobalId>),
1898            /// For most collections the id order is the correct one.
1899            Collection(GlobalId),
1900            /// Sinks should always be registered last.
1901            Sink(GlobalId),
1902        }
1903        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1904            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
1905            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
1906            _ => DependencyOrder::Collection(*id),
1907        });
1908
1909        // We hold this lock for a very short amount of time, just doing some
1910        // hashmap inserts and unbounded channel sends.
1911        let mut self_collections = self.collections.lock().expect("lock poisoned");
1912
1913        for (id, description, write_handle, since_handle, metadata) in to_register {
1914            let write_frontier = write_handle.upper();
1915            let data_shard_since = since_handle.since().clone();
1916
1917            // Determine if this collection has any dependencies.
1918            let storage_dependencies =
1919                Self::determine_collection_dependencies(&*self_collections, id, &description)?;
1920
1921            // Determine the initial since of the collection.
1922            let initial_since = match storage_dependencies
1923                .iter()
1924                .at_most_one()
1925                .expect("should have at most one dependency")
1926            {
1927                Some(dep) => {
1928                    let dependency_collection = self_collections
1929                        .get(dep)
1930                        .ok_or(StorageError::IdentifierMissing(*dep))?;
1931                    let dependency_since = dependency_collection.implied_capability.clone();
1932
1933                    // If an item has a dependency, its initial since must be
1934                    // advanced as far as its dependency, i.e. a dependency's
1935                    // since may never be in advance of its dependents.
1936                    //
1937                    // We have to do this every time we initialize the
1938                    // collection, though––the invariant might have been upheld
1939                    // correctly in the previous epoch, but the
1940                    // `data_shard_since` might not have compacted and, on
1941                    // establishing a new persist connection, still have data we
1942                    // said _could_ be compacted.
1943                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
1944                        // The dependency since cannot be beyond the dependent
1945                        // (our) upper unless the collection is new. In
1946                        // practice, the depdenency is the remap shard of a
1947                        // source (export), and if the since is allowed to
1948                        // "catch up" to the upper, that is `upper <= since`, a
1949                        // restarting ingestion cannot differentiate between
1950                        // updates that have already been written out to the
1951                        // backing persist shard and updates that have yet to be
1952                        // written. We would write duplicate updates.
1953                        //
1954                        // If this check fails, it means that the read hold
1955                        // installed on the dependency was probably not upheld
1956                        // –– if it were, the dependency's since could not have
1957                        // advanced as far the dependent's upper.
1958                        //
1959                        // We don't care about the dependency since when the
1960                        // write frontier is empty. In that case, no-one can
1961                        // write down any more updates.
1962                        mz_ore::soft_assert_or_log!(
1963                            write_frontier.elements() == &[T::minimum()]
1964                                || write_frontier.is_empty()
1965                                || PartialOrder::less_than(&dependency_since, write_frontier),
1966                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
1967                            dependent ({id}): since {:?}, upper {:?} \n
1968                            dependency ({dep}): since {:?}",
1969                            data_shard_since,
1970                            write_frontier,
1971                            dependency_since
1972                        );
1973
1974                        dependency_since
1975                    } else {
1976                        data_shard_since
1977                    }
1978                }
1979                None => data_shard_since,
1980            };
1981
1982            // Determine the time dependence of the collection.
1983            let time_dependence = {
1984                use DataSource::*;
1985                if let Some(timeline) = &description.timeline
1986                    && *timeline != Timeline::EpochMilliseconds
1987                {
1988                    // Only the epoch timeline follows wall-clock.
1989                    None
1990                } else {
1991                    match &description.data_source {
1992                        Ingestion(ingestion) => {
1993                            use GenericSourceConnection::*;
1994                            match ingestion.desc.connection {
1995                                // Kafka, Postgres, MySql, and SQL Server sources all
1996                                // follow wall clock.
1997                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
1998                                    Some(TimeDependence::default())
1999                                }
2000                                // Load generators not further specified.
2001                                LoadGenerator(_) => None,
2002                            }
2003                        }
2004                        IngestionExport { ingestion_id, .. } => {
2005                            let c = self_collections.get(ingestion_id).expect("known to exist");
2006                            c.time_dependence.clone()
2007                        }
2008                        // Introspection, other, progress, table, and webhook sources follow wall clock.
2009                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2010                            Some(TimeDependence::default())
2011                        }
2012                        // Materialized views, continual tasks, etc, aren't managed by storage.
2013                        Other => None,
2014                        Sink { .. } => None,
2015                    }
2016                }
2017            };
2018
2019            let ingestion_remap_collection_id = match &description.data_source {
2020                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
2021                _ => None,
2022            };
2023
2024            let mut collection_state = CollectionState::new(
2025                description.primary,
2026                time_dependence,
2027                ingestion_remap_collection_id,
2028                initial_since,
2029                write_frontier.clone(),
2030                storage_dependencies,
2031                metadata.clone(),
2032            );
2033
2034            // Install the collection state in the appropriate spot.
2035            match &description.data_source {
2036                DataSource::Introspection(_) => {
2037                    self_collections.insert(id, collection_state);
2038                }
2039                DataSource::Webhook => {
2040                    self_collections.insert(id, collection_state);
2041                }
2042                DataSource::IngestionExport { .. } => {
2043                    self_collections.insert(id, collection_state);
2044                }
2045                DataSource::Table => {
2046                    // See comment on self.initial_txn_upper on why we're doing
2047                    // this.
2048                    if is_in_txns(id, &metadata)
2049                        && PartialOrder::less_than(
2050                            &collection_state.write_frontier,
2051                            &self.initial_txn_upper,
2052                        )
2053                    {
2054                        // We could try and be cute and use the join of the txn
2055                        // upper and the table upper. But that has more
2056                        // complicated reasoning for why it is or isn't correct,
2057                        // and we're only dealing with totally ordered times
2058                        // here.
2059                        collection_state
2060                            .write_frontier
2061                            .clone_from(&self.initial_txn_upper);
2062                    }
2063                    self_collections.insert(id, collection_state);
2064                }
2065                DataSource::Progress | DataSource::Other => {
2066                    self_collections.insert(id, collection_state);
2067                }
2068                DataSource::Ingestion(_) => {
2069                    self_collections.insert(id, collection_state);
2070                }
2071                DataSource::Sink { .. } => {
2072                    self_collections.insert(id, collection_state);
2073                }
2074            }
2075
2076            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);
2077
2078            // If this collection has a dependency, install a read hold on it.
2079            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
2080        }
2081
2082        drop(self_collections);
2083
2084        self.synchronize_finalized_shards(storage_metadata);
2085
2086        Ok(())
2087    }
2088
2089    async fn alter_table_desc(
2090        &self,
2091        existing_collection: GlobalId,
2092        new_collection: GlobalId,
2093        new_desc: RelationDesc,
2094        expected_version: RelationVersion,
2095    ) -> Result<(), StorageError<Self::Timestamp>> {
2096        let data_shard = {
2097            let self_collections = self.collections.lock().expect("lock poisoned");
2098            let existing = self_collections
2099                .get(&existing_collection)
2100                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2101
2102            existing.collection_metadata.data_shard
2103        };
2104
2105        let persist_client = self
2106            .persist
2107            .open(self.persist_location.clone())
2108            .await
2109            .unwrap();
2110
2111        // Evolve the schema of this shard.
2112        let diagnostics = Diagnostics {
2113            shard_name: existing_collection.to_string(),
2114            handle_purpose: "alter_table_desc".to_string(),
2115        };
2116        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2117        let expected_schema = expected_version.into();
2118        let schema_result = persist_client
2119            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2120                data_shard,
2121                expected_schema,
2122                &new_desc,
2123                &UnitSchema,
2124                diagnostics,
2125            )
2126            .await
2127            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2128        tracing::info!(
2129            ?existing_collection,
2130            ?new_collection,
2131            ?new_desc,
2132            "evolved schema"
2133        );
2134
2135        match schema_result {
2136            CaESchema::Ok(id) => id,
2137            // TODO(alter_table): If we get an expected mismatch we should retry.
2138            CaESchema::ExpectedMismatch {
2139                schema_id,
2140                key,
2141                val,
2142            } => {
2143                mz_ore::soft_panic_or_log!(
2144                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2145                );
2146                return Err(StorageError::Generic(anyhow::anyhow!(
2147                    "schema expected mismatch, {existing_collection:?}",
2148                )));
2149            }
2150            CaESchema::Incompatible => {
2151                mz_ore::soft_panic_or_log!(
2152                    "incompatible schema! {existing_collection} {new_desc:?}"
2153                );
2154                return Err(StorageError::Generic(anyhow::anyhow!(
2155                    "schema incompatible, {existing_collection:?}"
2156                )));
2157            }
2158        };
2159
2160        // Once the new schema is registered we can open new data handles.
2161        let (write_handle, since_handle) = self
2162            .open_data_handles(
2163                &new_collection,
2164                data_shard,
2165                None,
2166                new_desc.clone(),
2167                &persist_client,
2168            )
2169            .await;
2170
2171        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2172        // new version was registered with txn-wal?
2173
2174        // Great! Our new schema is registered with Persist, now we need to update our internal
2175        // data structures.
2176        {
2177            let mut self_collections = self.collections.lock().expect("lock poisoned");
2178
2179            // Update the existing collection so we know it's a "projection" of this new one.
2180            let existing = self_collections
2181                .get_mut(&existing_collection)
2182                .expect("existing collection missing");
2183
2184            // A higher level should already be asserting this, but let's make sure.
2185            assert_none!(existing.primary);
2186
2187            // The existing version of the table will depend on the new version.
2188            existing.primary = Some(new_collection);
2189            existing.storage_dependencies.push(new_collection);
2190
2191            // Copy over the frontiers from the previous version.
2192            // The new table starts with two holds - the implied capability, and the hold from
2193            // the previous version - both at the previous version's read frontier.
2194            let implied_capability = existing.read_capabilities.frontier().to_owned();
2195            let write_frontier = existing.write_frontier.clone();
2196
2197            // Determine the relevant read capabilities on the new collection.
2198            //
2199            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2200            // here, but that only installed a ReadHold on the new collection for the implied
2201            // capability of the existing collection. This would cause runtime panics because it
2202            // would eventually result in negative read capabilities.
2203            let mut changes = ChangeBatch::new();
2204            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2205
2206            // Note: The new collection is now the "primary collection".
2207            let collection_meta = CollectionMetadata {
2208                persist_location: self.persist_location.clone(),
2209                relation_desc: new_desc.clone(),
2210                data_shard,
2211                txns_shard: Some(self.txns_read.txns_id().clone()),
2212            };
2213            let collection_state = CollectionState::new(
2214                None,
2215                existing.time_dependence.clone(),
2216                existing.ingestion_remap_collection_id.clone(),
2217                implied_capability,
2218                write_frontier,
2219                Vec::new(),
2220                collection_meta,
2221            );
2222
2223            // Add a record of the new collection.
2224            self_collections.insert(new_collection, collection_state);
2225
2226            let mut updates = BTreeMap::from([(new_collection, changes)]);
2227            StorageCollectionsImpl::update_read_capabilities_inner(
2228                &self.cmd_tx,
2229                &mut *self_collections,
2230                &mut updates,
2231            );
2232        };
2233
2234        // TODO(alter_table): Support changes to sources.
2235        self.register_handles(new_collection, true, since_handle, write_handle);
2236
2237        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2238
2239        Ok(())
2240    }
2241
2242    fn drop_collections_unvalidated(
2243        &self,
2244        storage_metadata: &StorageMetadata,
2245        identifiers: Vec<GlobalId>,
2246    ) {
2247        debug!(?identifiers, "drop_collections_unvalidated");
2248
2249        let mut self_collections = self.collections.lock().expect("lock poisoned");
2250
2251        for id in identifiers.iter() {
2252            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2253            mz_ore::soft_assert_or_log!(
2254                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2255                "dropping {id}, but drop was not synchronized with storage \
2256                controller via `synchronize_collections`"
2257            );
2258        }
2259
2260        // Policies that advance the since to the empty antichain. We do still
2261        // honor outstanding read holds, and collections will only be dropped
2262        // once those are removed as well.
2263        //
2264        // We don't explicitly remove read capabilities! Downgrading the
2265        // frontier of the source to `[]` (the empty Antichain), will propagate
2266        // to the storage dependencies.
2267        let mut finalized_policies = Vec::new();
2268
2269        for id in identifiers {
2270            // Make sure it's still there, might already have been deleted.
2271            if self_collections.contains_key(&id) {
2272                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2273            }
2274        }
2275        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2276
2277        drop(self_collections);
2278
2279        self.synchronize_finalized_shards(storage_metadata);
2280    }
2281
2282    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2283        let mut collections = self.collections.lock().expect("lock poisoned");
2284
2285        if tracing::enabled!(tracing::Level::TRACE) {
2286            let user_capabilities = collections
2287                .iter_mut()
2288                .filter(|(id, _c)| id.is_user())
2289                .map(|(id, c)| {
2290                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2291                    (*id, c.implied_capability.clone(), updates)
2292                })
2293                .collect_vec();
2294
2295            trace!(?policies, ?user_capabilities, "set_read_policies");
2296        }
2297
2298        self.set_read_policies_inner(&mut collections, policies);
2299
2300        if tracing::enabled!(tracing::Level::TRACE) {
2301            let user_capabilities = collections
2302                .iter_mut()
2303                .filter(|(id, _c)| id.is_user())
2304                .map(|(id, c)| {
2305                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2306                    (*id, c.implied_capability.clone(), updates)
2307                })
2308                .collect_vec();
2309
2310            trace!(?user_capabilities, "after! set_read_policies");
2311        }
2312    }
2313
2314    fn acquire_read_holds(
2315        &self,
2316        desired_holds: Vec<GlobalId>,
2317    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
2318        if desired_holds.is_empty() {
2319            return Ok(vec![]);
2320        }
2321
2322        let mut collections = self.collections.lock().expect("lock poisoned");
2323
2324        let mut advanced_holds = Vec::new();
2325        // We advance the holds by our current since frontier. Can't acquire
2326        // holds for times that have been compacted away!
2327        //
2328        // NOTE: We acquire read holds at the earliest possible time rather than
2329        // at the implied capability. This is so that, for example, adapter can
2330        // acquire a read hold to hold back the frontier, giving the COMPUTE
2331        // controller a chance to also acquire a read hold at that early
2332        // frontier. If/when we change the interplay between adapter and COMPUTE
2333        // to pass around ReadHold tokens, we might tighten this up and instead
2334        // acquire read holds at the implied capability.
2335        for id in desired_holds.iter() {
2336            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2337            let since = collection.read_capabilities.frontier().to_owned();
2338            advanced_holds.push((*id, since));
2339        }
2340
2341        let mut updates = advanced_holds
2342            .iter()
2343            .map(|(id, hold)| {
2344                let mut changes = ChangeBatch::new();
2345                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2346                (*id, changes)
2347            })
2348            .collect::<BTreeMap<_, _>>();
2349
2350        StorageCollectionsImpl::update_read_capabilities_inner(
2351            &self.cmd_tx,
2352            &mut collections,
2353            &mut updates,
2354        );
2355
2356        let acquired_holds = advanced_holds
2357            .into_iter()
2358            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2359            .collect_vec();
2360
2361        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2362
2363        Ok(acquired_holds)
2364    }
2365
2366    /// Determine time dependence information for the object.
2367    fn determine_time_dependence(
2368        &self,
2369        id: GlobalId,
2370    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2371        use TimeDependenceError::CollectionMissing;
2372        let collections = self.collections.lock().expect("lock poisoned");
2373        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2374        Ok(state.time_dependence.clone())
2375    }
2376
2377    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2378        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2379        let Self {
2380            envd_epoch,
2381            read_only,
2382            finalizable_shards,
2383            finalized_shards,
2384            collections,
2385            txns_read: _,
2386            config,
2387            initial_txn_upper,
2388            persist_location,
2389            persist: _,
2390            cmd_tx: _,
2391            holds_tx: _,
2392            _background_task: _,
2393            _finalize_shards_task: _,
2394        } = self;
2395
2396        let finalizable_shards: Vec<_> = finalizable_shards
2397            .lock()
2398            .iter()
2399            .map(ToString::to_string)
2400            .collect();
2401        let finalized_shards: Vec<_> = finalized_shards
2402            .lock()
2403            .iter()
2404            .map(ToString::to_string)
2405            .collect();
2406        let collections: BTreeMap<_, _> = collections
2407            .lock()
2408            .expect("poisoned")
2409            .iter()
2410            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2411            .collect();
2412        let config = format!("{:?}", config.lock().expect("poisoned"));
2413
2414        Ok(serde_json::json!({
2415            "envd_epoch": envd_epoch,
2416            "read_only": read_only,
2417            "finalizable_shards": finalizable_shards,
2418            "finalized_shards": finalized_shards,
2419            "collections": collections,
2420            "config": config,
2421            "initial_txn_upper": initial_txn_upper,
2422            "persist_location": format!("{persist_location:?}"),
2423        }))
2424    }
2425}
2426
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    /// A critical since handle, whose opaque value is a [PersistEpoch].
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    /// A leased read handle. It has no opaque value of its own, so the wrapper
    /// methods report `PersistEpoch(None)` for it.
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}
2441
2442impl<T> SinceHandleWrapper<T>
2443where
2444    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2445{
2446    pub fn since(&self) -> &Antichain<T> {
2447        match self {
2448            Self::Critical(handle) => handle.since(),
2449            Self::Leased(handle) => handle.since(),
2450        }
2451    }
2452
2453    pub fn opaque(&self) -> PersistEpoch {
2454        match self {
2455            Self::Critical(handle) => handle.opaque().clone(),
2456            Self::Leased(_handle) => {
2457                // The opaque is expected to be used with
2458                // `compare_and_downgrade_since`, and the leased handle doesn't
2459                // have a notion of an opaque. We pretend here and in
2460                // `compare_and_downgrade_since`.
2461                PersistEpoch(None)
2462            }
2463        }
2464    }
2465
2466    pub async fn compare_and_downgrade_since(
2467        &mut self,
2468        expected: &PersistEpoch,
2469        new: (&PersistEpoch, &Antichain<T>),
2470    ) -> Result<Antichain<T>, PersistEpoch> {
2471        match self {
2472            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2473            Self::Leased(handle) => {
2474                let (opaque, since) = new;
2475                assert_none!(opaque.0);
2476
2477                handle.downgrade_since(since).await;
2478
2479                Ok(since.clone())
2480            }
2481        }
2482    }
2483
2484    pub async fn maybe_compare_and_downgrade_since(
2485        &mut self,
2486        expected: &PersistEpoch,
2487        new: (&PersistEpoch, &Antichain<T>),
2488    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2489        match self {
2490            Self::Critical(handle) => {
2491                handle
2492                    .maybe_compare_and_downgrade_since(expected, new)
2493                    .await
2494            }
2495            Self::Leased(handle) => {
2496                let (opaque, since) = new;
2497                assert_none!(opaque.0);
2498
2499                handle.maybe_downgrade_since(since).await;
2500
2501                Some(Ok(since.clone()))
2502            }
2503        }
2504    }
2505
2506    pub fn snapshot_stats(
2507        &self,
2508        id: GlobalId,
2509        as_of: Option<Antichain<T>>,
2510    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2511        match self {
2512            Self::Critical(handle) => {
2513                let res = handle
2514                    .snapshot_stats(as_of)
2515                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2516                Box::pin(res)
2517            }
2518            Self::Leased(handle) => {
2519                let res = handle
2520                    .snapshot_stats(as_of)
2521                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2522                Box::pin(res)
2523            }
2524        }
2525    }
2526
2527    pub fn snapshot_stats_from_txn(
2528        &self,
2529        id: GlobalId,
2530        data_snapshot: DataSnapshot<T>,
2531    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2532        match self {
2533            Self::Critical(handle) => Box::pin(
2534                data_snapshot
2535                    .snapshot_stats_from_critical(handle)
2536                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2537            ),
2538            Self::Leased(handle) => Box::pin(
2539                data_snapshot
2540                    .snapshot_stats_from_leased(handle)
2541                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2542            ),
2543        }
2544    }
2545}
2546
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// The primary of this collection, if it has one.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    primary: Option<GlobalId>,

    /// Description of how this collection's frontier follows time.
    time_dependence: Option<TimeDependence>,
    /// The ID of the source remap/progress collection, if this is an ingestion.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<T>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Metadata describing the collection, as handed to
    /// [`CollectionState::new`].
    pub collection_metadata: CollectionMetadata,
}
2586
2587impl<T: TimelyTimestamp> CollectionState<T> {
2588    /// Creates a new collection state, with an initial read policy valid from
2589    /// `since`.
2590    pub fn new(
2591        primary: Option<GlobalId>,
2592        time_dependence: Option<TimeDependence>,
2593        ingestion_remap_collection_id: Option<GlobalId>,
2594        since: Antichain<T>,
2595        write_frontier: Antichain<T>,
2596        storage_dependencies: Vec<GlobalId>,
2597        metadata: CollectionMetadata,
2598    ) -> Self {
2599        let mut read_capabilities = MutableAntichain::new();
2600        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2601        Self {
2602            primary,
2603            time_dependence,
2604            ingestion_remap_collection_id,
2605            read_capabilities,
2606            implied_capability: since.clone(),
2607            read_policy: ReadPolicy::NoPolicy {
2608                initial_since: since,
2609            },
2610            storage_dependencies,
2611            write_frontier,
2612            collection_metadata: metadata,
2613        }
2614    }
2615
2616    /// Returns whether the collection was dropped.
2617    pub fn is_dropped(&self) -> bool {
2618        self.read_capabilities.is_empty()
2619    }
2620}
2621
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    /// Shared storage configuration; consulted to decide whether dropped
    /// shards should be enqueued for finalization.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender for background commands, handed to
    /// `update_read_capabilities_inner` when read capabilities change.
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    /// Receiver for the commands processed by the `run` loop.
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    /// Receiver for read-hold changes, applied to the collections' read
    /// capabilities by the `run` loop.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    /// Shards that became finalizable because their collection was dropped.
    finalizable_shards: Arc<ShardIdSet>,
    /// Per-collection state, shared behind a mutex.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueuing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles of all registered collections.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    /// Write handle for the txns shard, if any. `run` takes it out of this
    /// field to watch the txns shard's upper.
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    /// Collections registered with `is_in_txns`; their write frontiers are
    /// updated whenever the txns shard reports a new upper.
    txns_shards: BTreeSet<GlobalId>,
}
2642
/// Commands processed by the [BackgroundTask] run loop.
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    /// Registers the persist handles of a collection with the task.
    ///
    /// The task panics if handles were already registered for `id`.
    Register {
        id: GlobalId,
        /// Whether the collection's upper follows the txns shard. If not, the
        /// task watches the upper of `write_handle` directly.
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    /// Requests downgrading the sinces of the given collections to the given
    /// frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    /// Requests snapshot statistics for a collection. The task answers on the
    /// oneshot channel with a future that the requester has to await.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}
2658
/// A newtype wrapper to hang a Debug impl off of.
///
/// Wraps the boxed future that eventually resolves to the requested
/// [SnapshotStats], or to a [StorageError].
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2661
2662impl<T> Debug for SnapshotStatsRes<T> {
2663    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2664        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2665    }
2666}
2667
2668impl<T> BackgroundTask<T>
2669where
2670    T: TimelyTimestamp
2671        + Lattice
2672        + Codec64
2673        + From<EpochMillis>
2674        + TimestampManipulation
2675        + Into<mz_repr::Timestamp>
2676        + Sync,
2677{
    /// Runs the main loop of the background task.
    ///
    /// Each iteration waits for whichever of these happens first:
    ///
    /// - the txns shard reports a new upper,
    /// - a directly watched collection shard reports a new upper,
    /// - a [BackgroundCmd] arrives on the command channel, or
    /// - read-hold changes arrive on the holds channel.
    ///
    /// The loop exits once the command channel is closed.
    async fn run(&mut self) {
        // Futures that fetch the recent upper from all other shards.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Builds a future that waits until the shard's upper has advanced past
        // `prev_upper`, and then hands back the id, the write handle and the
        // new upper so that the caller can enqueue the next wait.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard gets its own future. The id passed here is only
        // threaded through to re-create the future; updates from this future
        // are applied to the collections in `self.txns_shards` instead.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            // Without a txns handle, use a future that never resolves, so that
            // this select branch never fires.
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // All collections that live in the txns shard get the same
                    // new upper.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    // Wait for the next advancement of the txns shard's upper.
                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    // If there is no shard mapping for `id` anymore, the handle
                    // is simply dropped and no new future is enqueued.
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // Still current, so process the update and enqueue
                            // again!
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            // An empty upper cannot advance any further, so
                            // there is nothing left to wait for.
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Be polite and expire the write handle. This can
                            // happen when we get an upper update for a write
                            // handle that has since been replaced via Update.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // We're done!
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            // Collections in the txns shard follow the txns
                            // shard's upper; all others get their own future
                            // watching their shard's upper.
                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }

                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // NB: The requested as_of could be arbitrarily far
                            // in the future. So, in order to avoid blocking
                            // this loop until it's available and the
                            // `snapshot_stats` call resolves, instead return
                            // the future to the caller and await it there.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                // No since handle registered for this id:
                                // answer with an immediately ready error.
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the listener hung up.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    // Drain whatever else is already queued, merging changes
                    // for the same collection, so that everything is applied
                    // under a single lock acquisition below.
                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    // Only used for the trace output below.
                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }
2856
2857    #[instrument(level = "debug")]
2858    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
2859        let mut read_capability_changes = BTreeMap::default();
2860
2861        let mut self_collections = self.collections.lock().expect("lock poisoned");
2862
2863        for (id, new_upper) in updates.iter() {
2864            let collection = if let Some(c) = self_collections.get_mut(id) {
2865                c
2866            } else {
2867                trace!(
2868                    "Reference to absent collection {id}, due to concurrent removal of that collection"
2869                );
2870                continue;
2871            };
2872
2873            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
2874                collection.write_frontier.clone_from(new_upper);
2875            }
2876
2877            let mut new_read_capability = collection
2878                .read_policy
2879                .frontier(collection.write_frontier.borrow());
2880
2881            if id.is_user() {
2882                trace!(
2883                    %id,
2884                    implied_capability = ?collection.implied_capability,
2885                    policy = ?collection.read_policy,
2886                    write_frontier = ?collection.write_frontier,
2887                    ?new_read_capability,
2888                    "update_write_frontiers");
2889            }
2890
2891            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
2892                let mut update = ChangeBatch::new();
2893                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
2894                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
2895                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
2896
2897                if !update.is_empty() {
2898                    read_capability_changes.insert(*id, update);
2899                }
2900            }
2901        }
2902
2903        if !read_capability_changes.is_empty() {
2904            StorageCollectionsImpl::update_read_capabilities_inner(
2905                &self.cmds_tx,
2906                &mut self_collections,
2907                &mut read_capability_changes,
2908            );
2909        }
2910    }
2911
2912    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
2913        for (id, new_since) in cmds {
2914            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
2915                c
2916            } else {
2917                // This can happen when someone concurrently drops a collection.
2918                trace!("downgrade_sinces: reference to absent collection {id}");
2919                continue;
2920            };
2921
2922            if id.is_user() {
2923                trace!("downgrading since of {} to {:?}", id, new_since);
2924            }
2925
2926            let epoch = since_handle.opaque().clone();
2927            let result = if new_since.is_empty() {
2928                // A shard's since reaching the empty frontier is a prereq for
2929                // being able to finalize a shard, so the final downgrade should
2930                // never be rate-limited.
2931                let res = Some(
2932                    since_handle
2933                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2934                        .await,
2935                );
2936
2937                info!(%id, "removing persist handles because the since advanced to []!");
2938
2939                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
2940                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
2941                    shard_id
2942                } else {
2943                    panic!("missing GlobalId -> ShardId mapping for id {id}");
2944                };
2945
2946                // We're not responsible for writes to tables, so we also don't
2947                // de-register them from the txn system. Whoever is responsible
2948                // will remove them. We only make sure to remove the table from
2949                // our tracking.
2950                self.txns_shards.remove(&id);
2951
2952                if !self
2953                    .config
2954                    .lock()
2955                    .expect("lock poisoned")
2956                    .parameters
2957                    .finalize_shards
2958                {
2959                    info!(
2960                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
2961                    );
2962                    return;
2963                }
2964
2965                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");
2966
2967                self.finalizable_shards.lock().insert(dropped_shard_id);
2968
2969                res
2970            } else {
2971                since_handle
2972                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2973                    .await
2974            };
2975
2976            if let Some(Err(other_epoch)) = result {
2977                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
2978            }
2979        }
2980    }
2981}
2982
/// Everything that `finalize_shards_task` needs in order to run, bundled so
/// that it can be handed to the task as a single argument.
struct FinalizeShardsTaskConfig {
    /// Epoch of this process; converted into a `PersistEpoch` by the task and
    /// used when it force-downgrades a shard's critical since.
    envd_epoch: NonZeroI64,
    /// Shared storage configuration. The task reads the `finalize_shards`
    /// parameter and the dyncfg `ConfigSet` from it.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Metrics on which the task bumps the finalization started, failed and
    /// succeeded counters.
    metrics: StorageCollectionsMetrics,
    /// Shards that are waiting to be finalized. The task removes a shard from
    /// this set once it has been handled successfully.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards that the task has handled successfully.
    finalized_shards: Arc<ShardIdSet>,
    /// Location passed to `persist` when opening a persist client.
    persist_location: PersistLocation,
    /// Client cache from which the task opens its persist client.
    persist: Arc<PersistClientCache>,
    /// When `true`, the task returns immediately without finalizing anything.
    read_only: bool,
}
2993
2994async fn finalize_shards_task<T>(
2995    FinalizeShardsTaskConfig {
2996        envd_epoch,
2997        config,
2998        metrics,
2999        finalizable_shards,
3000        finalized_shards,
3001        persist_location,
3002        persist,
3003        read_only,
3004    }: FinalizeShardsTaskConfig,
3005) where
3006    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3007{
3008    if read_only {
3009        info!("disabling shard finalization in read only mode");
3010        return;
3011    }
3012
3013    let mut interval = tokio::time::interval(Duration::from_secs(5));
3014    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3015    loop {
3016        interval.tick().await;
3017
3018        if !config
3019            .lock()
3020            .expect("lock poisoned")
3021            .parameters
3022            .finalize_shards
3023        {
3024            debug!(
3025                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3026            );
3027            continue;
3028        }
3029
3030        let current_finalizable_shards = {
3031            // We hold the lock for as short as possible and pull our cloned set
3032            // of shards.
3033            finalizable_shards.lock().iter().cloned().collect_vec()
3034        };
3035
3036        if current_finalizable_shards.is_empty() {
3037            debug!("no shards to finalize");
3038            continue;
3039        }
3040
3041        debug!(?current_finalizable_shards, "attempting to finalize shards");
3042
3043        // Open a persist client to delete unused shards.
3044        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3045
3046        let metrics = &metrics;
3047        let finalizable_shards = &finalizable_shards;
3048        let finalized_shards = &finalized_shards;
3049        let persist_client = &persist_client;
3050        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3051
3052        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3053            .get(config.lock().expect("lock poisoned").config_set());
3054
3055        let epoch = &PersistEpoch::from(envd_epoch);
3056
3057        futures::stream::iter(current_finalizable_shards.clone())
3058            .map(|shard_id| async move {
3059                let persist_client = persist_client.clone();
3060                let diagnostics = diagnostics.clone();
3061                let epoch = epoch.clone();
3062
3063                metrics.finalization_started.inc();
3064
3065                let is_finalized = persist_client
3066                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3067                    .await
3068                    .expect("invalid persist usage");
3069
3070                if is_finalized {
3071                    debug!(%shard_id, "shard is already finalized!");
3072                    Some(shard_id)
3073                } else {
3074                    debug!(%shard_id, "finalizing shard");
3075                    let finalize = || async move {
3076                        // TODO: thread the global ID into the shard finalization WAL
3077                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3078
3079                        // We only use the writer to advance the upper, so using a dummy schema is
3080                        // fine.
3081                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3082                            persist_client
3083                                .open_writer(
3084                                    shard_id,
3085                                    Arc::new(RelationDesc::empty()),
3086                                    Arc::new(UnitSchema),
3087                                    diagnostics,
3088                                )
3089                                .await
3090                                .expect("invalid persist usage");
3091                        write_handle.advance_upper(&Antichain::new()).await;
3092                        write_handle.expire().await;
3093
3094                        if force_downgrade_since {
3095                            let mut since_handle: SinceHandle<
3096                                SourceData,
3097                                (),
3098                                T,
3099                                StorageDiff,
3100                                PersistEpoch,
3101                            > = persist_client
3102                                .open_critical_since(
3103                                    shard_id,
3104                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3105                                    Diagnostics::from_purpose("finalizing shards"),
3106                                )
3107                                .await
3108                                .expect("invalid persist usage");
3109                            let handle_epoch = since_handle.opaque().clone();
3110                            let our_epoch = epoch.clone();
3111                            let epoch = if our_epoch.0 > handle_epoch.0 {
3112                                // We're newer, but it's fine to use the
3113                                // handle's old epoch to try and downgrade.
3114                                handle_epoch
3115                            } else {
3116                                // Good luck, buddy! The downgrade below will
3117                                // not succeed. There's a process with a newer
3118                                // epoch out there and someone at some juncture
3119                                // will fence out this process.
3120                                our_epoch
3121                            };
3122                            let new_since = Antichain::new();
3123                            let downgrade = since_handle
3124                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3125                                .await;
3126                            if let Err(e) = downgrade {
3127                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3128                                return Ok(());
3129                            }
3130                            // Not available now, so finalization is broken.
3131                            // since_handle.expire().await;
3132                        }
3133
3134                        persist_client
3135                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3136                                shard_id,
3137                                Diagnostics::from_purpose("finalizing shards"),
3138                            )
3139                            .await
3140                    };
3141
3142                    match finalize().await {
3143                        Err(e) => {
3144                            // Rather than error, just leave this shard as
3145                            // one to finalize later.
3146                            warn!("error during finalization of shard {shard_id}: {e:?}");
3147                            None
3148                        }
3149                        Ok(()) => {
3150                            debug!(%shard_id, "finalize success!");
3151                            Some(shard_id)
3152                        }
3153                    }
3154                }
3155            })
3156            // Poll each future for each collection concurrently, maximum of 10
3157            // at a time.
3158            // TODO(benesch): the concurrency here should be configurable
3159            // via LaunchDarkly.
3160            .buffer_unordered(10)
3161            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3162            // The closure passed to `for_each` must remain fast or we risk
3163            // starving the finalization futures of calls to `poll`.
3164            .for_each(|shard_id| async move {
3165                match shard_id {
3166                    None => metrics.finalization_failed.inc(),
3167                    Some(shard_id) => {
3168                        // We make successfully finalized shards available for
3169                        // removal from the finalization WAL one by one, so that
3170                        // a handful of stuck shards don't prevent us from
3171                        // removing the shards that have made progress. The
3172                        // overhead of repeatedly acquiring and releasing the
3173                        // locks is negligible.
3174                        {
3175                            let mut finalizable_shards = finalizable_shards.lock();
3176                            let mut finalized_shards = finalized_shards.lock();
3177                            finalizable_shards.remove(&shard_id);
3178                            finalized_shards.insert(shard_id);
3179                        }
3180
3181                        metrics.finalization_succeeded.inc();
3182                    }
3183                }
3184            })
3185            .await;
3186
3187        debug!("done finalizing shards");
3188    }
3189}
3190
/// The "as of" at which snapshot statistics are requested, distinguished by
/// how the upper of the underlying shard advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<T>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<T>),
}
3200
3201#[cfg(test)]
3202mod tests {
3203    use std::str::FromStr;
3204    use std::sync::Arc;
3205
3206    use mz_build_info::DUMMY_BUILD_INFO;
3207    use mz_dyncfg::ConfigSet;
3208    use mz_ore::assert_err;
3209    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3210    use mz_ore::now::SYSTEM_TIME;
3211    use mz_ore::url::SensitiveUrl;
3212    use mz_persist_client::cache::PersistClientCache;
3213    use mz_persist_client::cfg::PersistConfig;
3214    use mz_persist_client::rpc::PubSubClientConnection;
3215    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3216    use mz_persist_types::codec_impls::UnitSchema;
3217    use mz_repr::{RelationDesc, Row};
3218    use mz_secrets::InMemorySecretsController;
3219
3220    use super::*;
3221
3222    #[mz_ore::test(tokio::test)]
3223    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
3224    async fn test_snapshot_stats(&self) {
3225        let persist_location = PersistLocation {
3226            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3227            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3228        };
3229        let persist_client = PersistClientCache::new(
3230            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3231            &MetricsRegistry::new(),
3232            |_, _| PubSubClientConnection::noop(),
3233        );
3234        let persist_client = Arc::new(persist_client);
3235
3236        let (cmds_tx, mut background_task) =
3237            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3238        let background_task =
3239            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3240                background_task.run().await
3241            });
3242
3243        let persist = persist_client.open(persist_location).await.unwrap();
3244
3245        let shard_id = ShardId::new();
3246        let since_handle = persist
3247            .open_critical_since(
3248                shard_id,
3249                PersistClient::CONTROLLER_CRITICAL_SINCE,
3250                Diagnostics::for_tests(),
3251            )
3252            .await
3253            .unwrap();
3254        let write_handle = persist
3255            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3256                shard_id,
3257                Arc::new(RelationDesc::empty()),
3258                Arc::new(UnitSchema),
3259                Diagnostics::for_tests(),
3260            )
3261            .await
3262            .unwrap();
3263
3264        cmds_tx
3265            .send(BackgroundCmd::Register {
3266                id: GlobalId::User(1),
3267                is_in_txns: false,
3268                since_handle: SinceHandleWrapper::Critical(since_handle),
3269                write_handle,
3270            })
3271            .unwrap();
3272
3273        let mut write_handle = persist
3274            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3275                shard_id,
3276                Arc::new(RelationDesc::empty()),
3277                Arc::new(UnitSchema),
3278                Diagnostics::for_tests(),
3279            )
3280            .await
3281            .unwrap();
3282
3283        // No stats for unknown GlobalId.
3284        let stats =
3285            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3286        assert_err!(stats);
3287
3288        // Stats don't resolve for as_of past the upper.
3289        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3290        assert_none!(stats_fut.now_or_never());
3291
3292        // // Call it again because now_or_never consumed our future and it's not clone-able.
3293        let stats_ts1_fut =
3294            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3295
3296        // Write some data.
3297        let data = (
3298            (SourceData(Ok(Row::default())), ()),
3299            mz_repr::Timestamp::from(0),
3300            1i64,
3301        );
3302        let () = write_handle
3303            .compare_and_append(
3304                &[data],
3305                Antichain::from_elem(0.into()),
3306                Antichain::from_elem(1.into()),
3307            )
3308            .await
3309            .unwrap()
3310            .unwrap();
3311
3312        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
3313        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3314            .await
3315            .unwrap();
3316        assert_eq!(stats.num_updates, 1);
3317
3318        // Write more data and unblock the ts 1 call
3319        let data = (
3320            (SourceData(Ok(Row::default())), ()),
3321            mz_repr::Timestamp::from(1),
3322            1i64,
3323        );
3324        let () = write_handle
3325            .compare_and_append(
3326                &[data],
3327                Antichain::from_elem(1.into()),
3328                Antichain::from_elem(2.into()),
3329            )
3330            .await
3331            .unwrap()
3332            .unwrap();
3333
3334        let stats = stats_ts1_fut.await.unwrap();
3335        assert_eq!(stats.num_updates, 2);
3336
3337        // Make sure it runs until at least here.
3338        drop(background_task);
3339    }
3340
3341    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3342        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3343        id: GlobalId,
3344        as_of: Antichain<T>,
3345    ) -> Result<SnapshotStats, StorageError<T>> {
3346        let (tx, rx) = oneshot::channel();
3347        cmds_tx
3348            .send(BackgroundCmd::SnapshotStats(
3349                id,
3350                SnapshotStatsAsOf::Direct(as_of),
3351                tx,
3352            ))
3353            .unwrap();
3354        let res = rx.await.expect("BackgroundTask should be live").0;
3355
3356        res.await
3357    }
3358
3359    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
3360        fn new_for_test(
3361            _persist_location: PersistLocation,
3362            _persist_client: Arc<PersistClientCache>,
3363        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
3364            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
3365            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
3366            let connection_context =
3367                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));
3368
3369            let task = Self {
3370                config: Arc::new(Mutex::new(StorageConfiguration::new(
3371                    connection_context,
3372                    ConfigSet::default(),
3373                ))),
3374                cmds_tx: cmds_tx.clone(),
3375                cmds_rx,
3376                holds_rx,
3377                finalizable_shards: Arc::new(ShardIdSet::new(
3378                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
3379                )),
3380                collections: Arc::new(Mutex::new(BTreeMap::new())),
3381                shard_by_id: BTreeMap::new(),
3382                since_handles: BTreeMap::new(),
3383                txns_handle: None,
3384                txns_shards: BTreeSet::new(),
3385            };
3386
3387            (cmds_tx, task)
3388        }
3389    }
3390}