Skip to main content

mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::iter;
16use std::num::NonZeroI64;
17use std::sync::{Arc, Mutex};
18use std::time::Duration;
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::future::BoxFuture;
23use futures::stream::{BoxStream, FuturesUnordered};
24use futures::{Future, FutureExt, StreamExt};
25use itertools::Itertools;
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::{Opaque, SinceHandle};
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
47use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
48use mz_storage_types::errors::CollectionMissing;
49use mz_storage_types::parameters::StorageParameters;
50use mz_storage_types::read_holds::ReadHold;
51use mz_storage_types::read_policy::ReadPolicy;
52use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
53use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
54use mz_txn_wal::metrics::Metrics as TxnMetrics;
55use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
56use mz_txn_wal::txns::TxnsHandle;
57use timely::PartialOrder;
58use timely::order::TotalOrder;
59use timely::progress::frontier::MutableAntichain;
60use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
61use tokio::sync::{mpsc, oneshot};
62use tokio::time::MissedTickBehavior;
63use tracing::{debug, info, trace, warn};
64
65use crate::client::TimestamplessUpdateBuilder;
66use crate::controller::{
67    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
68};
69use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
70
71mod metrics;
72
/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    /// The timestamp type used by the collections managed by this trait.
    type Timestamp: TimelyTimestamp;

    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// yet know about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    ///
    /// Default implementation in terms of [Self::collections_frontiers].
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may
    /// need to block on the stats being available, but don't want to hold a reference
    /// to the controller for too long... so the outer future holds a reference to the
    /// controller but returns quickly, and the inner future is slow but does not
    /// reference the controller.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at the largest
    /// readable `as_of`.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide a Some if any of
    /// the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Returns the state of [`StorageCollections`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
324
/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
/// consolidated form.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
    // least as long as the cursor itself, which holds part leases. Bundling them together!
    /// Keeps the backing read handle (and its leases) alive for as long as the
    /// cursor; never read directly.
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    /// The underlying persist cursor that yields the snapshot's updates.
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}
333
334impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
335    pub async fn next(
336        &mut self,
337    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
338        let iter = self.cursor.next().await?;
339        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
340    }
341}
342
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<T>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes
    /// the implied capability.
    pub read_capabilities: Antichain<T>,
}
364
/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently, in the background.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to affect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    ///
    /// Shared (behind a std `Mutex`) with the background task.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are
    /// not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
427
428// Supporting methods for implementing [StorageCollections].
429//
430// Almost all internal methods that are the backing implementation for a trait
431// method have the `_inner` suffix.
432//
433// We follow a pattern where `_inner` methods get a mutable reference to the
434// shared collections state, and it's the public-facing method that locks the
435/// A boxed stream of source data with timestamps and diffs.
436type SourceDataStream<T> = BoxStream<'static, (SourceData, T, StorageDiff)>;
437
438// state for the duration of its invocation. This allows calling other `_inner`
439// methods from within `_inner` methods.
440impl<T> StorageCollectionsImpl<T>
441where
442    T: TimelyTimestamp
443        + Lattice
444        + Codec64
445        + From<EpochMillis>
446        + TimestampManipulation
447        + Into<mz_repr::Timestamp>
448        + Sync,
449{
    /// Creates and returns a new [StorageCollections].
    ///
    /// Note that when creating a new [StorageCollections], you must also
    /// reconcile it with the previous state using
    /// [StorageCollections::initialize_state],
    /// [StorageCollections::prepare_state], and
    /// [StorageCollections::create_collections_for_bootstrap].
    ///
    /// # Panics
    ///
    /// Panics if `txn` does not already contain the txn-wal shard id (i.e. if
    /// state preparation has not happened yet), or if the persist location
    /// cannot be opened.
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        // This value must be already installed because we must ensure it's
        // durably recorded before it is used, otherwise we risk leaking persist
        // state.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // We have to initialize, so that TxnsRead::start() below does not
        // block. The handle itself is dropped right away; only the
        // initialization side effect matters.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
                Opaque::encode(&PersistEpoch::default()),
            )
            .await;

        // For handing to the background task, for listening to upper updates.
        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        // Shared state that both this handle and the background task can see.
        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        // Snapshot the txn shard's upper as of boot; see the docs on the
        // `initial_txn_upper` field for why we keep this around.
        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        // Task that processes commands (`cmd_rx`) and read-hold changes
        // (`holds_rx`) on our behalf.
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        // Task that finalizes shards queued in `finalizable_shards`.
        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }
575
    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            // In read-only mode we must not acquire the critical since handle,
            // so we hold back the since with a leased read handle instead.
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // We're managing the data for this shard in read-write mode, which would fence out other
            // processes in read-only mode; it's safe to upgrade the metadata version.
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. It's vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
638
639    /// Opens a write handle for the given `shard`.
640    async fn open_write_handle(
641        &self,
642        id: &GlobalId,
643        shard: ShardId,
644        relation_desc: RelationDesc,
645        persist_client: &PersistClient,
646    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
647        let diagnostics = Diagnostics {
648            shard_name: id.to_string(),
649            handle_purpose: format!("controller data for {}", id),
650        };
651
652        let write = persist_client
653            .open_writer(
654                shard,
655                Arc::new(relation_desc),
656                Arc::new(UnitSchema),
657                diagnostics.clone(),
658            )
659            .await
660            .expect("invalid persist usage");
661
662        write
663    }
664
    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening critical handle");

        // Guarded against by the caller (see `open_data_handles`), but assert
        // our own invariant too: read-only mode must never touch the critical
        // handle.
        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    Opaque::encode(&PersistEpoch::default()),
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // CAS loop: keep trying to stamp our epoch into the handle's
            // opaque until we succeed or discover we've been fenced.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().decode();

                // Ensure the current epoch is <= our epoch.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &Opaque::encode(&current_epoch),
                            (&Opaque::encode(&PersistEpoch::from(our_epoch)), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                    // The compare-and-downgrade failed, so the opaque changed
                    // concurrently; loop around and re-read it.
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
744
745    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
746    /// for the given `shard`.
747    ///
748    /// `since` is an optional since that the read handle will be forwarded to
749    /// if it is less than its current since.
750    async fn open_leased_handle(
751        &self,
752        id: &GlobalId,
753        shard: ShardId,
754        relation_desc: RelationDesc,
755        since: Option<&Antichain<T>>,
756        persist_client: &PersistClient,
757    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
758        tracing::debug!(%id, ?since, "opening leased handle");
759
760        let diagnostics = Diagnostics {
761            shard_name: id.to_string(),
762            handle_purpose: format!("controller data for {}", id),
763        };
764
765        let use_critical_since = false;
766        let mut handle: ReadHandle<_, _, _, _> = persist_client
767            .open_leased_reader(
768                shard,
769                Arc::new(relation_desc),
770                Arc::new(UnitSchema),
771                diagnostics.clone(),
772                use_critical_since,
773            )
774            .await
775            .expect("invalid persist usage");
776
777        // Take the join of the handle's since and the provided `since`;
778        // this lets materialized views express the since at which their
779        // read handles "start."
780        let provided_since = match since {
781            Some(since) => since,
782            None => &Antichain::from_elem(T::minimum()),
783        };
784        let since = handle.since().join(provided_since);
785
786        handle.downgrade_since(&since).await;
787
788        handle
789    }
790
791    fn register_handles(
792        &self,
793        id: GlobalId,
794        is_in_txns: bool,
795        since_handle: SinceHandleWrapper<T>,
796        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
797    ) {
798        self.send(BackgroundCmd::Register {
799            id,
800            is_in_txns,
801            since_handle,
802            write_handle,
803        });
804    }
805
806    fn send(&self, cmd: BackgroundCmd<T>) {
807        let _ = self.cmd_tx.send(cmd);
808    }
809
810    async fn snapshot_stats_inner(
811        &self,
812        id: GlobalId,
813        as_of: SnapshotStatsAsOf<T>,
814    ) -> Result<SnapshotStats, StorageError<T>> {
815        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
816        // caller of this one drives it to completion.
817        //
818        // We'd need to either share the critical handle somehow or maybe have
819        // two instances around, one in the worker and one in the
820        // StorageCollections.
821        let (tx, rx) = oneshot::channel();
822        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
823        rx.await.expect("BackgroundTask should be live").0.await
824    }
825
    /// If this identified collection has a dependency, install a read hold on
    /// it.
    ///
    /// This is necessary to ensure that the dependency's since does not advance
    /// beyond its dependents'.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        // Collections we have no state for need no holds; treat them as a
        // no-op rather than an error.
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            // Sanity check (log-only in production builds): a dependency's
            // since must not already be ahead of the dependent's since.
            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        // Install a hold at the dependent's implied capability on each of its
        // dependencies.
        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }
870
    /// Returns the given collection's dependencies.
    fn determine_collection_dependencies(
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        source_id: GlobalId,
        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let mut dependencies = Vec::new();

        // A collection with a "primary" collection always depends on it.
        if let Some(id) = collection_desc.primary {
            dependencies.push(id);
        }

        match &collection_desc.data_source {
            // These data sources have no storage dependencies of their own
            // (beyond a possible primary, handled above).
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table
            | DataSource::Progress
            | DataSource::Other => (),
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Ingestion exports depend on their primary source's remap
                // collection, except when they use a CDCv2 envelope.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => (),
                    _ => dependencies.push(*remap_collection_id),
                }
            }
            // Ingestions depend on their remap collection.
            DataSource::Ingestion(ingestion) => {
                // ... unless the remap collection _is_ this collection, which
                // must not depend on itself.
                if ingestion.remap_collection_id != source_id {
                    dependencies.push(ingestion.remap_collection_id);
                }
            }
            // Sinks depend on the collection they export.
            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
        }

        Ok(dependencies)
    }
919
920    /// Install read capabilities on the given `storage_dependencies`.
921    #[instrument(level = "debug")]
922    fn install_read_capabilities_inner(
923        &self,
924        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
925        from_id: GlobalId,
926        storage_dependencies: &[GlobalId],
927        read_capability: Antichain<T>,
928    ) -> Result<(), StorageError<T>> {
929        let mut changes = ChangeBatch::new();
930        for time in read_capability.iter() {
931            changes.update(time.clone(), 1);
932        }
933
934        if tracing::span_enabled!(tracing::Level::TRACE) {
935            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
936            let user_capabilities = self_collections
937                .iter_mut()
938                .filter(|(id, _c)| id.is_user())
939                .map(|(id, c)| {
940                    let updates = c.read_capabilities.updates().cloned().collect_vec();
941                    (*id, c.implied_capability.clone(), updates)
942                })
943                .collect_vec();
944
945            trace!(
946                %from_id,
947                ?storage_dependencies,
948                ?read_capability,
949                ?user_capabilities,
950                "install_read_capabilities_inner");
951        }
952
953        let mut storage_read_updates = storage_dependencies
954            .iter()
955            .map(|id| (*id, changes.clone()))
956            .collect();
957
958        StorageCollectionsImpl::update_read_capabilities_inner(
959            &self.cmd_tx,
960            self_collections,
961            &mut storage_read_updates,
962        );
963
964        if tracing::span_enabled!(tracing::Level::TRACE) {
965            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
966            let user_capabilities = self_collections
967                .iter_mut()
968                .filter(|(id, _c)| id.is_user())
969                .map(|(id, c)| {
970                    let updates = c.read_capabilities.updates().cloned().collect_vec();
971                    (*id, c.implied_capability.clone(), updates)
972                })
973                .collect_vec();
974
975            trace!(
976                %from_id,
977                ?storage_dependencies,
978                ?read_capability,
979                ?user_capabilities,
980                "after install_read_capabilities_inner!");
981        }
982
983        Ok(())
984    }
985
986    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
987        let metadata = &self.collection_metadata(id)?;
988        let persist_client = self
989            .persist
990            .open(metadata.persist_location.clone())
991            .await
992            .unwrap();
993        // Duplicate part of open_data_handles here because we don't need the
994        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
995        let diagnostics = Diagnostics {
996            shard_name: id.to_string(),
997            handle_purpose: format!("controller data for {}", id),
998        };
999        // NB: Opening a WriteHandle is cheap if it's never used in a
1000        // compare_and_append operation.
1001        let write = persist_client
1002            .open_writer::<SourceData, (), T, StorageDiff>(
1003                metadata.data_shard,
1004                Arc::new(metadata.relation_desc.clone()),
1005                Arc::new(UnitSchema),
1006                diagnostics.clone(),
1007            )
1008            .await
1009            .expect("invalid persist usage");
1010        Ok(write.shared_upper())
1011    }
1012
1013    async fn read_handle_for_snapshot(
1014        persist: Arc<PersistClientCache>,
1015        metadata: &CollectionMetadata,
1016        id: GlobalId,
1017    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1018        let persist_client = persist
1019            .open(metadata.persist_location.clone())
1020            .await
1021            .unwrap();
1022
1023        // We create a new read handle every time someone requests a snapshot
1024        // and then immediately expire it instead of keeping a read handle
1025        // permanently in our state to avoid having it heartbeat continually.
1026        // The assumption is that calls to snapshot are rare and therefore worth
1027        // it to always create a new handle.
1028        let read_handle = persist_client
1029            .open_leased_reader::<SourceData, (), _, _>(
1030                metadata.data_shard,
1031                Arc::new(metadata.relation_desc.clone()),
1032                Arc::new(UnitSchema),
1033                Diagnostics {
1034                    shard_name: id.to_string(),
1035                    handle_purpose: format!("snapshot {}", id),
1036                },
1037                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1038            )
1039            .await
1040            .expect("invalid persist usage");
1041        Ok(read_handle)
1042    }
1043
1044    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1045    // where the as_of frontier might have multiple elements. In the current form the mutually
1046    // incomparable updates will be accumulated together to a state of the collection that never
1047    // actually existed. We should include the original time in the updates advanced by the as_of
1048    // frontier in the result and let the caller decide what to do with the information.
1049    fn snapshot(
1050        &self,
1051        id: GlobalId,
1052        as_of: T,
1053        txns_read: &TxnsRead<T>,
1054    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1055    where
1056        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1057    {
1058        let metadata = match self.collection_metadata(id) {
1059            Ok(metadata) => metadata.clone(),
1060            Err(e) => return async { Err(e.into()) }.boxed(),
1061        };
1062        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1063            assert_eq!(txns_id, txns_read.txns_id());
1064            txns_read.clone()
1065        });
1066        let persist = Arc::clone(&self.persist);
1067        async move {
1068            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1069            let contents = match txns_read {
1070                None => {
1071                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1072                    read_handle
1073                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1074                        .await
1075                }
1076                Some(txns_read) => {
1077                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1078                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1079                    // unblocked.
1080                    //
1081                    // Consider the following scenario:
1082                    // - Table A is written to via txns at time 5
1083                    // - Tables other than A are written to via txns consuming timestamps up to 10
1084                    // - We'd like to read A at 7
1085                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1086                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
1087                    // - This branch allows it to handle that advancing the physical upper of Table A to
1088                    //   10 (NB but only once we see it get past the write at 5!)
1089                    // - Then we can read it normally.
1090                    txns_read.update_gt(as_of.clone()).await;
1091                    let data_snapshot = txns_read
1092                        .data_snapshot(metadata.data_shard, as_of.clone())
1093                        .await;
1094                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1095                }
1096            };
1097            match contents {
1098                Ok(contents) => {
1099                    let mut snapshot = Vec::with_capacity(contents.len());
1100                    for ((data, _), _, diff) in contents {
1101                        // TODO(petrosagg): We should accumulate the errors too and let the user
1102                        // interpret the result
1103                        let row = data.0?;
1104                        snapshot.push((row, diff));
1105                    }
1106                    Ok(snapshot)
1107                }
1108                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1109            }
1110        }
1111        .boxed()
1112    }
1113
1114    fn snapshot_and_stream(
1115        &self,
1116        id: GlobalId,
1117        as_of: T,
1118        txns_read: &TxnsRead<T>,
1119    ) -> BoxFuture<'static, Result<SourceDataStream<T>, StorageError<T>>> {
1120        use futures::stream::StreamExt;
1121
1122        let metadata = match self.collection_metadata(id) {
1123            Ok(metadata) => metadata.clone(),
1124            Err(e) => return async { Err(e.into()) }.boxed(),
1125        };
1126        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1127            assert_eq!(txns_id, txns_read.txns_id());
1128            txns_read.clone()
1129        });
1130        let persist = Arc::clone(&self.persist);
1131
1132        async move {
1133            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1134            let stream = match txns_read {
1135                None => {
1136                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1137                    read_handle
1138                        .snapshot_and_stream(Antichain::from_elem(as_of))
1139                        .await
1140                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1141                        .boxed()
1142                }
1143                Some(txns_read) => {
1144                    txns_read.update_gt(as_of.clone()).await;
1145                    let data_snapshot = txns_read
1146                        .data_snapshot(metadata.data_shard, as_of.clone())
1147                        .await;
1148                    data_snapshot
1149                        .snapshot_and_stream(&mut read_handle)
1150                        .await
1151                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1152                        .boxed()
1153                }
1154            };
1155
1156            // Map our stream, unwrapping Persist internal errors.
1157            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
1158            Ok(stream)
1159        }
1160        .boxed()
1161    }
1162
1163    fn set_read_policies_inner(
1164        &self,
1165        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1166        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1167    ) {
1168        trace!("set_read_policies: {:?}", policies);
1169
1170        let mut read_capability_changes = BTreeMap::default();
1171
1172        for (id, policy) in policies.into_iter() {
1173            let collection = match collections.get_mut(&id) {
1174                Some(c) => c,
1175                None => {
1176                    panic!("Reference to absent collection {id}");
1177                }
1178            };
1179
1180            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1181
1182            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1183                let mut update = ChangeBatch::new();
1184                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1185                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1186                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1187                if !update.is_empty() {
1188                    read_capability_changes.insert(id, update);
1189                }
1190            }
1191
1192            collection.read_policy = policy;
1193        }
1194
1195        for (id, changes) in read_capability_changes.iter() {
1196            if id.is_user() {
1197                trace!(%id, ?changes, "in set_read_policies, capability changes");
1198            }
1199        }
1200
1201        if !read_capability_changes.is_empty() {
1202            StorageCollectionsImpl::update_read_capabilities_inner(
1203                &self.cmd_tx,
1204                collections,
1205                &mut read_capability_changes,
1206            );
1207        }
1208    }
1209
    // This is not an associated function so that we can share it with the task
    // that updates the persist handles and also has a reference to the shared
    // collections state.
    //
    // Applies `updates` (per-collection changes in capability counts) to the
    // tracked `read_capabilities`, transitively forwards resulting frontier
    // changes to each collection's storage dependencies, and finally tells the
    // background task which persist sinces can be downgraded.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            // Sanity-check the update before applying it: capability counts
            // must stay non-negative, and new capabilities may only be
            // installed at or beyond the current frontier.
            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            // Apply the update; afterwards `update` holds the _net_ changes
            // to the collection's read frontier (possibly none).
            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                %id,
                ?collection.storage_dependencies,
                ?update,
                "forwarding update to storage dependencies");
            }

            // Queue the frontier changes for all dependencies; they will be
            // processed by later iterations of this loop.
            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate our net compute actions into downgrades of persist sinces.
        // The actual downgrades are performed by a Tokio task asynchronously.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        // NB: `changes` must be `mut` because `ChangeBatch::is_empty` compacts
        // in place and therefore takes `&mut self`.
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the collection has a "primary" collection, let that primary drive compaction.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = collection.primary.is_none();

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
1322
1323    /// Remove any shards that we know are finalized
1324    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1325        self.finalized_shards
1326            .lock()
1327            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1328    }
1329}
1330
1331// See comments on the above impl for StorageCollectionsImpl.
1332#[async_trait]
1333impl<T> StorageCollections for StorageCollectionsImpl<T>
1334where
1335    T: TimelyTimestamp
1336        + Lattice
1337        + Codec64
1338        + From<EpochMillis>
1339        + TimestampManipulation
1340        + Into<mz_repr::Timestamp>
1341        + Sync,
1342{
1343    type Timestamp = T;
1344
1345    async fn initialize_state(
1346        &self,
1347        txn: &mut (dyn StorageTxn<T> + Send),
1348        init_ids: BTreeSet<GlobalId>,
1349    ) -> Result<(), StorageError<T>> {
1350        let metadata = txn.get_collection_metadata();
1351        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1352
1353        // Determine which collections we do not yet have metadata for.
1354        let new_collections: BTreeSet<GlobalId> =
1355            init_ids.difference(&existing_metadata).cloned().collect();
1356
1357        self.prepare_state(
1358            txn,
1359            new_collections,
1360            BTreeSet::default(),
1361            BTreeMap::default(),
1362        )
1363        .await?;
1364
1365        // All shards that belong to collections dropped in the last epoch are
1366        // eligible for finalization.
1367        //
1368        // n.b. this introduces an unlikely race condition: if a collection is
1369        // dropped from the catalog, but the dataflow is still running on a
1370        // worker, assuming the shard is safe to finalize on reboot may cause
1371        // the cluster to panic.
1372        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1373
1374        info!(?unfinalized_shards, "initializing finalizable_shards");
1375
1376        self.finalizable_shards.lock().extend(unfinalized_shards);
1377
1378        Ok(())
1379    }
1380
1381    fn update_parameters(&self, config_params: StorageParameters) {
1382        // We serialize the dyncfg updates in StorageParameters, but configure
1383        // persist separately.
1384        config_params.dyncfg_updates.apply(self.persist.cfg());
1385
1386        self.config
1387            .lock()
1388            .expect("lock poisoned")
1389            .update(config_params);
1390    }
1391
1392    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1393        let collections = self.collections.lock().expect("lock poisoned");
1394
1395        collections
1396            .get(&id)
1397            .map(|c| c.collection_metadata.clone())
1398            .ok_or(CollectionMissing(id))
1399    }
1400
1401    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1402        let collections = self.collections.lock().expect("lock poisoned");
1403
1404        collections
1405            .iter()
1406            .filter(|(_id, c)| !c.is_dropped())
1407            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1408            .collect()
1409    }
1410
1411    fn collections_frontiers(
1412        &self,
1413        ids: Vec<GlobalId>,
1414    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1415        if ids.is_empty() {
1416            return Ok(vec![]);
1417        }
1418
1419        let collections = self.collections.lock().expect("lock poisoned");
1420
1421        let res = ids
1422            .into_iter()
1423            .map(|id| {
1424                collections
1425                    .get(&id)
1426                    .map(|c| CollectionFrontiers {
1427                        id: id.clone(),
1428                        write_frontier: c.write_frontier.clone(),
1429                        implied_capability: c.implied_capability.clone(),
1430                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1431                    })
1432                    .ok_or(CollectionMissing(id))
1433            })
1434            .collect::<Result<Vec<_>, _>>()?;
1435
1436        Ok(res)
1437    }
1438
1439    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1440        let collections = self.collections.lock().expect("lock poisoned");
1441
1442        let res = collections
1443            .iter()
1444            .filter(|(_id, c)| !c.is_dropped())
1445            .map(|(id, c)| CollectionFrontiers {
1446                id: id.clone(),
1447                write_frontier: c.write_frontier.clone(),
1448                implied_capability: c.implied_capability.clone(),
1449                read_capabilities: c.read_capabilities.frontier().to_owned(),
1450            })
1451            .collect_vec();
1452
1453        res
1454    }
1455
1456    async fn snapshot_stats(
1457        &self,
1458        id: GlobalId,
1459        as_of: Antichain<Self::Timestamp>,
1460    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1461        let metadata = self.collection_metadata(id)?;
1462
1463        // See the comments in StorageController::snapshot for what's going on
1464        // here.
1465        let as_of = match metadata.txns_shard.as_ref() {
1466            None => SnapshotStatsAsOf::Direct(as_of),
1467            Some(txns_id) => {
1468                assert_eq!(txns_id, self.txns_read.txns_id());
1469                let as_of = as_of
1470                    .into_option()
1471                    .expect("cannot read as_of the empty antichain");
1472                self.txns_read.update_gt(as_of.clone()).await;
1473                let data_snapshot = self
1474                    .txns_read
1475                    .data_snapshot(metadata.data_shard, as_of.clone())
1476                    .await;
1477                SnapshotStatsAsOf::Txns(data_snapshot)
1478            }
1479        };
1480        self.snapshot_stats_inner(id, as_of).await
1481    }
1482
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        // Look up the collection's metadata while holding the lock, but do
        // all of the (slow) persist work outside of it.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txn-wal-backed collections we must go through the txns shard so
        // the read is unblocked at `as_of`; otherwise we read directly.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Proactively expire the handle instead of waiting for the lease
            // to time out.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1537
1538    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1539    // where the as_of frontier might have multiple elements. In the current form the mutually
1540    // incomparable updates will be accumulated together to a state of the collection that never
1541    // actually existed. We should include the original time in the updates advanced by the as_of
1542    // frontier in the result and let the caller decide what to do with the information.
1543    fn snapshot(
1544        &self,
1545        id: GlobalId,
1546        as_of: Self::Timestamp,
1547    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1548        self.snapshot(id, as_of, &self.txns_read)
1549    }
1550
1551    async fn snapshot_latest(
1552        &self,
1553        id: GlobalId,
1554    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1555        let upper = self.recent_upper(id).await?;
1556        let res = match upper.as_option() {
1557            Some(f) if f > &T::minimum() => {
1558                let as_of = f.step_back().unwrap();
1559
1560                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1561                snapshot
1562                    .into_iter()
1563                    .map(|(row, diff)| {
1564                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1565                        row
1566                    })
1567                    .collect()
1568            }
1569            Some(_min) => {
1570                // The collection must be empty!
1571                Vec::new()
1572            }
1573            // The collection is closed, we cannot determine a latest read
1574            // timestamp based on the upper.
1575            _ => {
1576                return Err(StorageError::InvalidUsage(
1577                    "collection closed, cannot determine a read timestamp based on the upper"
1578                        .to_string(),
1579                ));
1580            }
1581        };
1582
1583        Ok(res)
1584    }
1585
1586    fn snapshot_cursor(
1587        &self,
1588        id: GlobalId,
1589        as_of: Self::Timestamp,
1590    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1591    where
1592        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1593    {
1594        let metadata = match self.collection_metadata(id) {
1595            Ok(metadata) => metadata.clone(),
1596            Err(e) => return async { Err(e.into()) }.boxed(),
1597        };
1598        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1599            // Ensure the txn's shard the controller has is the same that this
1600            // collection is registered to.
1601            assert_eq!(txns_id, self.txns_read.txns_id());
1602            self.txns_read.clone()
1603        });
1604        let persist = Arc::clone(&self.persist);
1605
1606        // See the comments in Self::snapshot for what's going on here.
1607        async move {
1608            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1609            let cursor = match txns_read {
1610                None => {
1611                    let cursor = handle
1612                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1613                        .await
1614                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1615                    SnapshotCursor {
1616                        _read_handle: handle,
1617                        cursor,
1618                    }
1619                }
1620                Some(txns_read) => {
1621                    txns_read.update_gt(as_of.clone()).await;
1622                    let data_snapshot = txns_read
1623                        .data_snapshot(metadata.data_shard, as_of.clone())
1624                        .await;
1625                    let cursor = data_snapshot
1626                        .snapshot_cursor(&mut handle, |_| true)
1627                        .await
1628                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1629                    SnapshotCursor {
1630                        _read_handle: handle,
1631                        cursor,
1632                    }
1633                }
1634            };
1635
1636            Ok(cursor)
1637        }
1638        .boxed()
1639    }
1640
1641    fn snapshot_and_stream(
1642        &self,
1643        id: GlobalId,
1644        as_of: Self::Timestamp,
1645    ) -> BoxFuture<
1646        'static,
1647        Result<
1648            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1649            StorageError<Self::Timestamp>,
1650        >,
1651    >
1652    where
1653        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1654    {
1655        self.snapshot_and_stream(id, as_of, &self.txns_read)
1656    }
1657
1658    fn create_update_builder(
1659        &self,
1660        id: GlobalId,
1661    ) -> BoxFuture<
1662        'static,
1663        Result<
1664            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1665            StorageError<Self::Timestamp>,
1666        >,
1667    > {
1668        let metadata = match self.collection_metadata(id) {
1669            Ok(m) => m,
1670            Err(e) => return Box::pin(async move { Err(e.into()) }),
1671        };
1672        let persist = Arc::clone(&self.persist);
1673
1674        async move {
1675            let persist_client = persist
1676                .open(metadata.persist_location.clone())
1677                .await
1678                .expect("invalid persist usage");
1679            let write_handle = persist_client
1680                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1681                    metadata.data_shard,
1682                    Arc::new(metadata.relation_desc.clone()),
1683                    Arc::new(UnitSchema),
1684                    Diagnostics {
1685                        shard_name: id.to_string(),
1686                        handle_purpose: format!("create write batch {}", id),
1687                    },
1688                )
1689                .await
1690                .expect("invalid persist usage");
1691            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1692
1693            Ok(builder)
1694        }
1695        .boxed()
1696    }
1697
1698    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1699        let collections = self.collections.lock().expect("lock poisoned");
1700
1701        if collections.contains_key(&id) {
1702            Ok(())
1703        } else {
1704            Err(StorageError::IdentifierMissing(id))
1705        }
1706    }
1707
1708    async fn prepare_state(
1709        &self,
1710        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1711        ids_to_add: BTreeSet<GlobalId>,
1712        ids_to_drop: BTreeSet<GlobalId>,
1713        ids_to_register: BTreeMap<GlobalId, ShardId>,
1714    ) -> Result<(), StorageError<T>> {
1715        txn.insert_collection_metadata(
1716            ids_to_add
1717                .into_iter()
1718                .map(|id| (id, ShardId::new()))
1719                .collect(),
1720        )?;
1721        txn.insert_collection_metadata(ids_to_register)?;
1722
1723        // Delete the metadata for any dropped collections.
1724        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1725
1726        let dropped_shards = dropped_mappings
1727            .into_iter()
1728            .map(|(_id, shard)| shard)
1729            .collect();
1730
1731        txn.insert_unfinalized_shards(dropped_shards)?;
1732
1733        // Reconcile any shards we've successfully finalized with the shard
1734        // finalization collection.
1735        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1736        txn.mark_shards_as_finalized(finalized_shards);
1737
1738        Ok(())
1739    }
1740
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Creates (or, during bootstrap, re-creates) the given collections:
    /// enriches each description with metadata, concurrently opens persist
    /// write/since handles for each one, and installs the resulting
    /// collection state in `self.collections` in dependency order.
    ///
    /// `register_ts` must be provided when any of the collections is a table;
    /// tables are presented as springing into existence at that timestamp.
    /// `migrated_storage_collections` identifies tables migrated during this
    /// bootstrap, whose sinces must not be advanced here.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection counts as txns-managed unless we are in read-only mode
        // and it was migrated in this bootstrap.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. It is an error to:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        // After dedup, adjacent entries with equal ids must carry differing
        // descriptions: that's an attempted redefinition.
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow the `&mut self` as immutable, as all the concurrent work to
        // be processed in this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // If this collection has a primary, the primary is responsible for downgrading
                    // the critical since and it would be an error if we did so here while opening
                    // the since handle.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Only advance the since when it is still at the
                            // minimum (i.e. the table is brand new) and the
                            // table was not migrated in this bootstrap.
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far as the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Determine the time dependence of the collection, i.e. whether
            // (and how) its timestamps track wall-clock time.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    // Only the epoch timeline follows wall-clock.
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                // Kafka, Postgres, MySql, and SQL Server sources all
                                // follow wall clock.
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                // Load generators not further specified.
                                LoadGenerator(_) => None,
                            }
                        }
                        // An export inherits the time dependence of the
                        // ingestion it belongs to, which was registered before
                        // it by the dependency ordering above.
                        IngestionExport { ingestion_id, .. } => {
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        // Introspection, other, progress, table, and webhook sources follow wall clock.
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        // Materialized views, continual tasks, etc, aren't managed by storage.
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            // Ingestions additionally track the id of their remap collection.
            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2093
2094    async fn alter_table_desc(
2095        &self,
2096        existing_collection: GlobalId,
2097        new_collection: GlobalId,
2098        new_desc: RelationDesc,
2099        expected_version: RelationVersion,
2100    ) -> Result<(), StorageError<Self::Timestamp>> {
2101        let data_shard = {
2102            let self_collections = self.collections.lock().expect("lock poisoned");
2103            let existing = self_collections
2104                .get(&existing_collection)
2105                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2106
2107            existing.collection_metadata.data_shard
2108        };
2109
2110        let persist_client = self
2111            .persist
2112            .open(self.persist_location.clone())
2113            .await
2114            .unwrap();
2115
2116        // Evolve the schema of this shard.
2117        let diagnostics = Diagnostics {
2118            shard_name: existing_collection.to_string(),
2119            handle_purpose: "alter_table_desc".to_string(),
2120        };
2121        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2122        let expected_schema = expected_version.into();
2123        let schema_result = persist_client
2124            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2125                data_shard,
2126                expected_schema,
2127                &new_desc,
2128                &UnitSchema,
2129                diagnostics,
2130            )
2131            .await
2132            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2133        tracing::info!(
2134            ?existing_collection,
2135            ?new_collection,
2136            ?new_desc,
2137            "evolved schema"
2138        );
2139
2140        match schema_result {
2141            CaESchema::Ok(id) => id,
2142            // TODO(alter_table): If we get an expected mismatch we should retry.
2143            CaESchema::ExpectedMismatch {
2144                schema_id,
2145                key,
2146                val,
2147            } => {
2148                mz_ore::soft_panic_or_log!(
2149                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2150                );
2151                return Err(StorageError::Generic(anyhow::anyhow!(
2152                    "schema expected mismatch, {existing_collection:?}",
2153                )));
2154            }
2155            CaESchema::Incompatible => {
2156                mz_ore::soft_panic_or_log!(
2157                    "incompatible schema! {existing_collection} {new_desc:?}"
2158                );
2159                return Err(StorageError::Generic(anyhow::anyhow!(
2160                    "schema incompatible, {existing_collection:?}"
2161                )));
2162            }
2163        };
2164
2165        // Once the new schema is registered we can open new data handles.
2166        let (write_handle, since_handle) = self
2167            .open_data_handles(
2168                &new_collection,
2169                data_shard,
2170                None,
2171                new_desc.clone(),
2172                &persist_client,
2173            )
2174            .await;
2175
2176        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2177        // new version was registered with txn-wal?
2178
2179        // Great! Our new schema is registered with Persist, now we need to update our internal
2180        // data structures.
2181        {
2182            let mut self_collections = self.collections.lock().expect("lock poisoned");
2183
2184            // Update the existing collection so we know it's a "projection" of this new one.
2185            let existing = self_collections
2186                .get_mut(&existing_collection)
2187                .expect("existing collection missing");
2188
2189            // A higher level should already be asserting this, but let's make sure.
2190            assert_none!(existing.primary);
2191
2192            // The existing version of the table will depend on the new version.
2193            existing.primary = Some(new_collection);
2194            existing.storage_dependencies.push(new_collection);
2195
2196            // Copy over the frontiers from the previous version.
2197            // The new table starts with two holds - the implied capability, and the hold from
2198            // the previous version - both at the previous version's read frontier.
2199            let implied_capability = existing.read_capabilities.frontier().to_owned();
2200            let write_frontier = existing.write_frontier.clone();
2201
2202            // Determine the relevant read capabilities on the new collection.
2203            //
2204            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2205            // here, but that only installed a ReadHold on the new collection for the implied
2206            // capability of the existing collection. This would cause runtime panics because it
2207            // would eventually result in negative read capabilities.
2208            let mut changes = ChangeBatch::new();
2209            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2210
2211            // Note: The new collection is now the "primary collection".
2212            let collection_meta = CollectionMetadata {
2213                persist_location: self.persist_location.clone(),
2214                relation_desc: new_desc.clone(),
2215                data_shard,
2216                txns_shard: Some(self.txns_read.txns_id().clone()),
2217            };
2218            let collection_state = CollectionState::new(
2219                None,
2220                existing.time_dependence.clone(),
2221                existing.ingestion_remap_collection_id.clone(),
2222                implied_capability,
2223                write_frontier,
2224                Vec::new(),
2225                collection_meta,
2226            );
2227
2228            // Add a record of the new collection.
2229            self_collections.insert(new_collection, collection_state);
2230
2231            let mut updates = BTreeMap::from([(new_collection, changes)]);
2232            StorageCollectionsImpl::update_read_capabilities_inner(
2233                &self.cmd_tx,
2234                &mut *self_collections,
2235                &mut updates,
2236            );
2237        };
2238
2239        // TODO(alter_table): Support changes to sources.
2240        self.register_handles(new_collection, true, since_handle, write_handle);
2241
2242        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2243
2244        Ok(())
2245    }
2246
2247    fn drop_collections_unvalidated(
2248        &self,
2249        storage_metadata: &StorageMetadata,
2250        identifiers: Vec<GlobalId>,
2251    ) {
2252        debug!(?identifiers, "drop_collections_unvalidated");
2253
2254        let mut self_collections = self.collections.lock().expect("lock poisoned");
2255
2256        // Policies that advance the since to the empty antichain. We do still
2257        // honor outstanding read holds, and collections will only be dropped
2258        // once those are removed as well.
2259        //
2260        // We don't explicitly remove read capabilities! Downgrading the
2261        // frontier of the source to `[]` (the empty Antichain), will propagate
2262        // to the storage dependencies.
2263        let mut finalized_policies = Vec::new();
2264
2265        for id in identifiers {
2266            // Make sure it's still there, might already have been deleted.
2267            let Some(collection) = self_collections.get(&id) else {
2268                continue;
2269            };
2270
2271            // Unless the collection has a primary, its shard must have been previously removed
2272            // by `StorageCollections::prepare_state`.
2273            if collection.primary.is_none() {
2274                let metadata = storage_metadata.get_collection_shard::<T>(id);
2275                mz_ore::soft_assert_or_log!(
2276                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2277                    "dropping {id}, but drop was not synchronized with storage \
2278                     controller via `prepare_state`"
2279                );
2280            }
2281
2282            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2283        }
2284
2285        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2286
2287        drop(self_collections);
2288
2289        self.synchronize_finalized_shards(storage_metadata);
2290    }
2291
2292    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2293        let mut collections = self.collections.lock().expect("lock poisoned");
2294
2295        if tracing::enabled!(tracing::Level::TRACE) {
2296            let user_capabilities = collections
2297                .iter_mut()
2298                .filter(|(id, _c)| id.is_user())
2299                .map(|(id, c)| {
2300                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2301                    (*id, c.implied_capability.clone(), updates)
2302                })
2303                .collect_vec();
2304
2305            trace!(?policies, ?user_capabilities, "set_read_policies");
2306        }
2307
2308        self.set_read_policies_inner(&mut collections, policies);
2309
2310        if tracing::enabled!(tracing::Level::TRACE) {
2311            let user_capabilities = collections
2312                .iter_mut()
2313                .filter(|(id, _c)| id.is_user())
2314                .map(|(id, c)| {
2315                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2316                    (*id, c.implied_capability.clone(), updates)
2317                })
2318                .collect_vec();
2319
2320            trace!(?user_capabilities, "after! set_read_policies");
2321        }
2322    }
2323
2324    fn acquire_read_holds(
2325        &self,
2326        desired_holds: Vec<GlobalId>,
2327    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
2328        if desired_holds.is_empty() {
2329            return Ok(vec![]);
2330        }
2331
2332        let mut collections = self.collections.lock().expect("lock poisoned");
2333
2334        let mut advanced_holds = Vec::new();
2335        // We advance the holds by our current since frontier. Can't acquire
2336        // holds for times that have been compacted away!
2337        //
2338        // NOTE: We acquire read holds at the earliest possible time rather than
2339        // at the implied capability. This is so that, for example, adapter can
2340        // acquire a read hold to hold back the frontier, giving the COMPUTE
2341        // controller a chance to also acquire a read hold at that early
2342        // frontier. If/when we change the interplay between adapter and COMPUTE
2343        // to pass around ReadHold tokens, we might tighten this up and instead
2344        // acquire read holds at the implied capability.
2345        for id in desired_holds.iter() {
2346            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2347            let since = collection.read_capabilities.frontier().to_owned();
2348            advanced_holds.push((*id, since));
2349        }
2350
2351        let mut updates = advanced_holds
2352            .iter()
2353            .map(|(id, hold)| {
2354                let mut changes = ChangeBatch::new();
2355                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2356                (*id, changes)
2357            })
2358            .collect::<BTreeMap<_, _>>();
2359
2360        StorageCollectionsImpl::update_read_capabilities_inner(
2361            &self.cmd_tx,
2362            &mut collections,
2363            &mut updates,
2364        );
2365
2366        let acquired_holds = advanced_holds
2367            .into_iter()
2368            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2369            .collect_vec();
2370
2371        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2372
2373        Ok(acquired_holds)
2374    }
2375
2376    /// Determine time dependence information for the object.
2377    fn determine_time_dependence(
2378        &self,
2379        id: GlobalId,
2380    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2381        use TimeDependenceError::CollectionMissing;
2382        let collections = self.collections.lock().expect("lock poisoned");
2383        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2384        Ok(state.time_dependence.clone())
2385    }
2386
2387    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2388        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2389        let Self {
2390            envd_epoch,
2391            read_only,
2392            finalizable_shards,
2393            finalized_shards,
2394            collections,
2395            txns_read: _,
2396            config,
2397            initial_txn_upper,
2398            persist_location,
2399            persist: _,
2400            cmd_tx: _,
2401            holds_tx: _,
2402            _background_task: _,
2403            _finalize_shards_task: _,
2404        } = self;
2405
2406        let finalizable_shards: Vec<_> = finalizable_shards
2407            .lock()
2408            .iter()
2409            .map(ToString::to_string)
2410            .collect();
2411        let finalized_shards: Vec<_> = finalized_shards
2412            .lock()
2413            .iter()
2414            .map(ToString::to_string)
2415            .collect();
2416        let collections: BTreeMap<_, _> = collections
2417            .lock()
2418            .expect("poisoned")
2419            .iter()
2420            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2421            .collect();
2422        let config = format!("{:?}", config.lock().expect("poisoned"));
2423
2424        Ok(serde_json::json!({
2425            "envd_epoch": envd_epoch,
2426            "read_only": read_only,
2427            "finalizable_shards": finalizable_shards,
2428            "finalized_shards": finalized_shards,
2429            "collections": collections,
2430            "config": config,
2431            "initial_txn_upper": initial_txn_upper,
2432            "persist_location": format!("{persist_location:?}"),
2433        }))
2434    }
2435}
2436
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    /// A critical since handle, whose downgrades are gated on an opaque token
    /// (used in read-write mode).
    Critical(SinceHandle<SourceData, (), T, StorageDiff>),
    /// A leased read handle, which has no notion of an opaque (used in
    /// read-only mode).
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}
2451
2452impl<T> SinceHandleWrapper<T>
2453where
2454    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2455{
2456    pub fn since(&self) -> &Antichain<T> {
2457        match self {
2458            Self::Critical(handle) => handle.since(),
2459            Self::Leased(handle) => handle.since(),
2460        }
2461    }
2462
2463    pub fn opaque(&self) -> PersistEpoch {
2464        match self {
2465            Self::Critical(handle) => handle.opaque().decode(),
2466            Self::Leased(_handle) => {
2467                // The opaque is expected to be used with
2468                // `compare_and_downgrade_since`, and the leased handle doesn't
2469                // have a notion of an opaque. We pretend here and in
2470                // `compare_and_downgrade_since`.
2471                PersistEpoch(None)
2472            }
2473        }
2474    }
2475
2476    pub async fn compare_and_downgrade_since(
2477        &mut self,
2478        expected: &PersistEpoch,
2479        (opaque, since): (&PersistEpoch, &Antichain<T>),
2480    ) -> Result<Antichain<T>, PersistEpoch> {
2481        match self {
2482            Self::Critical(handle) => handle
2483                .compare_and_downgrade_since(
2484                    &Opaque::encode(expected),
2485                    (&Opaque::encode(opaque), since),
2486                )
2487                .await
2488                .map_err(|e| e.decode()),
2489            Self::Leased(handle) => {
2490                assert_none!(opaque.0);
2491
2492                handle.downgrade_since(since).await;
2493
2494                Ok(since.clone())
2495            }
2496        }
2497    }
2498
2499    pub async fn maybe_compare_and_downgrade_since(
2500        &mut self,
2501        expected: &PersistEpoch,
2502        (opaque, since): (&PersistEpoch, &Antichain<T>),
2503    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2504        match self {
2505            Self::Critical(handle) => handle
2506                .maybe_compare_and_downgrade_since(
2507                    &Opaque::encode(expected),
2508                    (&Opaque::encode(opaque), since),
2509                )
2510                .await
2511                .map(|r| r.map_err(|o| o.decode())),
2512            Self::Leased(handle) => {
2513                assert_none!(opaque.0);
2514
2515                handle.maybe_downgrade_since(since).await;
2516
2517                Some(Ok(since.clone()))
2518            }
2519        }
2520    }
2521
2522    pub fn snapshot_stats(
2523        &self,
2524        id: GlobalId,
2525        as_of: Option<Antichain<T>>,
2526    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2527        match self {
2528            Self::Critical(handle) => {
2529                let res = handle
2530                    .snapshot_stats(as_of)
2531                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2532                Box::pin(res)
2533            }
2534            Self::Leased(handle) => {
2535                let res = handle
2536                    .snapshot_stats(as_of)
2537                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2538                Box::pin(res)
2539            }
2540        }
2541    }
2542
2543    pub fn snapshot_stats_from_txn(
2544        &self,
2545        id: GlobalId,
2546        data_snapshot: DataSnapshot<T>,
2547    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2548        match self {
2549            Self::Critical(handle) => Box::pin(
2550                data_snapshot
2551                    .snapshot_stats_from_critical(handle)
2552                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2553            ),
2554            Self::Leased(handle) => Box::pin(
2555                data_snapshot
2556                    .snapshot_stats_from_leased(handle)
2557                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2558            ),
2559        }
2560    }
2561}
2562
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    primary: Option<GlobalId>,

    /// Description of how this collection's frontier follows time.
    time_dependence: Option<TimeDependence>,
    /// The ID of the source remap/progress collection, if this is an ingestion.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<T>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Metadata describing where and how this collection is stored in persist
    /// (persist location, relation description, data shard, and optional txns
    /// shard).
    pub collection_metadata: CollectionMetadata,
}
2602
2603impl<T: TimelyTimestamp> CollectionState<T> {
2604    /// Creates a new collection state, with an initial read policy valid from
2605    /// `since`.
2606    pub fn new(
2607        primary: Option<GlobalId>,
2608        time_dependence: Option<TimeDependence>,
2609        ingestion_remap_collection_id: Option<GlobalId>,
2610        since: Antichain<T>,
2611        write_frontier: Antichain<T>,
2612        storage_dependencies: Vec<GlobalId>,
2613        metadata: CollectionMetadata,
2614    ) -> Self {
2615        let mut read_capabilities = MutableAntichain::new();
2616        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2617        Self {
2618            primary,
2619            time_dependence,
2620            ingestion_remap_collection_id,
2621            read_capabilities,
2622            implied_capability: since.clone(),
2623            read_policy: ReadPolicy::NoPolicy {
2624                initial_since: since,
2625            },
2626            storage_dependencies,
2627            write_frontier,
2628            collection_metadata: metadata,
2629        }
2630    }
2631
2632    /// Returns whether the collection was dropped.
2633    pub fn is_dropped(&self) -> bool {
2634        self.read_capabilities.is_empty()
2635    }
2636}
2637
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    /// Shared storage configuration.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender for our own command stream, handed to
    /// `update_read_capabilities_inner` so that capability updates can enqueue
    /// follow-up commands (e.g. since downgrades).
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    /// Receiver for [BackgroundCmd]s: handle registration, since downgrades,
    /// and snapshot-stats requests.
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    /// Receiver for read-hold changes, which are batched up and applied to
    /// `collections`.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    /// Shards eligible for finalization, shared with [StorageCollectionsImpl].
    finalizable_shards: Arc<ShardIdSet>,
    /// Per-collection state, shared with [StorageCollectionsImpl].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles for registered collections, used to apply since
    /// downgrades.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    /// Write handle for the txns shard, if any. Taken by `run` at startup to
    /// drive upper updates for all collections in `txns_shards`.
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    /// Collections whose data lives in the txns shard and therefore share its
    /// upper.
    txns_shards: BTreeSet<GlobalId>,
}
2658
/// Commands sent to the [BackgroundTask].
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    /// Register persist handles for a collection with the background task.
    Register {
        /// The collection the handles belong to.
        id: GlobalId,
        /// Whether the collection's data lives in the txns shard; if so, its
        /// upper updates are driven by the txns shard rather than by
        /// `write_handle`.
        is_in_txns: bool,
        /// Write handle, used to watch for upper advancements.
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        /// Since handle, used to apply since downgrades.
        since_handle: SinceHandleWrapper<T>,
    },
    /// Downgrade the sinces of the given collections to the given frontiers.
    /// Multiple pending requests for the same collection get merged.
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    /// Request snapshot statistics for a collection at the given as_of; the
    /// (unawaited) result future is sent back on the provided channel.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}
2674
/// A newtype wrapper to hang a Debug impl off of.
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);

impl<T> Debug for SnapshotStatsRes<T> {
    // The wrapped future is not itself `Debug`, so render only the type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
    }
}
2683
2684impl<T> BackgroundTask<T>
2685where
2686    T: TimelyTimestamp
2687        + Lattice
2688        + Codec64
2689        + From<EpochMillis>
2690        + TimestampManipulation
2691        + Into<mz_repr::Timestamp>
2692        + Sync,
2693{
    /// The background task's main loop: multiplexes upper-change
    /// notifications from persist, incoming [BackgroundCmd]s, and read-hold
    /// changes until the command channel closes.
    async fn run(&mut self) {
        // Futures that fetch the recent upper from all other shards.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Builds a future that resolves once the shard's upper has advanced
        // past `prev_upper`, yielding the handle back together with the new
        // upper so it can be re-enqueued.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The txns shard (if any) gets its own dedicated upper future; the
        // transient ID is only a placeholder required by `gen_upper_future`.
        // Without a txns handle, this future never resolves.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    // All collections in the txns shard share its upper.
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // Still current, so process the update and enqueue
                            // again!
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Be polite and expire the write handle. This can
                            // happen when we get an upper update for a write
                            // handle that has since been replaced via Update.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let Some(cmd) = cmd else {
                        // We're done!
                        break;
                    };

                    // Drain all commands so we can merge `DowngradeSince` requests. Without this
                    // optimization, downgrading sinces could fall behind in the face of a large
                    // amount of storage collections.
                    let commands = iter::once(cmd).chain(
                        iter::from_fn(|| self.cmds_rx.try_recv().ok())
                    );
                    let mut downgrades = BTreeMap::<_, Antichain<_>>::new();
                    for cmd in commands {
                        match cmd {
                            BackgroundCmd::Register{
                                id,
                                is_in_txns,
                                write_handle,
                                since_handle
                            } => {
                                debug!("registering handles for {}", id);
                                let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                                if previous.is_some() {
                                    panic!("already registered a WriteHandle for collection {id}");
                                }

                                let previous = self.since_handles.insert(id, since_handle);
                                if previous.is_some() {
                                    panic!("already registered a SinceHandle for collection {id}");
                                }

                                // Collections in the txns shard follow the
                                // txns upper; all others get their own upper
                                // future (unless already closed out).
                                if is_in_txns {
                                    self.txns_shards.insert(id);
                                } else {
                                    let upper = write_handle.upper().clone();
                                    if !upper.is_empty() {
                                        let fut = gen_upper_future(id, write_handle, upper);
                                        upper_futures.push(fut.boxed());
                                    }
                                }
                            }
                            BackgroundCmd::DowngradeSince(cmds) => {
                                // Merge with any already-batched downgrade for
                                // the same collection by joining frontiers.
                                for (id, new) in cmds {
                                    downgrades.entry(id)
                                        .and_modify(|since| since.join_assign(&new))
                                        .or_insert(new);
                                }
                            }
                            BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                                // NB: The requested as_of could be arbitrarily far
                                // in the future. So, in order to avoid blocking
                                // this loop until it's available and the
                                // `snapshot_stats` call resolves, instead return
                                // the future to the caller and await it there.
                                let res = match self.since_handles.get(&id) {
                                    Some(x) => {
                                        let fut: BoxFuture<
                                            'static,
                                            Result<SnapshotStats, StorageError<T>>,
                                        > = match as_of {
                                            SnapshotStatsAsOf::Direct(as_of) => {
                                                x.snapshot_stats(id, Some(as_of))
                                            }
                                            SnapshotStatsAsOf::Txns(data_snapshot) => {
                                                x.snapshot_stats_from_txn(id, data_snapshot)
                                            }
                                        };
                                        SnapshotStatsRes(fut)
                                    }
                                    None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                        StorageError::IdentifierMissing(id),
                                    )))),
                                };
                                // It's fine if the listener hung up.
                                let _ = tx.send(res);
                            }
                        }
                    }

                    if !downgrades.is_empty() {
                        self.downgrade_sinces(downgrades).await;
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Batch up all currently-queued hold changes, merging
                    // multiple changes for the same collection.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }
2891
    /// Applies reported write-frontier advancements to the shared collection
    /// state, and downgrades implied read capabilities according to each
    /// collection's read policy.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        // Capability changes accumulated here are applied in one batch below.
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // Only ever advance the write frontier, never regress it.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // Let the read policy derive the new implied read capability from
            // the (possibly advanced) write frontier.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            // Only downgrade if the new capability is at or beyond the current
            // implied capability. Record +1 for each new time and -1 for each
            // old time; after the swap, `new_read_capability` holds the
            // previous implied capability being retracted.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }
2946
    /// Downgrades the persist since handles for the given collections to the
    /// given new since frontiers, issuing all persist calls concurrently.
    ///
    /// When a since is downgraded all the way to the empty frontier, the
    /// collection's persist handles and shard mapping are removed, and the
    /// shard is enqueued for finalization (when enabled via config).
    async fn downgrade_sinces(&mut self, cmds: BTreeMap<GlobalId, Antichain<T>>) {
        // Process all persist calls concurrently.
        let mut futures = Vec::with_capacity(cmds.len());
        for (id, new_since) in cmds {
            // We need to take the since handles here, to satisfy the borrow checker.
            // We make sure to always put them back below.
            let Some(mut since_handle) = self.since_handles.remove(&id) else {
                // This can happen when someone concurrently drops a collection.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            let fut = async move {
                if id.is_user() {
                    trace!("downgrading since of {} to {:?}", id, new_since);
                }

                let epoch = since_handle.opaque().clone();
                let result = if new_since.is_empty() {
                    // A shard's since reaching the empty frontier is a prereq for
                    // being able to finalize a shard, so the final downgrade should
                    // never be rate-limited.
                    Some(
                        since_handle
                            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                            .await,
                    )
                } else {
                    since_handle
                        .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await
                };
                // Hand the handle back alongside the result so the caller loop
                // can re-insert it.
                (id, since_handle, result)
            };
            futures.push(fut);
        }

        for (id, since_handle, result) in futures::future::join_all(futures).await {
            let new_since = match result {
                Some(Ok(since)) => Some(since),
                // Our opaque/epoch was rejected: some other process holds the
                // critical handle with a different epoch, so we halt.
                Some(Err(other_epoch)) => mz_ore::halt!(
                    "fenced by envd @ {other_epoch:?}. ours = {:?}",
                    since_handle.opaque(),
                ),
                None => None,
            };

            // Put the handle back, as promised above.
            self.since_handles.insert(id, since_handle);

            if new_since.is_some_and(|s| s.is_empty()) {
                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let Some(dropped_shard_id) = self.shard_by_id.remove(&id) else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                // We're not responsible for writes to tables, so we also don't
                // de-register them from the txn system. Whoever is responsible
                // will remove them. We only make sure to remove the table from
                // our tracking.
                self.txns_shards.remove(&id);

                if self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        %id, %dropped_shard_id,
                        "enqueuing shard finalization due to dropped collection and dropped \
                         persist handle",
                    );
                    self.finalizable_shards.lock().insert(dropped_shard_id);
                } else {
                    info!(
                        "not triggering shard finalization due to dropped storage object \
                         because enable_storage_shard_finalization parameter is false"
                    );
                }
            }
        }
    }
3032}
3033
/// Configuration for [`finalize_shards_task`].
struct FinalizeShardsTaskConfig {
    // Epoch of this process; used to build the `PersistEpoch` opaque when
    // force-downgrading a shard's critical since during finalization.
    envd_epoch: NonZeroI64,
    // Shared storage configuration; consulted each tick for the
    // `finalize_shards` parameter and related dyncfgs.
    config: Arc<Mutex<StorageConfiguration>>,
    // Counters for started/succeeded/failed finalizations.
    metrics: StorageCollectionsMetrics,
    // Shards that are eligible for finalization; drained by the task.
    finalizable_shards: Arc<ShardIdSet>,
    // Shards that the task has successfully finalized.
    finalized_shards: Arc<ShardIdSet>,
    // Where to open the persist client used for finalization.
    persist_location: PersistLocation,
    // Cache from which the persist client is opened.
    persist: Arc<PersistClientCache>,
    // When true, the task exits immediately without finalizing anything.
    read_only: bool,
}
3044
3045async fn finalize_shards_task<T>(
3046    FinalizeShardsTaskConfig {
3047        envd_epoch,
3048        config,
3049        metrics,
3050        finalizable_shards,
3051        finalized_shards,
3052        persist_location,
3053        persist,
3054        read_only,
3055    }: FinalizeShardsTaskConfig,
3056) where
3057    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3058{
3059    if read_only {
3060        info!("disabling shard finalization in read only mode");
3061        return;
3062    }
3063
3064    let mut interval = tokio::time::interval(Duration::from_secs(5));
3065    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3066    loop {
3067        interval.tick().await;
3068
3069        if !config
3070            .lock()
3071            .expect("lock poisoned")
3072            .parameters
3073            .finalize_shards
3074        {
3075            debug!(
3076                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3077            );
3078            continue;
3079        }
3080
3081        let current_finalizable_shards = {
3082            // We hold the lock for as short as possible and pull our cloned set
3083            // of shards.
3084            finalizable_shards.lock().iter().cloned().collect_vec()
3085        };
3086
3087        if current_finalizable_shards.is_empty() {
3088            debug!("no shards to finalize");
3089            continue;
3090        }
3091
3092        debug!(?current_finalizable_shards, "attempting to finalize shards");
3093
3094        // Open a persist client to delete unused shards.
3095        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3096
3097        let metrics = &metrics;
3098        let finalizable_shards = &finalizable_shards;
3099        let finalized_shards = &finalized_shards;
3100        let persist_client = &persist_client;
3101        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3102
3103        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3104            .get(config.lock().expect("lock poisoned").config_set());
3105
3106        let epoch = &PersistEpoch::from(envd_epoch);
3107
3108        futures::stream::iter(current_finalizable_shards.clone())
3109            .map(|shard_id| async move {
3110                let persist_client = persist_client.clone();
3111                let diagnostics = diagnostics.clone();
3112                let epoch = epoch.clone();
3113
3114                metrics.finalization_started.inc();
3115
3116                let is_finalized = persist_client
3117                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3118                    .await
3119                    .expect("invalid persist usage");
3120
3121                if is_finalized {
3122                    debug!(%shard_id, "shard is already finalized!");
3123                    Some(shard_id)
3124                } else {
3125                    debug!(%shard_id, "finalizing shard");
3126                    let finalize = || async move {
3127                        // TODO: thread the global ID into the shard finalization WAL
3128                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3129
3130                        // We only use the writer to advance the upper, so using a dummy schema is
3131                        // fine.
3132                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3133                            persist_client
3134                                .open_writer(
3135                                    shard_id,
3136                                    Arc::new(RelationDesc::empty()),
3137                                    Arc::new(UnitSchema),
3138                                    diagnostics,
3139                                )
3140                                .await
3141                                .expect("invalid persist usage");
3142                        write_handle.advance_upper(&Antichain::new()).await;
3143                        write_handle.expire().await;
3144
3145                        if force_downgrade_since {
3146                            let our_opaque = Opaque::encode(&epoch);
3147                            let mut since_handle: SinceHandle<SourceData, (), T, StorageDiff> =
3148                                persist_client
3149                                    .open_critical_since(
3150                                        shard_id,
3151                                        PersistClient::CONTROLLER_CRITICAL_SINCE,
3152                                        our_opaque.clone(),
3153                                        Diagnostics::from_purpose("finalizing shards"),
3154                                    )
3155                                    .await
3156                                    .expect("invalid persist usage");
3157                            let handle_opaque = since_handle.opaque().clone();
3158                            let opaque = if our_opaque.codec_name() == handle_opaque.codec_name()
3159                                && epoch.0 > handle_opaque.decode::<PersistEpoch>().0
3160                            {
3161                                // We're newer, but it's fine to use the
3162                                // handle's old epoch to try and downgrade.
3163                                handle_opaque
3164                            } else {
3165                                // Good luck, buddy! The downgrade below will
3166                                // not succeed. There's a process with a newer
3167                                // epoch out there and someone at some juncture
3168                                // will fence out this process.
3169                                // TODO: consider applying the downgrade no matter what!
3170                                our_opaque
3171                            };
3172                            let new_since = Antichain::new();
3173                            let downgrade = since_handle
3174                                .compare_and_downgrade_since(&opaque, (&opaque, &new_since))
3175                                .await;
3176                            if let Err(e) = downgrade {
3177                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3178                                return Ok(());
3179                            }
3180                            // Not available now, so finalization is broken.
3181                            // since_handle.expire().await;
3182                        }
3183
3184                        persist_client
3185                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3186                                shard_id,
3187                                Diagnostics::from_purpose("finalizing shards"),
3188                            )
3189                            .await
3190                    };
3191
3192                    match finalize().await {
3193                        Err(e) => {
3194                            // Rather than error, just leave this shard as
3195                            // one to finalize later.
3196                            warn!("error during finalization of shard {shard_id}: {e:?}");
3197                            None
3198                        }
3199                        Ok(()) => {
3200                            debug!(%shard_id, "finalize success!");
3201                            Some(shard_id)
3202                        }
3203                    }
3204                }
3205            })
3206            // Poll each future for each collection concurrently, maximum of 10
3207            // at a time.
3208            // TODO(benesch): the concurrency here should be configurable
3209            // via LaunchDarkly.
3210            .buffer_unordered(10)
3211            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3212            // The closure passed to `for_each` must remain fast or we risk
3213            // starving the finalization futures of calls to `poll`.
3214            .for_each(|shard_id| async move {
3215                match shard_id {
3216                    None => metrics.finalization_failed.inc(),
3217                    Some(shard_id) => {
3218                        // We make successfully finalized shards available for
3219                        // removal from the finalization WAL one by one, so that
3220                        // a handful of stuck shards don't prevent us from
3221                        // removing the shards that have made progress. The
3222                        // overhead of repeatedly acquiring and releasing the
3223                        // locks is negligible.
3224                        {
3225                            let mut finalizable_shards = finalizable_shards.lock();
3226                            let mut finalized_shards = finalized_shards.lock();
3227                            finalizable_shards.remove(&shard_id);
3228                            finalized_shards.insert(shard_id);
3229                        }
3230
3231                        metrics.finalization_succeeded.inc();
3232                    }
3233                }
3234            })
3235            .await;
3236
3237        debug!("done finalizing shards");
3238    }
3239}
3240
/// Describes how a snapshot-stats request's `as_of` frontier should be
/// resolved, depending on how the underlying shard's upper advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<T>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<T>),
}
3250
/// Tests for the `BackgroundTask` snapshot-stats machinery, run against an
/// in-memory persist instance.
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    // NOTE: This is a free function inside `mod tests`; it must not take
    // `&self` (which only associated functions may do and which would not
    // compile here).
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&PersistEpoch::default()),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because now_or_never consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a `SnapshotStats` command to the background task and awaits the
    /// resulting stats future.
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Constructs a `BackgroundTask` with empty state for use in tests,
        /// returning the command sender alongside the task.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}