Skip to main content

mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25use mz_ore::collections::CollectionExt;
26use mz_ore::metrics::MetricsRegistry;
27use mz_ore::now::{EpochMillis, NowFn};
28use mz_ore::task::AbortOnDropHandle;
29use mz_ore::{assert_none, instrument, soft_assert_or_log};
30use mz_persist_client::cache::PersistClientCache;
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
32use mz_persist_client::critical::SinceHandle;
33use mz_persist_client::read::{Cursor, ReadHandle};
34use mz_persist_client::schema::CaESchema;
35use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
38use mz_persist_types::Codec64;
39use mz_persist_types::codec_impls::UnitSchema;
40use mz_persist_types::txn::TxnsCodec;
41use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
42use mz_storage_types::StorageDiff;
43use mz_storage_types::configuration::StorageConfiguration;
44use mz_storage_types::connections::ConnectionContext;
45use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
46use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
47use mz_storage_types::errors::CollectionMissing;
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sources::{GenericSourceConnection, SourceData, SourceEnvelope, Timeline};
52use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
53use mz_txn_wal::metrics::Metrics as TxnMetrics;
54use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
55use mz_txn_wal::txns::TxnsHandle;
56use timely::PartialOrder;
57use timely::order::TotalOrder;
58use timely::progress::frontier::MutableAntichain;
59use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
60use tokio::sync::{mpsc, oneshot};
61use tokio::time::MissedTickBehavior;
62use tracing::{debug, info, trace, warn};
63
64use crate::client::TimestamplessUpdateBuilder;
65use crate::controller::{
66    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
67};
68use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
69
70mod metrics;
71
/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
#[async_trait]
pub trait StorageCollections: Debug + Sync {
    /// The timestamp type by which collection frontiers are ordered.
    type Timestamp: TimelyTimestamp;

    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// know yet about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    ///
    /// Convenience wrapper around [Self::collections_frontiers] for a single
    /// collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may
    /// need to block on the stats being available, but don't want to hold a reference
    /// to the controller for too long... so the outer future holds a reference to the
    /// controller but returns quickly, and the inner future is slow but does not
    /// reference the controller.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at the largest
    /// readable `as_of`.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide a Some if any of
    /// the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;

    /// Returns the state of [`StorageCollections`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
323
/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
/// consolidated form.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
    // least as long as the cursor itself, which holds part leases. Bundling them together!
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    // The underlying persist cursor that yields the snapshot's updates.
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}
332
333impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
334    pub async fn next(
335        &mut self,
336    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
337        let iter = self.cursor.next().await?;
338        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
339    }
340}
341
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<T>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes the
    /// implied capability.
    pub read_capabilities: Antichain<T>,
}
363
/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently, in the background.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to affect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are
    /// not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
426
427// Supporting methods for implementing [StorageCollections].
428//
429// Almost all internal methods that are the backing implementation for a trait
430// method have the `_inner` suffix.
431//
432// We follow a pattern where `_inner` methods get a mutable reference to the
433// shared collections state, and it's the public-facing method that locks the
434/// A boxed stream of source data with timestamps and diffs.
435type SourceDataStream<T> = BoxStream<'static, (SourceData, T, StorageDiff)>;
436
437// state for the duration of its invocation. This allows calling other `_inner`
438// methods from within `_inner` methods.
439impl<T> StorageCollectionsImpl<T>
440where
441    T: TimelyTimestamp
442        + Lattice
443        + Codec64
444        + From<EpochMillis>
445        + TimestampManipulation
446        + Into<mz_repr::Timestamp>
447        + Sync,
448{
    /// Creates and returns a new [StorageCollections].
    ///
    /// Note that when creating a new [StorageCollections], you must also
    /// reconcile it with the previous state using
    /// [StorageCollections::initialize_state],
    /// [StorageCollections::prepare_state], and
    /// [StorageCollections::create_collections_for_bootstrap].
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        // This value must be already installed because we must ensure it's
        // durably recorded before it is used, otherwise we risk leaking persist
        // state.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // We have to initialize, so that TxnsRead::start() below does not
        // block.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        // For handing to the background task, for listening to upper updates.
        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        // Shared state: the collection map plus the two shard-finalization
        // bookkeeping sets, all also handed to the background tasks below.
        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        // Capture the txn shard's upper as of boot; see the field docs on
        // `initial_txn_upper` for why we need it.
        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            // Abort-on-drop so the tasks die with the last clone of `Self`.
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }
573
    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    ///
    /// In read-only mode a leased read handle is used instead of a critical
    /// since handle, so that we don't affect durable state.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            // We're managing the data for this shard in read-write mode, which would fence out other
            // processes in read-only mode; it's safe to upgrade the metadata version.
            persist_client
                .upgrade_version::<SourceData, (), T, StorageDiff>(
                    shard,
                    Diagnostics {
                        shard_name: id.to_string(),
                        handle_purpose: format!("controller data for {}", id),
                    },
                )
                .await
                .expect("invalid persist usage");

            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. Its vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
636
637    /// Opens a write handle for the given `shard`.
638    async fn open_write_handle(
639        &self,
640        id: &GlobalId,
641        shard: ShardId,
642        relation_desc: RelationDesc,
643        persist_client: &PersistClient,
644    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
645        let diagnostics = Diagnostics {
646            shard_name: id.to_string(),
647            handle_purpose: format!("controller data for {}", id),
648        };
649
650        let write = persist_client
651            .open_writer(
652                shard,
653                Arc::new(relation_desc),
654                Arc::new(UnitSchema),
655                diagnostics.clone(),
656            )
657            .await
658            .expect("invalid persist usage");
659
660        write
661    }
662
    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    ///
    /// Must not be called in read-only mode; the assertion below enforces
    /// this.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Compare-and-downgrade loop: retry until we have successfully
            // installed our epoch as the handle's opaque, or halt if a later
            // epoch has fenced us out.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // Ensure the current epoch is <= our epoch.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                    // CaS lost a race; re-read the opaque and try again.
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
741
742    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
743    /// for the given `shard`.
744    ///
745    /// `since` is an optional since that the read handle will be forwarded to
746    /// if it is less than its current since.
747    async fn open_leased_handle(
748        &self,
749        id: &GlobalId,
750        shard: ShardId,
751        relation_desc: RelationDesc,
752        since: Option<&Antichain<T>>,
753        persist_client: &PersistClient,
754    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
755        tracing::debug!(%id, ?since, "opening leased handle");
756
757        let diagnostics = Diagnostics {
758            shard_name: id.to_string(),
759            handle_purpose: format!("controller data for {}", id),
760        };
761
762        let use_critical_since = false;
763        let mut handle: ReadHandle<_, _, _, _> = persist_client
764            .open_leased_reader(
765                shard,
766                Arc::new(relation_desc),
767                Arc::new(UnitSchema),
768                diagnostics.clone(),
769                use_critical_since,
770            )
771            .await
772            .expect("invalid persist usage");
773
774        // Take the join of the handle's since and the provided `since`;
775        // this lets materialized views express the since at which their
776        // read handles "start."
777        let provided_since = match since {
778            Some(since) => since,
779            None => &Antichain::from_elem(T::minimum()),
780        };
781        let since = handle.since().join(provided_since);
782
783        handle.downgrade_since(&since).await;
784
785        handle
786    }
787
788    fn register_handles(
789        &self,
790        id: GlobalId,
791        is_in_txns: bool,
792        since_handle: SinceHandleWrapper<T>,
793        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
794    ) {
795        self.send(BackgroundCmd::Register {
796            id,
797            is_in_txns,
798            since_handle,
799            write_handle,
800        });
801    }
802
    /// Sends `cmd` on the background command channel.
    ///
    /// Send errors are deliberately ignored: they can only occur when the
    /// receiving end has been dropped, in which case there is nothing left to
    /// notify.
    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }
806
    /// Fetches [SnapshotStats] for collection `id` at `as_of` by asking the
    /// background task (via [BackgroundCmd::SnapshotStats]) and then driving
    /// the returned future to completion ourselves.
    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
        // caller of this one drives it to completion.
        //
        // We'd need to either share the critical handle somehow or maybe have
        // two instances around, one in the worker and one in the
        // StorageCollections.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        // The reply contains a future (wrapped in a newtype, hence `.0`) that
        // we, the caller, must await to get the actual stats.
        rx.await.expect("BackgroundTask should be live").0.await
    }
822
823    /// If this identified collection has a dependency, install a read hold on
824    /// it.
825    ///
826    /// This is necessary to ensure that the dependency's since does not advance
827    /// beyond its dependents'.
828    fn install_collection_dependency_read_holds_inner(
829        &self,
830        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
831        id: GlobalId,
832    ) -> Result<(), StorageError<T>> {
833        let (deps, collection_implied_capability) = match self_collections.get(&id) {
834            Some(CollectionState {
835                storage_dependencies: deps,
836                implied_capability,
837                ..
838            }) => (deps.clone(), implied_capability),
839            _ => return Ok(()),
840        };
841
842        for dep in deps.iter() {
843            let dep_collection = self_collections
844                .get(dep)
845                .ok_or(StorageError::IdentifierMissing(id))?;
846
847            mz_ore::soft_assert_or_log!(
848                PartialOrder::less_equal(
849                    &dep_collection.implied_capability,
850                    collection_implied_capability
851                ),
852                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
853                dep_collection.implied_capability,
854                collection_implied_capability,
855            );
856        }
857
858        self.install_read_capabilities_inner(
859            self_collections,
860            id,
861            &deps,
862            collection_implied_capability.clone(),
863        )?;
864
865        Ok(())
866    }
867
868    /// Returns the given collection's dependencies.
869    fn determine_collection_dependencies(
870        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
871        source_id: GlobalId,
872        collection_desc: &CollectionDescription<T>,
873    ) -> Result<Vec<GlobalId>, StorageError<T>> {
874        let mut dependencies = Vec::new();
875
876        if let Some(id) = collection_desc.primary {
877            dependencies.push(id);
878        }
879
880        match &collection_desc.data_source {
881            DataSource::Introspection(_)
882            | DataSource::Webhook
883            | DataSource::Table
884            | DataSource::Progress
885            | DataSource::Other => (),
886            DataSource::IngestionExport {
887                ingestion_id,
888                data_config,
889                ..
890            } => {
891                // Ingestion exports depend on their primary source's remap
892                // collection, except when they use a CDCv2 envelope.
893                let source = self_collections
894                    .get(ingestion_id)
895                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
896                let Some(remap_collection_id) = &source.ingestion_remap_collection_id else {
897                    panic!("SourceExport must refer to a primary source that already exists");
898                };
899
900                match data_config.envelope {
901                    SourceEnvelope::CdcV2 => (),
902                    _ => dependencies.push(*remap_collection_id),
903                }
904            }
905            // Ingestions depend on their remap collection.
906            DataSource::Ingestion(ingestion) => {
907                if ingestion.remap_collection_id != source_id {
908                    dependencies.push(ingestion.remap_collection_id);
909                }
910            }
911            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
912        }
913
914        Ok(dependencies)
915    }
916
917    /// Install read capabilities on the given `storage_dependencies`.
918    #[instrument(level = "debug")]
919    fn install_read_capabilities_inner(
920        &self,
921        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
922        from_id: GlobalId,
923        storage_dependencies: &[GlobalId],
924        read_capability: Antichain<T>,
925    ) -> Result<(), StorageError<T>> {
926        let mut changes = ChangeBatch::new();
927        for time in read_capability.iter() {
928            changes.update(time.clone(), 1);
929        }
930
931        if tracing::span_enabled!(tracing::Level::TRACE) {
932            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
933            let user_capabilities = self_collections
934                .iter_mut()
935                .filter(|(id, _c)| id.is_user())
936                .map(|(id, c)| {
937                    let updates = c.read_capabilities.updates().cloned().collect_vec();
938                    (*id, c.implied_capability.clone(), updates)
939                })
940                .collect_vec();
941
942            trace!(
943                %from_id,
944                ?storage_dependencies,
945                ?read_capability,
946                ?user_capabilities,
947                "install_read_capabilities_inner");
948        }
949
950        let mut storage_read_updates = storage_dependencies
951            .iter()
952            .map(|id| (*id, changes.clone()))
953            .collect();
954
955        StorageCollectionsImpl::update_read_capabilities_inner(
956            &self.cmd_tx,
957            self_collections,
958            &mut storage_read_updates,
959        );
960
961        if tracing::span_enabled!(tracing::Level::TRACE) {
962            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
963            let user_capabilities = self_collections
964                .iter_mut()
965                .filter(|(id, _c)| id.is_user())
966                .map(|(id, c)| {
967                    let updates = c.read_capabilities.updates().cloned().collect_vec();
968                    (*id, c.implied_capability.clone(), updates)
969                })
970                .collect_vec();
971
972            trace!(
973                %from_id,
974                ?storage_dependencies,
975                ?read_capability,
976                ?user_capabilities,
977                "after install_read_capabilities_inner!");
978        }
979
980        Ok(())
981    }
982
983    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
984        let metadata = &self.collection_metadata(id)?;
985        let persist_client = self
986            .persist
987            .open(metadata.persist_location.clone())
988            .await
989            .unwrap();
990        // Duplicate part of open_data_handles here because we don't need the
991        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
992        let diagnostics = Diagnostics {
993            shard_name: id.to_string(),
994            handle_purpose: format!("controller data for {}", id),
995        };
996        // NB: Opening a WriteHandle is cheap if it's never used in a
997        // compare_and_append operation.
998        let write = persist_client
999            .open_writer::<SourceData, (), T, StorageDiff>(
1000                metadata.data_shard,
1001                Arc::new(metadata.relation_desc.clone()),
1002                Arc::new(UnitSchema),
1003                diagnostics.clone(),
1004            )
1005            .await
1006            .expect("invalid persist usage");
1007        Ok(write.shared_upper())
1008    }
1009
1010    async fn read_handle_for_snapshot(
1011        persist: Arc<PersistClientCache>,
1012        metadata: &CollectionMetadata,
1013        id: GlobalId,
1014    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1015        let persist_client = persist
1016            .open(metadata.persist_location.clone())
1017            .await
1018            .unwrap();
1019
1020        // We create a new read handle every time someone requests a snapshot
1021        // and then immediately expire it instead of keeping a read handle
1022        // permanently in our state to avoid having it heartbeat continually.
1023        // The assumption is that calls to snapshot are rare and therefore worth
1024        // it to always create a new handle.
1025        let read_handle = persist_client
1026            .open_leased_reader::<SourceData, (), _, _>(
1027                metadata.data_shard,
1028                Arc::new(metadata.relation_desc.clone()),
1029                Arc::new(UnitSchema),
1030                Diagnostics {
1031                    shard_name: id.to_string(),
1032                    handle_purpose: format!("snapshot {}", id),
1033                },
1034                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1035            )
1036            .await
1037            .expect("invalid persist usage");
1038        Ok(read_handle)
1039    }
1040
    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
    // where the as_of frontier might have multiple elements. In the current form the mutually
    // incomparable updates will be accumulated together to a state of the collection that never
    // actually existed. We should include the original time in the updates advanced by the as_of
    // frontier in the result and let the caller decide what to do with the information.
    /// Returns a future resolving to the contents of collection `id` at time
    /// `as_of`, as `(Row, diff)` pairs.
    ///
    /// Shards whose metadata carries a `txns_shard` are read through
    /// `txns_read` (txn-wal); all other shards are read directly from persist.
    /// Fails with [StorageError::ReadBeforeSince] if the snapshot cannot be
    /// taken at `as_of`, and propagates any error value stored in the data
    /// itself.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        // Resolve the metadata eagerly so an unknown id fails without polling
        // the returned future.
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // `Some` means the shard is managed by txn-wal; the metadata's txns
        // shard must match the one we were handed.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // We _are_ using txn-wal for tables. It advances the physical upper of the
                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
                    // unblocked.
                    //
                    // Consider the following scenario:
                    // - Table A is written to via txns at time 5
                    // - Tables other than A are written to via txns consuming timestamps up to 10
                    // - We'd like to read A at 7
                    // - The application process of A's txn has advanced the upper to 5+1, but we need
                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
                    // - This branch allows it to handle that advancing the physical upper of Table A to
                    //   10 (NB but only once we see it get past the write at 5!)
                    // - Then we can read it normally.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
                        // interpret the result
                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                // A fetch error means the shard can no longer serve reads at
                // `as_of`.
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1110
1111    fn snapshot_and_stream(
1112        &self,
1113        id: GlobalId,
1114        as_of: T,
1115        txns_read: &TxnsRead<T>,
1116    ) -> BoxFuture<'static, Result<SourceDataStream<T>, StorageError<T>>> {
1117        use futures::stream::StreamExt;
1118
1119        let metadata = match self.collection_metadata(id) {
1120            Ok(metadata) => metadata.clone(),
1121            Err(e) => return async { Err(e.into()) }.boxed(),
1122        };
1123        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1124            assert_eq!(txns_id, txns_read.txns_id());
1125            txns_read.clone()
1126        });
1127        let persist = Arc::clone(&self.persist);
1128
1129        async move {
1130            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1131            let stream = match txns_read {
1132                None => {
1133                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1134                    read_handle
1135                        .snapshot_and_stream(Antichain::from_elem(as_of))
1136                        .await
1137                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1138                        .boxed()
1139                }
1140                Some(txns_read) => {
1141                    txns_read.update_gt(as_of.clone()).await;
1142                    let data_snapshot = txns_read
1143                        .data_snapshot(metadata.data_shard, as_of.clone())
1144                        .await;
1145                    data_snapshot
1146                        .snapshot_and_stream(&mut read_handle)
1147                        .await
1148                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1149                        .boxed()
1150                }
1151            };
1152
1153            // Map our stream, unwrapping Persist internal errors.
1154            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
1155            Ok(stream)
1156        }
1157        .boxed()
1158    }
1159
1160    fn set_read_policies_inner(
1161        &self,
1162        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1163        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1164    ) {
1165        trace!("set_read_policies: {:?}", policies);
1166
1167        let mut read_capability_changes = BTreeMap::default();
1168
1169        for (id, policy) in policies.into_iter() {
1170            let collection = match collections.get_mut(&id) {
1171                Some(c) => c,
1172                None => {
1173                    panic!("Reference to absent collection {id}");
1174                }
1175            };
1176
1177            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1178
1179            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1180                let mut update = ChangeBatch::new();
1181                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1182                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1183                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1184                if !update.is_empty() {
1185                    read_capability_changes.insert(id, update);
1186                }
1187            }
1188
1189            collection.read_policy = policy;
1190        }
1191
1192        for (id, changes) in read_capability_changes.iter() {
1193            if id.is_user() {
1194                trace!(%id, ?changes, "in set_read_policies, capability changes");
1195            }
1196        }
1197
1198        if !read_capability_changes.is_empty() {
1199            StorageCollectionsImpl::update_read_capabilities_inner(
1200                &self.cmd_tx,
1201                collections,
1202                &mut read_capability_changes,
1203            );
1204        }
1205    }
1206
1207    // This is not an associated function so that we can share it with the task
1208    // that updates the persist handles and also has a reference to the shared
1209    // collections state.
1210    fn update_read_capabilities_inner(
1211        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1212        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1213        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1214    ) {
1215        // Location to record consequences that we need to act on.
1216        let mut collections_net = BTreeMap::new();
1217
1218        // We must not rely on any specific relative ordering of `GlobalId`s.
1219        // That said, it is reasonable to assume that collections generally have
1220        // greater IDs than their dependencies, so starting with the largest is
1221        // a useful optimization.
1222        while let Some(id) = updates.keys().rev().next().cloned() {
1223            let mut update = updates.remove(&id).unwrap();
1224
1225            if id.is_user() {
1226                trace!(id = ?id, update = ?update, "update_read_capabilities");
1227            }
1228
1229            let collection = if let Some(c) = collections.get_mut(&id) {
1230                c
1231            } else {
1232                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1233                if has_positive_updates {
1234                    panic!(
1235                        "reference to absent collection {id} but we have positive updates: {:?}",
1236                        update
1237                    );
1238                } else {
1239                    // Continue purely negative updates. Someone has probably
1240                    // already dropped this collection!
1241                    continue;
1242                }
1243            };
1244
1245            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1246            for (time, diff) in update.iter() {
1247                assert!(
1248                    collection.read_capabilities.count_for(time) + diff >= 0,
1249                    "update {:?} for collection {id} would lead to negative \
1250                        read capabilities, read capabilities before applying: {:?}",
1251                    update,
1252                    collection.read_capabilities
1253                );
1254
1255                if collection.read_capabilities.count_for(time) + diff > 0 {
1256                    assert!(
1257                        current_read_capabilities.less_equal(time),
1258                        "update {:?} for collection {id} is trying to \
1259                            install read capabilities before the current \
1260                            frontier of read capabilities, read capabilities before applying: {:?}",
1261                        update,
1262                        collection.read_capabilities
1263                    );
1264                }
1265            }
1266
1267            let changes = collection.read_capabilities.update_iter(update.drain());
1268            update.extend(changes);
1269
1270            if id.is_user() {
1271                trace!(
1272                %id,
1273                ?collection.storage_dependencies,
1274                ?update,
1275                "forwarding update to storage dependencies");
1276            }
1277
1278            for id in collection.storage_dependencies.iter() {
1279                updates
1280                    .entry(*id)
1281                    .or_insert_with(ChangeBatch::new)
1282                    .extend(update.iter().cloned());
1283            }
1284
1285            let (changes, frontier) = collections_net
1286                .entry(id)
1287                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1288
1289            changes.extend(update.drain());
1290            *frontier = collection.read_capabilities.frontier().to_owned();
1291        }
1292
1293        // Translate our net compute actions into downgrades of persist sinces.
1294        // The actual downgrades are performed by a Tokio task asynchronously.
1295        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1296        for (key, (mut changes, frontier)) in collections_net {
1297            if !changes.is_empty() {
1298                // If the collection has a "primary" collection, let that primary drive compaction.
1299                let collection = collections.get(&key).expect("must still exist");
1300                let should_emit_persist_compaction = collection.primary.is_none();
1301
1302                if frontier.is_empty() {
1303                    info!(id = %key, "removing collection state because the since advanced to []!");
1304                    collections.remove(&key).expect("must still exist");
1305                }
1306
1307                if should_emit_persist_compaction {
1308                    persist_compaction_commands.push((key, frontier));
1309                }
1310            }
1311        }
1312
1313        if !persist_compaction_commands.is_empty() {
1314            cmd_tx
1315                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1316                .expect("cannot fail to send");
1317        }
1318    }
1319
1320    /// Remove any shards that we know are finalized
1321    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1322        self.finalized_shards
1323            .lock()
1324            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1325    }
1326}
1327
1328// See comments on the above impl for StorageCollectionsImpl.
1329#[async_trait]
1330impl<T> StorageCollections for StorageCollectionsImpl<T>
1331where
1332    T: TimelyTimestamp
1333        + Lattice
1334        + Codec64
1335        + From<EpochMillis>
1336        + TimestampManipulation
1337        + Into<mz_repr::Timestamp>
1338        + Sync,
1339{
1340    type Timestamp = T;
1341
1342    async fn initialize_state(
1343        &self,
1344        txn: &mut (dyn StorageTxn<T> + Send),
1345        init_ids: BTreeSet<GlobalId>,
1346    ) -> Result<(), StorageError<T>> {
1347        let metadata = txn.get_collection_metadata();
1348        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1349
1350        // Determine which collections we do not yet have metadata for.
1351        let new_collections: BTreeSet<GlobalId> =
1352            init_ids.difference(&existing_metadata).cloned().collect();
1353
1354        self.prepare_state(
1355            txn,
1356            new_collections,
1357            BTreeSet::default(),
1358            BTreeMap::default(),
1359        )
1360        .await?;
1361
1362        // All shards that belong to collections dropped in the last epoch are
1363        // eligible for finalization.
1364        //
1365        // n.b. this introduces an unlikely race condition: if a collection is
1366        // dropped from the catalog, but the dataflow is still running on a
1367        // worker, assuming the shard is safe to finalize on reboot may cause
1368        // the cluster to panic.
1369        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1370
1371        info!(?unfinalized_shards, "initializing finalizable_shards");
1372
1373        self.finalizable_shards.lock().extend(unfinalized_shards);
1374
1375        Ok(())
1376    }
1377
1378    fn update_parameters(&self, config_params: StorageParameters) {
1379        // We serialize the dyncfg updates in StorageParameters, but configure
1380        // persist separately.
1381        config_params.dyncfg_updates.apply(self.persist.cfg());
1382
1383        self.config
1384            .lock()
1385            .expect("lock poisoned")
1386            .update(config_params);
1387    }
1388
1389    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1390        let collections = self.collections.lock().expect("lock poisoned");
1391
1392        collections
1393            .get(&id)
1394            .map(|c| c.collection_metadata.clone())
1395            .ok_or(CollectionMissing(id))
1396    }
1397
1398    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1399        let collections = self.collections.lock().expect("lock poisoned");
1400
1401        collections
1402            .iter()
1403            .filter(|(_id, c)| !c.is_dropped())
1404            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1405            .collect()
1406    }
1407
1408    fn collections_frontiers(
1409        &self,
1410        ids: Vec<GlobalId>,
1411    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1412        if ids.is_empty() {
1413            return Ok(vec![]);
1414        }
1415
1416        let collections = self.collections.lock().expect("lock poisoned");
1417
1418        let res = ids
1419            .into_iter()
1420            .map(|id| {
1421                collections
1422                    .get(&id)
1423                    .map(|c| CollectionFrontiers {
1424                        id: id.clone(),
1425                        write_frontier: c.write_frontier.clone(),
1426                        implied_capability: c.implied_capability.clone(),
1427                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1428                    })
1429                    .ok_or(CollectionMissing(id))
1430            })
1431            .collect::<Result<Vec<_>, _>>()?;
1432
1433        Ok(res)
1434    }
1435
1436    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1437        let collections = self.collections.lock().expect("lock poisoned");
1438
1439        let res = collections
1440            .iter()
1441            .filter(|(_id, c)| !c.is_dropped())
1442            .map(|(id, c)| CollectionFrontiers {
1443                id: id.clone(),
1444                write_frontier: c.write_frontier.clone(),
1445                implied_capability: c.implied_capability.clone(),
1446                read_capabilities: c.read_capabilities.frontier().to_owned(),
1447            })
1448            .collect_vec();
1449
1450        res
1451    }
1452
1453    async fn snapshot_stats(
1454        &self,
1455        id: GlobalId,
1456        as_of: Antichain<Self::Timestamp>,
1457    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1458        let metadata = self.collection_metadata(id)?;
1459
1460        // See the comments in StorageController::snapshot for what's going on
1461        // here.
1462        let as_of = match metadata.txns_shard.as_ref() {
1463            None => SnapshotStatsAsOf::Direct(as_of),
1464            Some(txns_id) => {
1465                assert_eq!(txns_id, self.txns_read.txns_id());
1466                let as_of = as_of
1467                    .into_option()
1468                    .expect("cannot read as_of the empty antichain");
1469                self.txns_read.update_gt(as_of.clone()).await;
1470                let data_snapshot = self
1471                    .txns_read
1472                    .data_snapshot(metadata.data_shard, as_of.clone())
1473                    .await;
1474                SnapshotStatsAsOf::Txns(data_snapshot)
1475            }
1476        };
1477        self.snapshot_stats_inner(id, as_of).await
1478    }
1479
    /// Returns a future computing [SnapshotPartsStats] for collection `id` at
    /// `as_of`: statistics about the persist parts a snapshot would fetch.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        // Look up the metadata under the collections lock, holding it only for
        // this block; an unknown id returns a ready error future.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // For txn-wal-managed shards (with a non-empty `as_of`), stats must be
        // computed via a txn data snapshot; otherwise we ask the read handle
        // directly in the future below.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // The handle was created just for this call; expire it eagerly
            // instead of letting it linger.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1534
    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
    // where the as_of frontier might have multiple elements. In the current form the mutually
    // incomparable updates will be accumulated together to a state of the collection that never
    // actually existed. We should include the original time in the updates advanced by the as_of
    // frontier in the result and let the caller decide what to do with the information.
    /// Returns the accumulated contents of collection `id` at `as_of`,
    /// delegating to the inherent `snapshot` helper with this controller's
    /// txn-wal read handle.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }
1547
1548    async fn snapshot_latest(
1549        &self,
1550        id: GlobalId,
1551    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1552        let upper = self.recent_upper(id).await?;
1553        let res = match upper.as_option() {
1554            Some(f) if f > &T::minimum() => {
1555                let as_of = f.step_back().unwrap();
1556
1557                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1558                snapshot
1559                    .into_iter()
1560                    .map(|(row, diff)| {
1561                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1562                        row
1563                    })
1564                    .collect()
1565            }
1566            Some(_min) => {
1567                // The collection must be empty!
1568                Vec::new()
1569            }
1570            // The collection is closed, we cannot determine a latest read
1571            // timestamp based on the upper.
1572            _ => {
1573                return Err(StorageError::InvalidUsage(
1574                    "collection closed, cannot determine a read timestamp based on the upper"
1575                        .to_string(),
1576                ));
1577            }
1578        };
1579
1580        Ok(res)
1581    }
1582
    /// Returns a [`SnapshotCursor`] over the contents of collection `id` at
    /// `as_of`, allowing the caller to iterate the snapshot lazily.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e.into()) }.boxed(),
        };
        // `Some` means the shard is managed by txn-wal and must be read via a
        // data snapshot resolved against the txns shard.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            // Ensure the txn's shard the controller has is the same that this
            // collection is registered to.
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        // See the comments in Self::snapshot for what's going on here.
        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                None => {
                    // Direct (non-txn-wal) shard: read straight from persist.
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    // Keep the read handle alive alongside the cursor so the
                    // underlying data cannot be compacted away mid-iteration.
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }
1637
    /// Returns a stream of the contents of collection `id` at `as_of`,
    /// delegating to the inherent `snapshot_and_stream` helper with this
    /// controller's txn-wal read handle.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1654
1655    fn create_update_builder(
1656        &self,
1657        id: GlobalId,
1658    ) -> BoxFuture<
1659        'static,
1660        Result<
1661            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1662            StorageError<Self::Timestamp>,
1663        >,
1664    > {
1665        let metadata = match self.collection_metadata(id) {
1666            Ok(m) => m,
1667            Err(e) => return Box::pin(async move { Err(e.into()) }),
1668        };
1669        let persist = Arc::clone(&self.persist);
1670
1671        async move {
1672            let persist_client = persist
1673                .open(metadata.persist_location.clone())
1674                .await
1675                .expect("invalid persist usage");
1676            let write_handle = persist_client
1677                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1678                    metadata.data_shard,
1679                    Arc::new(metadata.relation_desc.clone()),
1680                    Arc::new(UnitSchema),
1681                    Diagnostics {
1682                        shard_name: id.to_string(),
1683                        handle_purpose: format!("create write batch {}", id),
1684                    },
1685                )
1686                .await
1687                .expect("invalid persist usage");
1688            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1689
1690            Ok(builder)
1691        }
1692        .boxed()
1693    }
1694
1695    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1696        let collections = self.collections.lock().expect("lock poisoned");
1697
1698        if collections.contains_key(&id) {
1699            Ok(())
1700        } else {
1701            Err(StorageError::IdentifierMissing(id))
1702        }
1703    }
1704
    /// Prepares the durable storage state in `txn` for the given collection
    /// additions, drops, and registrations. The changes only take effect when
    /// the caller commits `txn`.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>> {
        // Brand-new collections get freshly minted shard ids...
        txn.insert_collection_metadata(
            ids_to_add
                .into_iter()
                .map(|id| (id, ShardId::new()))
                .collect(),
        )?;
        // ...while registrations bring their own, pre-existing shard ids.
        txn.insert_collection_metadata(ids_to_register)?;

        // Delete the metadata for any dropped collections.
        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);

        let dropped_shards = dropped_mappings
            .into_iter()
            .map(|(_id, shard)| shard)
            .collect();

        // Dropped shards are not deleted immediately; they are recorded as
        // unfinalized so they can be finalized asynchronously later.
        txn.insert_unfinalized_shards(dropped_shards)?;

        // Reconcile any shards we've successfully finalized with the shard
        // finalization collection.
        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
        txn.mark_shards_as_finalized(finalized_shards);

        Ok(())
    }
1737
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Creates (or, during bootstrap, re-creates) the given collections:
    /// opens persist handles concurrently, registers in-memory collection
    /// state in dependency order, and installs read holds on dependencies.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Whether a collection's writes go through the txn-wal system.
        // Migrated collections are excepted while in read-only mode.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. It is an error to:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collections concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow the `&mut self` as immutable, as all the concurrent work to
        // be processed in this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    // If this collection has a primary, the primary is responsible for downgrading
                    // the critical since and it would be an error if we did so here while opening
                    // the since handle.
                    let since = if description.primary.is_some() {
                        None
                    } else {
                        description.since.as_ref()
                    };

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            since,
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies =
                Self::determine_collection_dependencies(&*self_collections, id, &description)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            // Determine the time dependence of the collection.
            let time_dependence = {
                use DataSource::*;
                if let Some(timeline) = &description.timeline
                    && *timeline != Timeline::EpochMilliseconds
                {
                    // Only the epoch timeline follows wall-clock.
                    None
                } else {
                    match &description.data_source {
                        Ingestion(ingestion) => {
                            use GenericSourceConnection::*;
                            match ingestion.desc.connection {
                                // Kafka, Postgres, MySql, and SQL Server sources all
                                // follow wall clock.
                                Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                                    Some(TimeDependence::default())
                                }
                                // Load generators not further specified.
                                LoadGenerator(_) => None,
                            }
                        }
                        // Exports inherit their time dependence from the
                        // ingestion they export from.
                        IngestionExport { ingestion_id, .. } => {
                            let c = self_collections.get(ingestion_id).expect("known to exist");
                            c.time_dependence.clone()
                        }
                        // Introspection, other, progress, table, and webhook sources follow wall clock.
                        Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                            Some(TimeDependence::default())
                        }
                        // Materialized views, continual tasks, etc, aren't managed by storage.
                        Other => None,
                        Sink { .. } => None,
                    }
                }
            };

            let ingestion_remap_collection_id = match &description.data_source {
                DataSource::Ingestion(desc) => Some(desc.remap_collection_id),
                _ => None,
            };

            let mut collection_state = CollectionState::new(
                description.primary,
                time_dependence,
                ingestion_remap_collection_id,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport { .. } => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2090
2091    async fn alter_table_desc(
2092        &self,
2093        existing_collection: GlobalId,
2094        new_collection: GlobalId,
2095        new_desc: RelationDesc,
2096        expected_version: RelationVersion,
2097    ) -> Result<(), StorageError<Self::Timestamp>> {
2098        let data_shard = {
2099            let self_collections = self.collections.lock().expect("lock poisoned");
2100            let existing = self_collections
2101                .get(&existing_collection)
2102                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2103
2104            existing.collection_metadata.data_shard
2105        };
2106
2107        let persist_client = self
2108            .persist
2109            .open(self.persist_location.clone())
2110            .await
2111            .unwrap();
2112
2113        // Evolve the schema of this shard.
2114        let diagnostics = Diagnostics {
2115            shard_name: existing_collection.to_string(),
2116            handle_purpose: "alter_table_desc".to_string(),
2117        };
2118        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2119        let expected_schema = expected_version.into();
2120        let schema_result = persist_client
2121            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2122                data_shard,
2123                expected_schema,
2124                &new_desc,
2125                &UnitSchema,
2126                diagnostics,
2127            )
2128            .await
2129            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2130        tracing::info!(
2131            ?existing_collection,
2132            ?new_collection,
2133            ?new_desc,
2134            "evolved schema"
2135        );
2136
2137        match schema_result {
2138            CaESchema::Ok(id) => id,
2139            // TODO(alter_table): If we get an expected mismatch we should retry.
2140            CaESchema::ExpectedMismatch {
2141                schema_id,
2142                key,
2143                val,
2144            } => {
2145                mz_ore::soft_panic_or_log!(
2146                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2147                );
2148                return Err(StorageError::Generic(anyhow::anyhow!(
2149                    "schema expected mismatch, {existing_collection:?}",
2150                )));
2151            }
2152            CaESchema::Incompatible => {
2153                mz_ore::soft_panic_or_log!(
2154                    "incompatible schema! {existing_collection} {new_desc:?}"
2155                );
2156                return Err(StorageError::Generic(anyhow::anyhow!(
2157                    "schema incompatible, {existing_collection:?}"
2158                )));
2159            }
2160        };
2161
2162        // Once the new schema is registered we can open new data handles.
2163        let (write_handle, since_handle) = self
2164            .open_data_handles(
2165                &new_collection,
2166                data_shard,
2167                None,
2168                new_desc.clone(),
2169                &persist_client,
2170            )
2171            .await;
2172
2173        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2174        // new version was registered with txn-wal?
2175
2176        // Great! Our new schema is registered with Persist, now we need to update our internal
2177        // data structures.
2178        {
2179            let mut self_collections = self.collections.lock().expect("lock poisoned");
2180
2181            // Update the existing collection so we know it's a "projection" of this new one.
2182            let existing = self_collections
2183                .get_mut(&existing_collection)
2184                .expect("existing collection missing");
2185
2186            // A higher level should already be asserting this, but let's make sure.
2187            assert_none!(existing.primary);
2188
2189            // The existing version of the table will depend on the new version.
2190            existing.primary = Some(new_collection);
2191            existing.storage_dependencies.push(new_collection);
2192
2193            // Copy over the frontiers from the previous version.
2194            // The new table starts with two holds - the implied capability, and the hold from
2195            // the previous version - both at the previous version's read frontier.
2196            let implied_capability = existing.read_capabilities.frontier().to_owned();
2197            let write_frontier = existing.write_frontier.clone();
2198
2199            // Determine the relevant read capabilities on the new collection.
2200            //
2201            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2202            // here, but that only installed a ReadHold on the new collection for the implied
2203            // capability of the existing collection. This would cause runtime panics because it
2204            // would eventually result in negative read capabilities.
2205            let mut changes = ChangeBatch::new();
2206            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2207
2208            // Note: The new collection is now the "primary collection".
2209            let collection_meta = CollectionMetadata {
2210                persist_location: self.persist_location.clone(),
2211                relation_desc: new_desc.clone(),
2212                data_shard,
2213                txns_shard: Some(self.txns_read.txns_id().clone()),
2214            };
2215            let collection_state = CollectionState::new(
2216                None,
2217                existing.time_dependence.clone(),
2218                existing.ingestion_remap_collection_id.clone(),
2219                implied_capability,
2220                write_frontier,
2221                Vec::new(),
2222                collection_meta,
2223            );
2224
2225            // Add a record of the new collection.
2226            self_collections.insert(new_collection, collection_state);
2227
2228            let mut updates = BTreeMap::from([(new_collection, changes)]);
2229            StorageCollectionsImpl::update_read_capabilities_inner(
2230                &self.cmd_tx,
2231                &mut *self_collections,
2232                &mut updates,
2233            );
2234        };
2235
2236        // TODO(alter_table): Support changes to sources.
2237        self.register_handles(new_collection, true, since_handle, write_handle);
2238
2239        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2240
2241        Ok(())
2242    }
2243
2244    fn drop_collections_unvalidated(
2245        &self,
2246        storage_metadata: &StorageMetadata,
2247        identifiers: Vec<GlobalId>,
2248    ) {
2249        debug!(?identifiers, "drop_collections_unvalidated");
2250
2251        let mut self_collections = self.collections.lock().expect("lock poisoned");
2252
2253        // Policies that advance the since to the empty antichain. We do still
2254        // honor outstanding read holds, and collections will only be dropped
2255        // once those are removed as well.
2256        //
2257        // We don't explicitly remove read capabilities! Downgrading the
2258        // frontier of the source to `[]` (the empty Antichain), will propagate
2259        // to the storage dependencies.
2260        let mut finalized_policies = Vec::new();
2261
2262        for id in identifiers {
2263            // Make sure it's still there, might already have been deleted.
2264            let Some(collection) = self_collections.get(&id) else {
2265                continue;
2266            };
2267
2268            // Unless the collection has a primary, its shard must have been previously removed
2269            // by `StorageCollections::prepare_state`.
2270            if collection.primary.is_none() {
2271                let metadata = storage_metadata.get_collection_shard::<T>(id);
2272                mz_ore::soft_assert_or_log!(
2273                    matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2274                    "dropping {id}, but drop was not synchronized with storage \
2275                     controller via `prepare_state`"
2276                );
2277            }
2278
2279            finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2280        }
2281
2282        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2283
2284        drop(self_collections);
2285
2286        self.synchronize_finalized_shards(storage_metadata);
2287    }
2288
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
        let mut collections = self.collections.lock().expect("lock poisoned");

        // Only pay for snapshotting the user collections' capabilities when
        // TRACE logging is actually enabled.
        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?policies, ?user_capabilities, "set_read_policies");
        }

        self.set_read_policies_inner(&mut collections, policies);

        // Snapshot the same state again after applying the policies, so the
        // before/after traces can be compared.
        if tracing::enabled!(tracing::Level::TRACE) {
            let user_capabilities = collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(?user_capabilities, "after! set_read_policies");
        }
    }
2320
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
        if desired_holds.is_empty() {
            return Ok(vec![]);
        }

        let mut collections = self.collections.lock().expect("lock poisoned");

        let mut advanced_holds = Vec::new();
        // We advance the holds by our current since frontier. Can't acquire
        // holds for times that have been compacted away!
        //
        // NOTE: We acquire read holds at the earliest possible time rather than
        // at the implied capability. This is so that, for example, adapter can
        // acquire a read hold to hold back the frontier, giving the COMPUTE
        // controller a chance to also acquire a read hold at that early
        // frontier. If/when we change the interplay between adapter and COMPUTE
        // to pass around ReadHold tokens, we might tighten this up and instead
        // acquire read holds at the implied capability.
        for id in desired_holds.iter() {
            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
            let since = collection.read_capabilities.frontier().to_owned();
            advanced_holds.push((*id, since));
        }

        // Account for the new holds in the collections' read capabilities: one
        // +1 change per time in each hold's frontier.
        let mut updates = advanced_holds
            .iter()
            .map(|(id, hold)| {
                let mut changes = ChangeBatch::new();
                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
                (*id, changes)
            })
            .collect::<BTreeMap<_, _>>();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            &mut collections,
            &mut updates,
        );

        // Mint the hold tokens. Changes to these holds flow back to us through
        // `holds_tx` (see the `holds_rx` arm in `BackgroundTask::run`).
        let acquired_holds = advanced_holds
            .into_iter()
            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
            .collect_vec();

        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");

        Ok(acquired_holds)
    }
2372
2373    /// Determine time dependence information for the object.
2374    fn determine_time_dependence(
2375        &self,
2376        id: GlobalId,
2377    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2378        use TimeDependenceError::CollectionMissing;
2379        let collections = self.collections.lock().expect("lock poisoned");
2380        let state = collections.get(&id).ok_or(CollectionMissing(id))?;
2381        Ok(state.time_dependence.clone())
2382    }
2383
2384    fn dump(&self) -> Result<serde_json::Value, anyhow::Error> {
2385        // Destructure `self` here so we don't forget to consider dumping newly added fields.
2386        let Self {
2387            envd_epoch,
2388            read_only,
2389            finalizable_shards,
2390            finalized_shards,
2391            collections,
2392            txns_read: _,
2393            config,
2394            initial_txn_upper,
2395            persist_location,
2396            persist: _,
2397            cmd_tx: _,
2398            holds_tx: _,
2399            _background_task: _,
2400            _finalize_shards_task: _,
2401        } = self;
2402
2403        let finalizable_shards: Vec<_> = finalizable_shards
2404            .lock()
2405            .iter()
2406            .map(ToString::to_string)
2407            .collect();
2408        let finalized_shards: Vec<_> = finalized_shards
2409            .lock()
2410            .iter()
2411            .map(ToString::to_string)
2412            .collect();
2413        let collections: BTreeMap<_, _> = collections
2414            .lock()
2415            .expect("poisoned")
2416            .iter()
2417            .map(|(id, c)| (id.to_string(), format!("{c:?}")))
2418            .collect();
2419        let config = format!("{:?}", config.lock().expect("poisoned"));
2420
2421        Ok(serde_json::json!({
2422            "envd_epoch": envd_epoch,
2423            "read_only": read_only,
2424            "finalizable_shards": finalizable_shards,
2425            "finalized_shards": finalized_shards,
2426            "collections": collections,
2427            "config": config,
2428            "initial_txn_upper": initial_txn_upper,
2429            "persist_location": format!("{persist_location:?}"),
2430        }))
2431    }
2432}
2433
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    /// A critical since handle, held when running in read-write mode.
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    /// A leased read handle, held when running in read-only mode.
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}
2448
impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    /// Returns the current since frontier of the wrapped handle.
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    /// Returns the opaque associated with the wrapped handle, or a `None`
    /// epoch for the leased variant, which has no opaque.
    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // The opaque is expected to be used with
                // `compare_and_downgrade_since`, and the leased handle doesn't
                // have a notion of an opaque. We pretend here and in
                // `compare_and_downgrade_since`.
                PersistEpoch(None)
            }
        }
    }

    /// Downgrades the since of the wrapped handle to `new`.
    ///
    /// For the critical handle this is a compare-and-set against `expected`;
    /// for the leased handle there is no opaque to compare against (we assert
    /// it is `None`) and the downgrade happens unconditionally.
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    /// Like [Self::compare_and_downgrade_since], but uses the `maybe_*`
    /// variants of the underlying handles, which may decline to perform the
    /// downgrade; the critical handle signals that by returning `None`.
    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    /// Returns a future computing snapshot statistics for `id` at `as_of`.
    ///
    /// Errors from the underlying handle are mapped to
    /// [StorageError::ReadBeforeSince].
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    /// Like [Self::snapshot_stats], but for a snapshot mediated by the txn
    /// system, described by the given [DataSnapshot].
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}
2553
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    primary: Option<GlobalId>,

    /// Description of how this collection's frontier follows time.
    time_dependence: Option<TimeDependence>,
    /// The ID of the source remap/progress collection, if this is an ingestion.
    ingestion_remap_collection_id: Option<GlobalId>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<T>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Metadata about the collection, e.g. which persist shard backs it.
    pub collection_metadata: CollectionMetadata,
}
2593
2594impl<T: TimelyTimestamp> CollectionState<T> {
2595    /// Creates a new collection state, with an initial read policy valid from
2596    /// `since`.
2597    pub fn new(
2598        primary: Option<GlobalId>,
2599        time_dependence: Option<TimeDependence>,
2600        ingestion_remap_collection_id: Option<GlobalId>,
2601        since: Antichain<T>,
2602        write_frontier: Antichain<T>,
2603        storage_dependencies: Vec<GlobalId>,
2604        metadata: CollectionMetadata,
2605    ) -> Self {
2606        let mut read_capabilities = MutableAntichain::new();
2607        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2608        Self {
2609            primary,
2610            time_dependence,
2611            ingestion_remap_collection_id,
2612            read_capabilities,
2613            implied_capability: since.clone(),
2614            read_policy: ReadPolicy::NoPolicy {
2615                initial_since: since,
2616            },
2617            storage_dependencies,
2618            write_frontier,
2619            collection_metadata: metadata,
2620        }
2621    }
2622
2623    /// Returns whether the collection was dropped.
2624    pub fn is_dropped(&self) -> bool {
2625        self.read_capabilities.is_empty()
2626    }
2627}
2628
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    /// Dynamic storage configuration, shared with the owning
    /// [StorageCollectionsImpl]; consulted e.g. when deciding whether to
    /// finalize shards.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender for our own command channel; passed along to
    /// `update_read_capabilities_inner` so capability changes can enqueue
    /// follow-up commands for this task.
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    /// Receiver for [BackgroundCmd]s, processed in [BackgroundTask::run].
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    /// Receiver for read-hold changes, which are batched up and applied to
    /// `collections` in [BackgroundTask::run].
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    /// Shards pending finalization; shards whose since reached the empty
    /// antichain are enqueued here (see `downgrade_sinces`).
    finalizable_shards: Arc<ShardIdSet>,
    /// Per-collection state, shared with the owning [StorageCollectionsImpl].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles for registered collections, used to (maybe) downgrade
    /// sinces on request.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    /// Write handle on the txns shard, if any; taken in [BackgroundTask::run]
    /// to drive upper updates for all collections in `txns_shards`.
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    /// IDs of collections whose uppers are driven by the txns shard rather
    /// than by their own write handles.
    txns_shards: BTreeSet<GlobalId>,
}
2649
/// Commands processed by the [BackgroundTask] in its run loop.
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    /// Registers write and since handles for a collection. Panics in the run
    /// loop if handles for `id` were already registered.
    Register {
        id: GlobalId,
        // When `true`, the collection's upper is driven by the txns shard
        // rather than by its own write handle.
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    /// Downgrades the sinces of the given collections to the given frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    /// Requests snapshot statistics for a collection at the given as-of; the
    /// result is a future, returned through the oneshot sender.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}
2665
/// A newtype wrapper to hang a Debug impl off of.
///
/// Wraps the future produced for a [BackgroundCmd::SnapshotStats] request; the
/// boxed future itself is not `Debug`.
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2668
2669impl<T> Debug for SnapshotStatsRes<T> {
2670    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2671        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2672    }
2673}
2674
2675impl<T> BackgroundTask<T>
2676where
2677    T: TimelyTimestamp
2678        + Lattice
2679        + Codec64
2680        + From<EpochMillis>
2681        + TimestampManipulation
2682        + Into<mz_repr::Timestamp>
2683        + Sync,
2684{
    /// Drives the background loop: waits for upper changes from registered
    /// write handles (and from the txns shard, if present), processes
    /// [BackgroundCmd]s, and applies read-hold changes arriving on
    /// `holds_rx`. Returns once `cmds_rx` is closed.
    async fn run(&mut self) {
        // Futures that fetch the recent upper from all other shards.
        let mut upper_futures: FuturesUnordered<
            std::pin::Pin<
                Box<
                    dyn Future<
                            Output = (
                                GlobalId,
                                WriteHandle<SourceData, (), T, StorageDiff>,
                                Antichain<T>,
                            ),
                        > + Send,
                >,
            >,
        > = FuturesUnordered::new();

        // Builds a future that waits for `handle`'s upper to move past
        // `prev_upper` and yields the new upper, returning the handle so it
        // can be re-enqueued.
        let gen_upper_future =
            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
                let fut = async move {
                    soft_assert_or_log!(
                        !prev_upper.is_empty(),
                        "cannot await progress when upper is already empty"
                    );
                    handle.wait_for_upper_past(&prev_upper).await;
                    let new_upper = handle.shared_upper();
                    (id, handle, new_upper)
                };

                fut
            };

        // The placeholder transient ID is only used to thread the future's
        // result back into `gen_upper_future`; actual frontier updates for
        // txns-managed collections use the IDs in `self.txns_shards`.
        let mut txns_upper_future = match self.txns_handle.take() {
            Some(txns_handle) => {
                let upper = txns_handle.upper().clone();
                let txns_upper_future =
                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
                txns_upper_future.boxed()
            }
            None => async { std::future::pending().await }.boxed(),
        };

        loop {
            tokio::select! {
                // A new upper from the txns shard applies to every collection
                // registered as being in txns.
                (id, handle, upper) = &mut txns_upper_future => {
                    trace!("new upper from txns shard: {:?}", upper);
                    let mut uppers = Vec::new();
                    for id in self.txns_shards.iter() {
                        uppers.push((*id, &upper));
                    }
                    self.update_write_frontiers(&uppers).await;

                    let fut = gen_upper_future(id, handle, upper);
                    txns_upper_future = fut.boxed();
                }
                Some((id, handle, upper)) = upper_futures.next() => {
                    if id.is_user() {
                        trace!("new upper for collection {id}: {:?}", upper);
                    }
                    let current_shard = self.shard_by_id.get(&id);
                    if let Some(shard_id) = current_shard {
                        if shard_id == &handle.shard_id() {
                            // Still current, so process the update and enqueue
                            // again!
                            let uppers = &[(id, &upper)];
                            self.update_write_frontiers(uppers).await;
                            if !upper.is_empty() {
                                let fut = gen_upper_future(id, handle, upper);
                                upper_futures.push(fut.boxed());
                            }
                        } else {
                            // Be polite and expire the write handle. This can
                            // happen when we get an upper update for a write
                            // handle that has since been replaced via Update.
                            handle.expire().await;
                        }
                    }
                }
                cmd = self.cmds_rx.recv() => {
                    let cmd = if let Some(cmd) = cmd {
                        cmd
                    } else {
                        // We're done!
                        break;
                    };

                    match cmd {
                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
                            debug!("registering handles for {}", id);
                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
                            if previous.is_some() {
                                panic!("already registered a WriteHandle for collection {id}");
                            }

                            let previous = self.since_handles.insert(id, since_handle);
                            if previous.is_some() {
                                panic!("already registered a SinceHandle for collection {id}");
                            }

                            // Txns-managed collections get their uppers from
                            // the txns shard; everything else gets its own
                            // upper-waiting future.
                            if is_in_txns {
                                self.txns_shards.insert(id);
                            } else {
                                let upper = write_handle.upper().clone();
                                if !upper.is_empty() {
                                    let fut = gen_upper_future(id, write_handle, upper);
                                    upper_futures.push(fut.boxed());
                                }
                            }

                        }
                        BackgroundCmd::DowngradeSince(cmds) => {
                            self.downgrade_sinces(cmds).await;
                        }
                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
                            // NB: The requested as_of could be arbitrarily far
                            // in the future. So, in order to avoid blocking
                            // this loop until it's available and the
                            // `snapshot_stats` call resolves, instead return
                            // the future to the caller and await it there.
                            let res = match self.since_handles.get(&id) {
                                Some(x) => {
                                    let fut: BoxFuture<
                                        'static,
                                        Result<SnapshotStats, StorageError<T>>,
                                    > = match as_of {
                                        SnapshotStatsAsOf::Direct(as_of) => {
                                            x.snapshot_stats(id, Some(as_of))
                                        }
                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
                                            x.snapshot_stats_from_txn(id, data_snapshot)
                                        }
                                    };
                                    SnapshotStatsRes(fut)
                                }
                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
                                    StorageError::IdentifierMissing(id),
                                )))),
                            };
                            // It's fine if the listener hung up.
                            let _ = tx.send(res);
                        }
                    }
                }
                Some(holds_changes) = self.holds_rx.recv() => {
                    // Drain all currently pending hold changes and batch them,
                    // so we take the collections lock only once.
                    let mut batched_changes = BTreeMap::new();
                    batched_changes.insert(holds_changes.0, holds_changes.1);

                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
                        let entry = batched_changes.entry(holds_changes.0);
                        entry
                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
                            .or_insert_with(|| holds_changes.1);
                    }

                    let mut collections = self.collections.lock().expect("lock poisoned");

                    let user_changes = batched_changes
                        .iter()
                        .filter(|(id, _c)| id.is_user())
                        .map(|(id, c)| {
                            (id.clone(), c.clone())
                        })
                        .collect_vec();

                    if !user_changes.is_empty() {
                        trace!(?user_changes, "applying holds changes from channel");
                    }

                    StorageCollectionsImpl::update_read_capabilities_inner(
                        &self.cmds_tx,
                        &mut collections,
                        &mut batched_changes,
                    );
                }
            }
        }

        warn!("BackgroundTask shutting down");
    }
2863
    /// Applies reported write-frontier updates to the tracked collections and,
    /// where the collections' read policies allow, downgrades their implied
    /// read capabilities accordingly.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&self, updates: &[(GlobalId, &Antichain<T>)]) {
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // The write frontier only ever moves forward.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // Ask the read policy what implied capability the (possibly
            // advanced) write frontier allows.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            // Only ever downgrade (never regress) the implied capability: swap
            // in the new one and record the +1/-1 diffs to apply to the
            // accumulated read capabilities.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }
2918
    /// Attempts to downgrade the persist since of each given collection to
    /// the given frontier, via the collection's critical since handle.
    ///
    /// Downgrading to the empty frontier is treated as a drop: the persist
    /// handles and the `GlobalId -> ShardId` mapping are removed, and — if
    /// the `finalize_shards` parameter is enabled — the shard is enqueued
    /// for finalization.
    ///
    /// Halts the process if a downgrade result shows that another envd with
    /// a different epoch has fenced us out.
    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
        for (id, new_since) in cmds {
            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
                c
            } else {
                // This can happen when someone concurrently drops a collection.
                trace!("downgrade_sinces: reference to absent collection {id}");
                continue;
            };

            if id.is_user() {
                trace!("downgrading since of {} to {:?}", id, new_since);
            }

            // The opaque value currently stored in the critical since handle;
            // used as both the expected and the new opaque for the CaS below.
            let epoch = since_handle.opaque().clone();
            let result = if new_since.is_empty() {
                // A shard's since reaching the empty frontier is a prereq for
                // being able to finalize a shard, so the final downgrade should
                // never be rate-limited.
                let res = Some(
                    since_handle
                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                        .await,
                );

                info!(%id, "removing persist handles because the since advanced to []!");

                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
                    shard_id
                } else {
                    panic!("missing GlobalId -> ShardId mapping for id {id}");
                };

                // We're not responsible for writes to tables, so we also don't
                // de-register them from the txn system. Whoever is responsible
                // will remove them. We only make sure to remove the table from
                // our tracking.
                self.txns_shards.remove(&id);

                if !self
                    .config
                    .lock()
                    .expect("lock poisoned")
                    .parameters
                    .finalize_shards
                {
                    info!(
                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
                    );
                    // NOTE(review): this `return` abandons any remaining
                    // entries in `cmds` and skips the epoch-fencing check
                    // below for the downgrade we just performed; a `continue`
                    // looks like it may have been intended — confirm.
                    return;
                }

                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");

                self.finalizable_shards.lock().insert(dropped_shard_id);

                res
            } else {
                // Regular (rate-limited) downgrade; may return `None` when
                // the downgrade is skipped.
                since_handle
                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                    .await
            };

            if let Some(Err(other_epoch)) = result {
                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
            }
        }
    }
2988}
2989
/// Arguments for [`finalize_shards_task`], bundled so the task can be spawned
/// with a single value.
struct FinalizeShardsTaskConfig {
    /// Epoch of this envd process; used (as a [`PersistEpoch`]) when
    /// force-downgrading a shard's critical since during finalization.
    envd_epoch: NonZeroI64,
    /// Shared storage configuration; consulted on every tick for the
    /// `finalize_shards` parameter and related dyncfgs.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Metrics reporting finalization attempts, successes, and failures.
    metrics: StorageCollectionsMetrics,
    /// Shards that are eligible for finalization.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards that have been successfully finalized.
    finalized_shards: Arc<ShardIdSet>,
    /// Location of the persist instance holding the shards.
    persist_location: PersistLocation,
    /// Cache used to open a persist client at `persist_location`.
    persist: Arc<PersistClientCache>,
    /// When true, the task exits immediately without finalizing anything.
    read_only: bool,
}
3000
3001async fn finalize_shards_task<T>(
3002    FinalizeShardsTaskConfig {
3003        envd_epoch,
3004        config,
3005        metrics,
3006        finalizable_shards,
3007        finalized_shards,
3008        persist_location,
3009        persist,
3010        read_only,
3011    }: FinalizeShardsTaskConfig,
3012) where
3013    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3014{
3015    if read_only {
3016        info!("disabling shard finalization in read only mode");
3017        return;
3018    }
3019
3020    let mut interval = tokio::time::interval(Duration::from_secs(5));
3021    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3022    loop {
3023        interval.tick().await;
3024
3025        if !config
3026            .lock()
3027            .expect("lock poisoned")
3028            .parameters
3029            .finalize_shards
3030        {
3031            debug!(
3032                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3033            );
3034            continue;
3035        }
3036
3037        let current_finalizable_shards = {
3038            // We hold the lock for as short as possible and pull our cloned set
3039            // of shards.
3040            finalizable_shards.lock().iter().cloned().collect_vec()
3041        };
3042
3043        if current_finalizable_shards.is_empty() {
3044            debug!("no shards to finalize");
3045            continue;
3046        }
3047
3048        debug!(?current_finalizable_shards, "attempting to finalize shards");
3049
3050        // Open a persist client to delete unused shards.
3051        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3052
3053        let metrics = &metrics;
3054        let finalizable_shards = &finalizable_shards;
3055        let finalized_shards = &finalized_shards;
3056        let persist_client = &persist_client;
3057        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3058
3059        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3060            .get(config.lock().expect("lock poisoned").config_set());
3061
3062        let epoch = &PersistEpoch::from(envd_epoch);
3063
3064        futures::stream::iter(current_finalizable_shards.clone())
3065            .map(|shard_id| async move {
3066                let persist_client = persist_client.clone();
3067                let diagnostics = diagnostics.clone();
3068                let epoch = epoch.clone();
3069
3070                metrics.finalization_started.inc();
3071
3072                let is_finalized = persist_client
3073                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3074                    .await
3075                    .expect("invalid persist usage");
3076
3077                if is_finalized {
3078                    debug!(%shard_id, "shard is already finalized!");
3079                    Some(shard_id)
3080                } else {
3081                    debug!(%shard_id, "finalizing shard");
3082                    let finalize = || async move {
3083                        // TODO: thread the global ID into the shard finalization WAL
3084                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3085
3086                        // We only use the writer to advance the upper, so using a dummy schema is
3087                        // fine.
3088                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3089                            persist_client
3090                                .open_writer(
3091                                    shard_id,
3092                                    Arc::new(RelationDesc::empty()),
3093                                    Arc::new(UnitSchema),
3094                                    diagnostics,
3095                                )
3096                                .await
3097                                .expect("invalid persist usage");
3098                        write_handle.advance_upper(&Antichain::new()).await;
3099                        write_handle.expire().await;
3100
3101                        if force_downgrade_since {
3102                            let mut since_handle: SinceHandle<
3103                                SourceData,
3104                                (),
3105                                T,
3106                                StorageDiff,
3107                                PersistEpoch,
3108                            > = persist_client
3109                                .open_critical_since(
3110                                    shard_id,
3111                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3112                                    Diagnostics::from_purpose("finalizing shards"),
3113                                )
3114                                .await
3115                                .expect("invalid persist usage");
3116                            let handle_epoch = since_handle.opaque().clone();
3117                            let our_epoch = epoch.clone();
3118                            let epoch = if our_epoch.0 > handle_epoch.0 {
3119                                // We're newer, but it's fine to use the
3120                                // handle's old epoch to try and downgrade.
3121                                handle_epoch
3122                            } else {
3123                                // Good luck, buddy! The downgrade below will
3124                                // not succeed. There's a process with a newer
3125                                // epoch out there and someone at some juncture
3126                                // will fence out this process.
3127                                our_epoch
3128                            };
3129                            let new_since = Antichain::new();
3130                            let downgrade = since_handle
3131                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3132                                .await;
3133                            if let Err(e) = downgrade {
3134                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3135                                return Ok(());
3136                            }
3137                            // Not available now, so finalization is broken.
3138                            // since_handle.expire().await;
3139                        }
3140
3141                        persist_client
3142                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3143                                shard_id,
3144                                Diagnostics::from_purpose("finalizing shards"),
3145                            )
3146                            .await
3147                    };
3148
3149                    match finalize().await {
3150                        Err(e) => {
3151                            // Rather than error, just leave this shard as
3152                            // one to finalize later.
3153                            warn!("error during finalization of shard {shard_id}: {e:?}");
3154                            None
3155                        }
3156                        Ok(()) => {
3157                            debug!(%shard_id, "finalize success!");
3158                            Some(shard_id)
3159                        }
3160                    }
3161                }
3162            })
3163            // Poll each future for each collection concurrently, maximum of 10
3164            // at a time.
3165            // TODO(benesch): the concurrency here should be configurable
3166            // via LaunchDarkly.
3167            .buffer_unordered(10)
3168            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3169            // The closure passed to `for_each` must remain fast or we risk
3170            // starving the finalization futures of calls to `poll`.
3171            .for_each(|shard_id| async move {
3172                match shard_id {
3173                    None => metrics.finalization_failed.inc(),
3174                    Some(shard_id) => {
3175                        // We make successfully finalized shards available for
3176                        // removal from the finalization WAL one by one, so that
3177                        // a handful of stuck shards don't prevent us from
3178                        // removing the shards that have made progress. The
3179                        // overhead of repeatedly acquiring and releasing the
3180                        // locks is negligible.
3181                        {
3182                            let mut finalizable_shards = finalizable_shards.lock();
3183                            let mut finalized_shards = finalized_shards.lock();
3184                            finalizable_shards.remove(&shard_id);
3185                            finalized_shards.insert(shard_id);
3186                        }
3187
3188                        metrics.finalization_succeeded.inc();
3189                    }
3190                }
3191            })
3192            .await;
3193
3194        debug!("done finalizing shards");
3195    }
3196}
3197
/// How to resolve the "as of" frontier when computing snapshot stats for a
/// shard, depending on how the shard's upper advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<T>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<T>),
}
3207
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    // NOTE: a free test function must not take `&self`; the stray receiver
    // has been removed so this compiles under the test harness.
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because now_or_never consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a [`BackgroundCmd::SnapshotStats`] for `id` at `as_of` to the
    /// background task and awaits the resulting stats future.
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Constructs a minimal `BackgroundTask` for tests, returning the
        /// sender used to issue commands to it along with the task itself.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}