// mz_storage_client/storage_collections.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An abstraction for dealing with storage collections.

12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::connections::inline::InlinedConnection;
47use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
48use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
49use mz_storage_types::parameters::StorageParameters;
50use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
51use mz_storage_types::read_policy::ReadPolicy;
52use mz_storage_types::sources::{
53    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
54    SourceExportDataConfig, Timeline,
55};
56use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
57use mz_txn_wal::metrics::Metrics as TxnMetrics;
58use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
59use mz_txn_wal::txns::TxnsHandle;
60use timely::PartialOrder;
61use timely::order::TotalOrder;
62use timely::progress::frontier::MutableAntichain;
63use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
64use tokio::sync::{mpsc, oneshot};
65use tokio::time::MissedTickBehavior;
66use tracing::{debug, info, trace, warn};
67
68use crate::client::TimestamplessUpdateBuilder;
69use crate::controller::{
70    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
71};
72use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
73
74mod metrics;
75
/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
#[async_trait]
pub trait StorageCollections: Debug {
    /// The timestamp type of the collections managed by this implementation.
    type Timestamp: TimelyTimestamp;

    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// know yet about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    /// Returns the [CollectionMetadata] of all active collections.
    ///
    /// A collection is "active" when it has a non empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    ///
    /// Default implementation in terms of
    /// [StorageCollections::collections_frontiers].
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may
    /// need to block on the stats being available, but don't want to hold a reference
    /// to the controller for too long... so the outer future holds a reference to the
    /// controller but returns quickly, and the inner future is slow but does not
    /// reference the controller.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at the largest
    /// readable `as_of`.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide a Some if any of
    /// the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the identified ingestion to use the provided [`SourceDesc`].
    ///
    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
    /// have to learn about changes such that when new subsources are created we
    /// can correctly determine a since based on its dependencies' sinces. This
    /// is really only relevant because newly created subsources depend on the
    /// remap shard, and we can't just have them start at since 0.
    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated
    /// [`GenericSourceConnection`].
    ///
    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}
355
/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
/// consolidated form.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
    // least as long as the cursor itself, which holds part leases. Bundling them together!
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    // The underlying persist cursor that yields the snapshot's updates.
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}
364
impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    /// Returns the next batch of updates from the snapshot, or `None` once the
    /// snapshot is exhausted.
    ///
    /// Delegates directly to the underlying persist [Cursor]; each yielded item
    /// is a decode result for key and value, paired with its timestamp and
    /// diff.
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}
381
/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<T>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes
    /// the implied capability.
    pub read_capabilities: Antichain<T>,
}
403
/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently, in the background.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to affect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    ///
    /// Shared (behind a std `Mutex`) with the background task.
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are at
    /// least not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections are being written to
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}
466
467// Supporting methods for implementing [StorageCollections].
468//
469// Almost all internal methods that are the backing implementation for a trait
470// method have the `_inner` suffix.
471//
472// We follow a pattern where `_inner` methods get a mutable reference to the
473// shared collections state, and it's the public-facing method that locks the
474// state for the duration of its invocation. This allows calling other `_inner`
475// methods from within `_inner` methods.
476impl<T> StorageCollectionsImpl<T>
477where
478    T: TimelyTimestamp
479        + Lattice
480        + Codec64
481        + From<EpochMillis>
482        + TimestampManipulation
483        + Into<mz_repr::Timestamp>
484        + Sync,
485{
486    /// Creates and returns a new [StorageCollections].
487    ///
488    /// Note that when creating a new [StorageCollections], you must also
489    /// reconcile it with the previous state using
490    /// [StorageCollections::initialize_state],
491    /// [StorageCollections::prepare_state], and
492    /// [StorageCollections::create_collections_for_bootstrap].
493    pub async fn new(
494        persist_location: PersistLocation,
495        persist_clients: Arc<PersistClientCache>,
496        metrics_registry: &MetricsRegistry,
497        _now: NowFn,
498        txns_metrics: Arc<TxnMetrics>,
499        envd_epoch: NonZeroI64,
500        read_only: bool,
501        connection_context: ConnectionContext,
502        txn: &dyn StorageTxn<T>,
503    ) -> Self {
504        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
505
506        // This value must be already installed because we must ensure it's
507        // durably recorded before it is used, otherwise we risk leaking persist
508        // state.
509        let txns_id = txn
510            .get_txn_wal_shard()
511            .expect("must call prepare initialization before creating StorageCollections");
512
513        let txns_client = persist_clients
514            .open(persist_location.clone())
515            .await
516            .expect("location should be valid");
517
518        // We have to initialize, so that TxnsRead::start() below does not
519        // block.
520        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
521            TxnsHandle::open(
522                T::minimum(),
523                txns_client.clone(),
524                txns_client.dyncfgs().clone(),
525                Arc::clone(&txns_metrics),
526                txns_id,
527            )
528            .await;
529
530        // For handing to the background task, for listening to upper updates.
531        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
532        let mut txns_write = txns_client
533            .open_writer(
534                txns_id,
535                Arc::new(txns_key_schema),
536                Arc::new(txns_val_schema),
537                Diagnostics {
538                    shard_name: "txns".to_owned(),
539                    handle_purpose: "commit txns".to_owned(),
540                },
541            )
542            .await
543            .expect("txns schema shouldn't change");
544
545        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
546
547        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
548        let finalizable_shards =
549            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
550        let finalized_shards =
551            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
552        let config = Arc::new(Mutex::new(StorageConfiguration::new(
553            connection_context,
554            mz_dyncfgs::all_dyncfgs(),
555        )));
556
557        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
558
559        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
560        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
561        let mut background_task = BackgroundTask {
562            config: Arc::clone(&config),
563            cmds_tx: cmd_tx.clone(),
564            cmds_rx: cmd_rx,
565            holds_rx,
566            collections: Arc::clone(&collections),
567            finalizable_shards: Arc::clone(&finalizable_shards),
568            shard_by_id: BTreeMap::new(),
569            since_handles: BTreeMap::new(),
570            txns_handle: Some(txns_write),
571            txns_shards: Default::default(),
572        };
573
574        let background_task =
575            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
576                background_task.run().await
577            });
578
579        let finalize_shards_task = mz_ore::task::spawn(
580            || "storage_collections::finalize_shards_task",
581            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
582                envd_epoch: envd_epoch.clone(),
583                config: Arc::clone(&config),
584                metrics,
585                finalizable_shards: Arc::clone(&finalizable_shards),
586                finalized_shards: Arc::clone(&finalized_shards),
587                persist_location: persist_location.clone(),
588                persist: Arc::clone(&persist_clients),
589                read_only,
590            }),
591        );
592
593        Self {
594            finalizable_shards,
595            finalized_shards,
596            collections,
597            txns_read,
598            envd_epoch,
599            read_only,
600            config,
601            initial_txn_upper,
602            persist_location,
603            persist: persist_clients,
604            cmd_tx,
605            holds_tx,
606            _background_task: Arc::new(background_task.abort_on_drop()),
607            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
608        }
609    }
610
    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// In read-only mode the since is held with a leased read handle; otherwise
    /// a critical since handle is used.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. Its vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }
660
661    /// Opens a write handle for the given `shard`.
662    async fn open_write_handle(
663        &self,
664        id: &GlobalId,
665        shard: ShardId,
666        relation_desc: RelationDesc,
667        persist_client: &PersistClient,
668    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
669        let diagnostics = Diagnostics {
670            shard_name: id.to_string(),
671            handle_purpose: format!("controller data for {}", id),
672        };
673
674        let write = persist_client
675            .open_writer(
676                shard,
677                Arc::new(relation_desc),
678                Arc::new(UnitSchema),
679                diagnostics.clone(),
680            )
681            .await
682            .expect("invalid persist usage");
683
684        write
685    }
686
    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    ///
    /// # Panics
    ///
    /// Asserts that `self.read_only` is false: read-only instances must not
    /// acquire (and thereby downgrade) critical handles.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            // Compare-and-set loop: re-read the handle's opaque epoch each
            // iteration and retry on lost races, so a concurrent epoch change
            // is re-checked rather than clobbered.
            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // Ensure the current epoch is <= our epoch. A `None` opaque
                // (no epoch recorded yet) counts as success.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    // A newer environment owns the shard; abort this process.
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }
765
766    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
767    /// for the given `shard`.
768    ///
769    /// `since` is an optional since that the read handle will be forwarded to
770    /// if it is less than its current since.
771    async fn open_leased_handle(
772        &self,
773        id: &GlobalId,
774        shard: ShardId,
775        relation_desc: RelationDesc,
776        since: Option<&Antichain<T>>,
777        persist_client: &PersistClient,
778    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
779        tracing::debug!(%id, ?since, "opening leased handle");
780
781        let diagnostics = Diagnostics {
782            shard_name: id.to_string(),
783            handle_purpose: format!("controller data for {}", id),
784        };
785
786        let use_critical_since = false;
787        let mut handle: ReadHandle<_, _, _, _> = persist_client
788            .open_leased_reader(
789                shard,
790                Arc::new(relation_desc),
791                Arc::new(UnitSchema),
792                diagnostics.clone(),
793                use_critical_since,
794            )
795            .await
796            .expect("invalid persist usage");
797
798        // Take the join of the handle's since and the provided `since`;
799        // this lets materialized views express the since at which their
800        // read handles "start."
801        let provided_since = match since {
802            Some(since) => since,
803            None => &Antichain::from_elem(T::minimum()),
804        };
805        let since = handle.since().join(provided_since);
806
807        handle.downgrade_since(&since).await;
808
809        handle
810    }
811
812    fn register_handles(
813        &self,
814        id: GlobalId,
815        is_in_txns: bool,
816        since_handle: SinceHandleWrapper<T>,
817        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
818    ) {
819        self.send(BackgroundCmd::Register {
820            id,
821            is_in_txns,
822            since_handle,
823            write_handle,
824        });
825    }
826
827    fn send(&self, cmd: BackgroundCmd<T>) {
828        let _ = self.cmd_tx.send(cmd);
829    }
830
831    async fn snapshot_stats_inner(
832        &self,
833        id: GlobalId,
834        as_of: SnapshotStatsAsOf<T>,
835    ) -> Result<SnapshotStats, StorageError<T>> {
836        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
837        // caller of this one drives it to completion.
838        //
839        // We'd need to either share the critical handle somehow or maybe have
840        // two instances around, one in the worker and one in the
841        // StorageCollections.
842        let (tx, rx) = oneshot::channel();
843        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
844        rx.await.expect("BackgroundTask should be live").0.await
845    }
846
847    /// If this identified collection has a dependency, install a read hold on
848    /// it.
849    ///
850    /// This is necessary to ensure that the dependency's since does not advance
851    /// beyond its dependents'.
852    fn install_collection_dependency_read_holds_inner(
853        &self,
854        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
855        id: GlobalId,
856    ) -> Result<(), StorageError<T>> {
857        let (deps, collection_implied_capability) = match self_collections.get(&id) {
858            Some(CollectionState {
859                storage_dependencies: deps,
860                implied_capability,
861                ..
862            }) => (deps.clone(), implied_capability),
863            _ => return Ok(()),
864        };
865
866        for dep in deps.iter() {
867            let dep_collection = self_collections
868                .get(dep)
869                .ok_or(StorageError::IdentifierMissing(id))?;
870
871            mz_ore::soft_assert_or_log!(
872                PartialOrder::less_equal(
873                    &dep_collection.implied_capability,
874                    collection_implied_capability
875                ),
876                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
877                dep_collection.implied_capability,
878                collection_implied_capability,
879            );
880        }
881
882        self.install_read_capabilities_inner(
883            self_collections,
884            id,
885            &deps,
886            collection_implied_capability.clone(),
887        )?;
888
889        Ok(())
890    }
891
892    /// Determine if this collection has another dependency.
893    ///
894    /// Currently, collections have either 0 or 1 dependencies.
895    fn determine_collection_dependencies(
896        &self,
897        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
898        source_id: GlobalId,
899        data_source: &DataSource<T>,
900    ) -> Result<Vec<GlobalId>, StorageError<T>> {
901        let dependencies = match &data_source {
902            DataSource::Introspection(_)
903            | DataSource::Webhook
904            | DataSource::Table { primary: None }
905            | DataSource::Progress
906            | DataSource::Other => Vec::new(),
907            DataSource::Table {
908                primary: Some(primary),
909            } => vec![*primary],
910            DataSource::IngestionExport {
911                ingestion_id,
912                data_config,
913                ..
914            } => {
915                // Ingestion exports depend on their primary source's remap
916                // collection, except when they use a CDCv2 envelope.
917                let source = self_collections
918                    .get(ingestion_id)
919                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
920                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
921                    panic!("SourceExport must refer to a primary source that already exists");
922                };
923
924                match data_config.envelope {
925                    SourceEnvelope::CdcV2 => Vec::new(),
926                    _ => vec![ingestion.remap_collection_id],
927                }
928            }
929            // Ingestions depend on their remap collection.
930            DataSource::Ingestion(ingestion) => {
931                if ingestion.remap_collection_id == source_id {
932                    vec![]
933                } else {
934                    vec![ingestion.remap_collection_id]
935                }
936            }
937            DataSource::Sink { desc } => vec![desc.sink.from],
938        };
939
940        Ok(dependencies)
941    }
942
943    /// Install read capabilities on the given `storage_dependencies`.
944    #[instrument(level = "debug")]
945    fn install_read_capabilities_inner(
946        &self,
947        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
948        from_id: GlobalId,
949        storage_dependencies: &[GlobalId],
950        read_capability: Antichain<T>,
951    ) -> Result<(), StorageError<T>> {
952        let mut changes = ChangeBatch::new();
953        for time in read_capability.iter() {
954            changes.update(time.clone(), 1);
955        }
956
957        if tracing::span_enabled!(tracing::Level::TRACE) {
958            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
959            let user_capabilities = self_collections
960                .iter_mut()
961                .filter(|(id, _c)| id.is_user())
962                .map(|(id, c)| {
963                    let updates = c.read_capabilities.updates().cloned().collect_vec();
964                    (*id, c.implied_capability.clone(), updates)
965                })
966                .collect_vec();
967
968            trace!(
969                %from_id,
970                ?storage_dependencies,
971                ?read_capability,
972                ?user_capabilities,
973                "install_read_capabilities_inner");
974        }
975
976        let mut storage_read_updates = storage_dependencies
977            .iter()
978            .map(|id| (*id, changes.clone()))
979            .collect();
980
981        StorageCollectionsImpl::update_read_capabilities_inner(
982            &self.cmd_tx,
983            self_collections,
984            &mut storage_read_updates,
985        );
986
987        if tracing::span_enabled!(tracing::Level::TRACE) {
988            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
989            let user_capabilities = self_collections
990                .iter_mut()
991                .filter(|(id, _c)| id.is_user())
992                .map(|(id, c)| {
993                    let updates = c.read_capabilities.updates().cloned().collect_vec();
994                    (*id, c.implied_capability.clone(), updates)
995                })
996                .collect_vec();
997
998            trace!(
999                %from_id,
1000                ?storage_dependencies,
1001                ?read_capability,
1002                ?user_capabilities,
1003                "after install_read_capabilities_inner!");
1004        }
1005
1006        Ok(())
1007    }
1008
1009    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
1010        let metadata = &self.collection_metadata(id)?;
1011        let persist_client = self
1012            .persist
1013            .open(metadata.persist_location.clone())
1014            .await
1015            .unwrap();
1016        // Duplicate part of open_data_handles here because we don't need the
1017        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
1018        let diagnostics = Diagnostics {
1019            shard_name: id.to_string(),
1020            handle_purpose: format!("controller data for {}", id),
1021        };
1022        // NB: Opening a WriteHandle is cheap if it's never used in a
1023        // compare_and_append operation.
1024        let write = persist_client
1025            .open_writer::<SourceData, (), T, StorageDiff>(
1026                metadata.data_shard,
1027                Arc::new(metadata.relation_desc.clone()),
1028                Arc::new(UnitSchema),
1029                diagnostics.clone(),
1030            )
1031            .await
1032            .expect("invalid persist usage");
1033        Ok(write.shared_upper())
1034    }
1035
1036    async fn read_handle_for_snapshot(
1037        persist: Arc<PersistClientCache>,
1038        metadata: &CollectionMetadata,
1039        id: GlobalId,
1040    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1041        let persist_client = persist
1042            .open(metadata.persist_location.clone())
1043            .await
1044            .unwrap();
1045
1046        // We create a new read handle every time someone requests a snapshot
1047        // and then immediately expire it instead of keeping a read handle
1048        // permanently in our state to avoid having it heartbeat continually.
1049        // The assumption is that calls to snapshot are rare and therefore worth
1050        // it to always create a new handle.
1051        let read_handle = persist_client
1052            .open_leased_reader::<SourceData, (), _, _>(
1053                metadata.data_shard,
1054                Arc::new(metadata.relation_desc.clone()),
1055                Arc::new(UnitSchema),
1056                Diagnostics {
1057                    shard_name: id.to_string(),
1058                    handle_purpose: format!("snapshot {}", id),
1059                },
1060                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1061            )
1062            .await
1063            .expect("invalid persist usage");
1064        Ok(read_handle)
1065    }
1066
    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
    // where the as_of frontier might have multiple elements. In the current form the mutually
    // incomparable updates will be accumulated together to a state of the collection that never
    // actually existed. We should include the original time in the updates advanced by the as_of
    // frontier in the result and let the caller decide what to do with the information.
    /// Returns the contents of collection `id` at time `as_of`, as pairs of
    /// `Row` and diff.
    ///
    /// The returned future resolves to `Err(StorageError::ReadBeforeSince)` if
    /// the shard can no longer serve a read at `as_of`, and panics on invalid
    /// protobuf data in the shard.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        // Resolve everything we need from `self` up front, so the returned
        // future can be 'static.
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        // Collections with a txns shard must be read through txn-wal (see the
        // comment on the `Some(txns_read)` branch below).
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // We _are_ using txn-wal for tables. It advances the physical upper of the
                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
                    // unblocked.
                    //
                    // Consider the following scenario:
                    // - Table A is written to via txns at time 5
                    // - Tables other than A are written to via txns consuming timestamps up to 10
                    // - We'd like to read A at 7
                    // - The application process of A's txn has advanced the upper to 5+1, but we need
                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
                    // - This branch allows it to handle that advancing the physical upper of Table A to
                    //   10 (NB but only once we see it get past the write at 5!)
                    // - Then we can read it normally.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
                        // interpret the result
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                // A failed snapshot means `as_of` was not (or is no longer)
                // readable.
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }
1136
1137    fn snapshot_and_stream(
1138        &self,
1139        id: GlobalId,
1140        as_of: T,
1141        txns_read: &TxnsRead<T>,
1142    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1143    {
1144        use futures::stream::StreamExt;
1145
1146        let metadata = match self.collection_metadata(id) {
1147            Ok(metadata) => metadata.clone(),
1148            Err(e) => return async { Err(e) }.boxed(),
1149        };
1150        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1151            assert_eq!(txns_id, txns_read.txns_id());
1152            txns_read.clone()
1153        });
1154        let persist = Arc::clone(&self.persist);
1155
1156        async move {
1157            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1158            let stream = match txns_read {
1159                None => {
1160                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1161                    read_handle
1162                        .snapshot_and_stream(Antichain::from_elem(as_of))
1163                        .await
1164                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1165                        .boxed()
1166                }
1167                Some(txns_read) => {
1168                    txns_read.update_gt(as_of.clone()).await;
1169                    let data_snapshot = txns_read
1170                        .data_snapshot(metadata.data_shard, as_of.clone())
1171                        .await;
1172                    data_snapshot
1173                        .snapshot_and_stream(&mut read_handle)
1174                        .await
1175                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1176                        .boxed()
1177                }
1178            };
1179
1180            // Map our stream, unwrapping Persist internal errors.
1181            let stream = stream
1182                .map(|((k, _v), t, d)| {
1183                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
1184                    // caller.
1185                    let data = k.expect("error while streaming from Persist");
1186                    (data, t, d)
1187                })
1188                .boxed();
1189            Ok(stream)
1190        }
1191        .boxed()
1192    }
1193
1194    fn set_read_policies_inner(
1195        &self,
1196        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1197        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1198    ) {
1199        trace!("set_read_policies: {:?}", policies);
1200
1201        let mut read_capability_changes = BTreeMap::default();
1202
1203        for (id, policy) in policies.into_iter() {
1204            let collection = match collections.get_mut(&id) {
1205                Some(c) => c,
1206                None => {
1207                    panic!("Reference to absent collection {id}");
1208                }
1209            };
1210
1211            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1212
1213            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1214                let mut update = ChangeBatch::new();
1215                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1216                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1217                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1218                if !update.is_empty() {
1219                    read_capability_changes.insert(id, update);
1220                }
1221            }
1222
1223            collection.read_policy = policy;
1224        }
1225
1226        for (id, changes) in read_capability_changes.iter() {
1227            if id.is_user() {
1228                trace!(%id, ?changes, "in set_read_policies, capability changes");
1229            }
1230        }
1231
1232        if !read_capability_changes.is_empty() {
1233            StorageCollectionsImpl::update_read_capabilities_inner(
1234                &self.cmd_tx,
1235                collections,
1236                &mut read_capability_changes,
1237            );
1238        }
1239    }
1240
    // This is an associated function, not a method, so that we can share it
    // with the task that updates the persist handles and also has a reference
    // to the shared collections state.
    /// Applies `updates` to the read capabilities of the given `collections`,
    /// transitively forwarding the net changes to each collection's storage
    /// dependencies, and hands any resulting persist since downgrades off to
    /// the background task via `cmd_tx`.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            // Sanity-check the update before applying it: no capability count
            // may go negative, and purely-additive changes must not install
            // capabilities before the current frontier.
            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            // Apply the changes; `update` is drained and then re-filled with
            // the *net* changes to the collection's overall read frontier.
            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                %id,
                ?collection.storage_dependencies,
                ?update,
                "forwarding update to storage dependencies");
            }

            // Propagate the net frontier changes to all storage dependencies;
            // those entries will be processed in later iterations of this
            // loop.
            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            // Record the net effect on this collection, to be translated into
            // persist compaction commands below.
            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate our net compute actions into downgrades of persist sinces.
        // The actual downgrades are performed by a Tokio task asynchronously.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the table has a "primary" collection, let that collection drive compaction.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                // An empty frontier means the collection is done for good and
                // its state can be dropped.
                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }
1356
1357    /// Remove any shards that we know are finalized
1358    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1359        self.finalized_shards
1360            .lock()
1361            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1362    }
1363}
1364
1365// See comments on the above impl for StorageCollectionsImpl.
1366#[async_trait]
1367impl<T> StorageCollections for StorageCollectionsImpl<T>
1368where
1369    T: TimelyTimestamp
1370        + Lattice
1371        + Codec64
1372        + From<EpochMillis>
1373        + TimestampManipulation
1374        + Into<mz_repr::Timestamp>
1375        + Sync,
1376{
1377    type Timestamp = T;
1378
1379    async fn initialize_state(
1380        &self,
1381        txn: &mut (dyn StorageTxn<T> + Send),
1382        init_ids: BTreeSet<GlobalId>,
1383    ) -> Result<(), StorageError<T>> {
1384        let metadata = txn.get_collection_metadata();
1385        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1386
1387        // Determine which collections we do not yet have metadata for.
1388        let new_collections: BTreeSet<GlobalId> =
1389            init_ids.difference(&existing_metadata).cloned().collect();
1390
1391        self.prepare_state(
1392            txn,
1393            new_collections,
1394            BTreeSet::default(),
1395            BTreeMap::default(),
1396        )
1397        .await?;
1398
1399        // All shards that belong to collections dropped in the last epoch are
1400        // eligible for finalization.
1401        //
1402        // n.b. this introduces an unlikely race condition: if a collection is
1403        // dropped from the catalog, but the dataflow is still running on a
1404        // worker, assuming the shard is safe to finalize on reboot may cause
1405        // the cluster to panic.
1406        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1407
1408        info!(?unfinalized_shards, "initializing finalizable_shards");
1409
1410        self.finalizable_shards.lock().extend(unfinalized_shards);
1411
1412        Ok(())
1413    }
1414
1415    fn update_parameters(&self, config_params: StorageParameters) {
1416        // We serialize the dyncfg updates in StorageParameters, but configure
1417        // persist separately.
1418        config_params.dyncfg_updates.apply(self.persist.cfg());
1419
1420        self.config
1421            .lock()
1422            .expect("lock poisoned")
1423            .update(config_params);
1424    }
1425
1426    fn collection_metadata(
1427        &self,
1428        id: GlobalId,
1429    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
1430        let collections = self.collections.lock().expect("lock poisoned");
1431
1432        collections
1433            .get(&id)
1434            .map(|c| c.collection_metadata.clone())
1435            .ok_or(StorageError::IdentifierMissing(id))
1436    }
1437
1438    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1439        let collections = self.collections.lock().expect("lock poisoned");
1440
1441        collections
1442            .iter()
1443            .filter(|(_id, c)| !c.is_dropped())
1444            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1445            .collect()
1446    }
1447
1448    fn collections_frontiers(
1449        &self,
1450        ids: Vec<GlobalId>,
1451    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
1452        if ids.is_empty() {
1453            return Ok(vec![]);
1454        }
1455
1456        let collections = self.collections.lock().expect("lock poisoned");
1457
1458        let res = ids
1459            .into_iter()
1460            .map(|id| {
1461                collections
1462                    .get(&id)
1463                    .map(|c| CollectionFrontiers {
1464                        id: id.clone(),
1465                        write_frontier: c.write_frontier.clone(),
1466                        implied_capability: c.implied_capability.clone(),
1467                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1468                    })
1469                    .ok_or(StorageError::IdentifierMissing(id))
1470            })
1471            .collect::<Result<Vec<_>, _>>()?;
1472
1473        Ok(res)
1474    }
1475
1476    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1477        let collections = self.collections.lock().expect("lock poisoned");
1478
1479        let res = collections
1480            .iter()
1481            .filter(|(_id, c)| !c.is_dropped())
1482            .map(|(id, c)| CollectionFrontiers {
1483                id: id.clone(),
1484                write_frontier: c.write_frontier.clone(),
1485                implied_capability: c.implied_capability.clone(),
1486                read_capabilities: c.read_capabilities.frontier().to_owned(),
1487            })
1488            .collect_vec();
1489
1490        res
1491    }
1492
    /// Returns statistics for a snapshot of collection `id` at `as_of`,
    /// resolving the read through the txns shard when the collection is
    /// txn-managed.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
        let metadata = self.collection_metadata(id)?;

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let as_of = match metadata.txns_shard.as_ref() {
            // Not txn-managed: the shard can be read directly at `as_of`.
            None => SnapshotStatsAsOf::Direct(as_of),
            // Txn-managed: resolve a data snapshot through the txns shard.
            Some(txns_id) => {
                // The controller should only ever know about one txns shard;
                // a collection registered to a different one is a bug.
                assert_eq!(txns_id, self.txns_read.txns_id());
                let as_of = as_of
                    .into_option()
                    .expect("cannot read as_of the empty antichain");
                // Wait for the txns shard to progress past `as_of` before
                // taking the data snapshot below.
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(metadata.data_shard, as_of.clone())
                    .await;
                SnapshotStatsAsOf::Txns(data_snapshot)
            }
        };
        self.snapshot_stats_inner(id, as_of).await
    }
1519
    /// Returns per-part statistics for a snapshot of collection `id` at
    /// `as_of`, as a future that can be awaited independently of `self`.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        // Resolve the collection's metadata, holding the lock only briefly.
        // An unknown id becomes an immediately-ready error future.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // Txn-managed shards need a `DataSnapshot` resolved through the txns
        // shard; directly-written shards are read at `as_of` as-is.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                // The controller should only ever know about one txns shard.
                assert_eq!(txns_id, *self.txns_read.txns_id());
                // Wait for the txns shard to progress past `as_of` before
                // taking the data snapshot below.
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Explicitly release the read handle before returning the result.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1574
1575    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1576    // where the as_of frontier might have multiple elements. In the current form the mutually
1577    // incomparable updates will be accumulated together to a state of the collection that never
1578    // actually existed. We should include the original time in the updates advanced by the as_of
1579    // frontier in the result and let the caller decide what to do with the information.
1580    fn snapshot(
1581        &self,
1582        id: GlobalId,
1583        as_of: Self::Timestamp,
1584    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1585        self.snapshot(id, as_of, &self.txns_read)
1586    }
1587
1588    async fn snapshot_latest(
1589        &self,
1590        id: GlobalId,
1591    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1592        let upper = self.recent_upper(id).await?;
1593        let res = match upper.as_option() {
1594            Some(f) if f > &T::minimum() => {
1595                let as_of = f.step_back().unwrap();
1596
1597                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1598                snapshot
1599                    .into_iter()
1600                    .map(|(row, diff)| {
1601                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1602                        row
1603                    })
1604                    .collect()
1605            }
1606            Some(_min) => {
1607                // The collection must be empty!
1608                Vec::new()
1609            }
1610            // The collection is closed, we cannot determine a latest read
1611            // timestamp based on the upper.
1612            _ => {
1613                return Err(StorageError::InvalidUsage(
1614                    "collection closed, cannot determine a read timestamp based on the upper"
1615                        .to_string(),
1616                ));
1617            }
1618        };
1619
1620        Ok(res)
1621    }
1622
    /// Returns a cursor over the contents of collection `id` at `as_of`,
    /// going through the txns shard when the collection is txn-managed.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        // Only txn-managed collections carry a txns shard id.
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            // Ensure the txn's shard the controller has is the same that this
            // collection is registered to.
            assert_eq!(txns_id, self.txns_read.txns_id());
            self.txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        // See the comments in Self::snapshot for what's going on here.
        async move {
            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let cursor = match txns_read {
                // Directly-written shard: read it at `as_of` as-is.
                None => {
                    let cursor = handle
                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    // The (otherwise unused) read handle is carried along so
                    // it lives at least as long as the cursor.
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
                // Txn-managed shard: resolve a data snapshot through the
                // txns shard before opening the cursor.
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    let cursor = data_snapshot
                        .snapshot_cursor(&mut handle, |_| true)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
                    SnapshotCursor {
                        _read_handle: handle,
                        cursor,
                    }
                }
            };

            Ok(cursor)
        }
        .boxed()
    }
1677
1678    fn snapshot_and_stream(
1679        &self,
1680        id: GlobalId,
1681        as_of: Self::Timestamp,
1682    ) -> BoxFuture<
1683        'static,
1684        Result<
1685            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1686            StorageError<Self::Timestamp>,
1687        >,
1688    >
1689    where
1690        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1691    {
1692        self.snapshot_and_stream(id, as_of, &self.txns_read)
1693    }
1694
1695    fn create_update_builder(
1696        &self,
1697        id: GlobalId,
1698    ) -> BoxFuture<
1699        'static,
1700        Result<
1701            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1702            StorageError<Self::Timestamp>,
1703        >,
1704    > {
1705        let metadata = match self.collection_metadata(id) {
1706            Ok(m) => m,
1707            Err(e) => return Box::pin(async move { Err(e) }),
1708        };
1709        let persist = Arc::clone(&self.persist);
1710
1711        async move {
1712            let persist_client = persist
1713                .open(metadata.persist_location.clone())
1714                .await
1715                .expect("invalid persist usage");
1716            let write_handle = persist_client
1717                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1718                    metadata.data_shard,
1719                    Arc::new(metadata.relation_desc.clone()),
1720                    Arc::new(UnitSchema),
1721                    Diagnostics {
1722                        shard_name: id.to_string(),
1723                        handle_purpose: format!("create write batch {}", id),
1724                    },
1725                )
1726                .await
1727                .expect("invalid persist usage");
1728            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1729
1730            Ok(builder)
1731        }
1732        .boxed()
1733    }
1734
1735    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1736        let collections = self.collections.lock().expect("lock poisoned");
1737
1738        if collections.contains_key(&id) {
1739            Ok(())
1740        } else {
1741            Err(StorageError::IdentifierMissing(id))
1742        }
1743    }
1744
1745    async fn prepare_state(
1746        &self,
1747        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1748        ids_to_add: BTreeSet<GlobalId>,
1749        ids_to_drop: BTreeSet<GlobalId>,
1750        ids_to_register: BTreeMap<GlobalId, ShardId>,
1751    ) -> Result<(), StorageError<T>> {
1752        txn.insert_collection_metadata(
1753            ids_to_add
1754                .into_iter()
1755                .map(|id| (id, ShardId::new()))
1756                .collect(),
1757        )?;
1758        txn.insert_collection_metadata(ids_to_register)?;
1759
1760        // Delete the metadata for any dropped collections.
1761        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1762
1763        let dropped_shards = dropped_mappings
1764            .into_iter()
1765            .map(|(_id, shard)| shard)
1766            .collect();
1767
1768        txn.insert_unfinalized_shards(dropped_shards)?;
1769
1770        // Reconcile any shards we've successfully finalized with the shard
1771        // finalization collection.
1772        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1773        txn.mark_shards_as_finalized(finalized_shards);
1774
1775        Ok(())
1776    }
1777
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Creates the given collections during bootstrap: validates the
    /// requests, opens persist write/since handles for each collection
    /// concurrently, installs `CollectionState` in `self.collections`, and
    /// registers the opened handles via `register_handles`.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection counts as txn-managed if it has a txns shard, except
        // for migrated collections while we are in read-only mode.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. We must not:
        // 1. create a dropped identifier, or
        // 2. create an existing identifier with a new description.
        // Make sure to check for errors within `ingestions` as well.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        // After dedup, adjacent entries with equal ids must carry differing
        // descriptions, which we reject as id reuse.
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // Early sanity check: if we knew about a collection already its
            // description must match!
            //
            // NOTE: There could be concurrent modifications to
            // `self.collections`, but this sanity check is better than nothing.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // So that we can open `SinceHandle`s for each collection concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow the `&mut self` as immutable, as all the concurrent work to
        // be processed in this stream cannot all have exclusive access.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table { .. } => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            // Only advance a still-pristine since (at the
                            // minimum), and never for migrated collections.
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies = self.determine_collection_dependencies(
                &*self_collections,
                id,
                &description.data_source,
            )?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far as the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the source to contain this export.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table { .. } => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2121
2122    async fn alter_ingestion_source_desc(
2123        &self,
2124        ingestion_id: GlobalId,
2125        source_desc: SourceDesc,
2126    ) -> Result<(), StorageError<Self::Timestamp>> {
2127        // The StorageController checks the validity of these. And we just
2128        // accept them.
2129
2130        let mut self_collections = self.collections.lock().expect("lock poisoned");
2131        let collection = self_collections
2132            .get_mut(&ingestion_id)
2133            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2134
2135        let curr_ingestion = match &mut collection.description.data_source {
2136            DataSource::Ingestion(active_ingestion) => active_ingestion,
2137            _ => unreachable!("verified collection refers to ingestion"),
2138        };
2139
2140        curr_ingestion.desc = source_desc;
2141        debug!("altered {ingestion_id}'s SourceDesc");
2142
2143        Ok(())
2144    }
2145
    /// Updates the data config of the given source exports, both on the
    /// export collections themselves and on their owning ingestions.
    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (source_export_id, new_data_config) in source_exports {
            // We need to adjust the data config on the CollectionState for
            // the source export collection directly
            let source_export_collection = self_collections
                .get_mut(&source_export_id)
                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
            let ingestion_id = match &mut source_export_collection.description.data_source {
                DataSource::IngestionExport {
                    ingestion_id,
                    details: _,
                    data_config,
                } => {
                    *data_config = new_data_config.clone();
                    *ingestion_id
                }
                // The id does not refer to an ingestion export: report it as
                // invalid rather than panicking.
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(source_export_id))?
                }
            };
            // We also need to adjust the data config on the CollectionState of the
            // Ingestion that the export is associated with.
            let ingestion_collection = self_collections
                .get_mut(&ingestion_id)
                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;

            match &mut ingestion_collection.description.data_source {
                DataSource::Ingestion(ingestion_desc) => {
                    let source_export = ingestion_desc
                        .source_exports
                        .get_mut(&source_export_id)
                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;

                    // Only update (and info-log) when the config actually
                    // differs; otherwise just warn about the no-op call.
                    if source_export.data_config != new_data_config {
                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
                        source_export.data_config = new_data_config;
                    } else {
                        tracing::warn!(
                            "alter_ingestion_export_data_configs called on \
                                    export {source_export_id} of {ingestion_id} but \
                                    the data config was the same"
                        );
                    }
                }
                o => {
                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
                }
            }
        }

        Ok(())
    }
2205
2206    async fn alter_ingestion_connections(
2207        &self,
2208        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2209    ) -> Result<(), StorageError<Self::Timestamp>> {
2210        let mut self_collections = self.collections.lock().expect("lock poisoned");
2211
2212        for (id, conn) in source_connections {
2213            let collection = self_collections
2214                .get_mut(&id)
2215                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2216
2217            match &mut collection.description.data_source {
2218                DataSource::Ingestion(ingestion) => {
2219                    // If the connection hasn't changed, there's no sense in
2220                    // re-rendering the dataflow.
2221                    if ingestion.desc.connection != conn {
2222                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2223                        ingestion.desc.connection = conn;
2224                    } else {
2225                        warn!(
2226                            "update_source_connection called on {id} but the \
2227                            connection was the same"
2228                        );
2229                    }
2230                }
2231                o => {
2232                    warn!("update_source_connection called on {:?}", o);
2233                    Err(StorageError::IdentifierInvalid(id))?;
2234                }
2235            }
2236        }
2237
2238        Ok(())
2239    }
2240
    /// Evolves the schema of the persist shard backing `existing_collection`
    /// to `new_desc` and installs `new_collection` as the new primary
    /// collection for that shard.
    ///
    /// On success, the existing collection becomes a "projection" of the new
    /// one: its data source is rewritten to point at `new_collection` and it
    /// gains a storage dependency on it.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // Look up the data shard in a short critical section; the persist
        // operations below must not run while holding the collections lock.
        let data_shard = {
            let self_collections = self.collections.lock().expect("lock poisoned");
            let existing = self_collections
                .get(&existing_collection)
                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

            // TODO(alter_table): Support changes to sources.
            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
                return Err(StorageError::IdentifierInvalid(existing_collection));
            }

            existing.collection_metadata.data_shard
        };

        // NOTE(review): `open` is unwrapped here — presumably opening our own
        // configured persist location cannot fail; confirm.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();

        // Evolve the schema of this shard.
        let diagnostics = Diagnostics {
            shard_name: existing_collection.to_string(),
            handle_purpose: "alter_table_desc".to_string(),
        };
        // We map the Adapter's RelationVersion 1:1 with SchemaId.
        let expected_schema = expected_version.into();
        let schema_result = persist_client
            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
                data_shard,
                expected_schema,
                &new_desc,
                &UnitSchema,
                diagnostics,
            )
            .await
            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
        tracing::info!(
            ?existing_collection,
            ?new_collection,
            ?new_desc,
            "evolved schema"
        );

        // Only the `Ok` outcome proceeds; both failure outcomes soft-panic
        // (or log) and bail out with a generic error.
        // NOTE(review): the schema id yielded by the `Ok` arm is discarded —
        // confirm nothing needs to record it.
        match schema_result {
            CaESchema::Ok(id) => id,
            // TODO(alter_table): If we get an expected mismatch we should retry.
            CaESchema::ExpectedMismatch {
                schema_id,
                key,
                val,
            } => {
                mz_ore::soft_panic_or_log!(
                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema expected mismatch, {existing_collection:?}",
                )));
            }
            CaESchema::Incompatible => {
                mz_ore::soft_panic_or_log!(
                    "incompatible schema! {existing_collection} {new_desc:?}"
                );
                return Err(StorageError::Generic(anyhow::anyhow!(
                    "schema incompatible, {existing_collection:?}"
                )));
            }
        };

        // Once the new schema is registered we can open new data handles.
        let (write_handle, since_handle) = self
            .open_data_handles(
                &new_collection,
                data_shard,
                None,
                new_desc.clone(),
                &persist_client,
            )
            .await;

        // TODO(alter_table): Do we need to advance the since of the table to match the time this
        // new version was registered with txn-wal?

        // Great! Our new schema is registered with Persist, now we need to update our internal
        // data structures.
        {
            let mut self_collections = self.collections.lock().expect("lock poisoned");

            // Update the existing collection so we know it's a "projection" of this new one.
            let existing = self_collections
                .get_mut(&existing_collection)
                .expect("existing collection missing");

            // A higher level should already be asserting this, but let's make sure.
            assert!(matches!(
                existing.description.data_source,
                DataSource::Table { primary: None }
            ));

            // The existing version of the table will depend on the new version.
            existing.description.data_source = DataSource::Table {
                primary: Some(new_collection),
            };
            existing.storage_dependencies.push(new_collection);

            // Copy over the frontiers from the previous version.
            // The new table starts with two holds - the implied capability, and the hold from
            // the previous version - both at the previous version's read frontier.
            let implied_capability = existing.read_capabilities.frontier().to_owned();
            let write_frontier = existing.write_frontier.clone();

            // Determine the relevant read capabilities on the new collection.
            //
            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
            // here, but that only installed a ReadHold on the new collection for the implied
            // capability of the existing collection. This would cause runtime panics because it
            // would eventually result in negative read capabilities.
            let mut changes = ChangeBatch::new();
            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

            // Note: The new collection is now the "primary collection" so we specify `None` here.
            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
            let collection_meta = CollectionMetadata {
                persist_location: self.persist_location.clone(),
                relation_desc: collection_desc.desc.clone(),
                data_shard,
                txns_shard: Some(self.txns_read.txns_id().clone()),
            };
            let collection_state = CollectionState::new(
                collection_desc,
                implied_capability,
                write_frontier,
                Vec::new(),
                collection_meta,
            );

            // Add a record of the new collection.
            self_collections.insert(new_collection, collection_state);

            let mut updates = BTreeMap::from([(new_collection, changes)]);
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                &mut *self_collections,
                &mut updates,
            );
        };

        // TODO(alter_table): Support changes to sources.
        self.register_handles(new_collection, true, since_handle, write_handle);

        info!(%existing_collection, %new_collection, ?new_desc, "altered table");

        Ok(())
    }
2402
    /// Drops the given collections without validating the `identifiers`
    /// beyond a soft assertion that the drop was synchronized with the
    /// storage controller.
    ///
    /// Dropping installs a read policy that advances each collection's since
    /// to the empty antichain; outstanding read holds are still honored, so
    /// shards are only finalized once those are released as well.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) {
        debug!(?identifiers, "drop_collections_unvalidated");

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for id in identifiers.iter() {
            // A drop that went through `synchronize_collections` should have
            // already removed the shard mapping; flag (but tolerate) drops
            // that didn't.
            let metadata = storage_metadata.get_collection_shard::<T>(*id);
            mz_ore::soft_assert_or_log!(
                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
                "dropping {id}, but drop was not synchronized with storage \
                controller via `synchronize_collections`"
            );

            let dropped_data_source = match self_collections.get(id) {
                Some(col) => col.description.data_source.clone(),
                None => continue,
            };

            // If we are dropping source exports, we need to modify the
            // ingestion that it runs on.
            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
                // Adjust the source to remove this export.
                let ingestion = match self_collections.get_mut(&ingestion_id) {
                    Some(ingestion) => ingestion,
                    // Primary ingestion already dropped.
                    None => {
                        tracing::error!(
                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
                        );
                        continue;
                    }
                };

                match &mut ingestion.description {
                    CollectionDescription {
                        data_source: DataSource::Ingestion(ingestion_desc),
                        ..
                    } => {
                        let removed = ingestion_desc.source_exports.remove(id);
                        mz_ore::soft_assert_or_log!(
                            removed.is_some(),
                            "dropped subsource {id} already removed from source exports"
                        );
                    }
                    _ => unreachable!(
                        "SourceExport must only refer to primary sources that already exist"
                    ),
                };
            }
        }

        // Policies that advance the since to the empty antichain. We do still
        // honor outstanding read holds, and collections will only be dropped
        // once those are removed as well.
        //
        // We don't explicitly remove read capabilities! Downgrading the
        // frontier of the source to `[]` (the empty Antichain), will propagate
        // to the storage dependencies.
        let mut finalized_policies = Vec::new();

        for id in identifiers {
            // Make sure it's still there, might already have been deleted.
            if self_collections.contains_key(&id) {
                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
            }
        }
        self.set_read_policies_inner(&mut self_collections, finalized_policies);

        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);
    }
2479
2480    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2481        let mut collections = self.collections.lock().expect("lock poisoned");
2482
2483        if tracing::enabled!(tracing::Level::TRACE) {
2484            let user_capabilities = collections
2485                .iter_mut()
2486                .filter(|(id, _c)| id.is_user())
2487                .map(|(id, c)| {
2488                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2489                    (*id, c.implied_capability.clone(), updates)
2490                })
2491                .collect_vec();
2492
2493            trace!(?policies, ?user_capabilities, "set_read_policies");
2494        }
2495
2496        self.set_read_policies_inner(&mut collections, policies);
2497
2498        if tracing::enabled!(tracing::Level::TRACE) {
2499            let user_capabilities = collections
2500                .iter_mut()
2501                .filter(|(id, _c)| id.is_user())
2502                .map(|(id, c)| {
2503                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2504                    (*id, c.implied_capability.clone(), updates)
2505                })
2506                .collect_vec();
2507
2508            trace!(?user_capabilities, "after! set_read_policies");
2509        }
2510    }
2511
2512    fn acquire_read_holds(
2513        &self,
2514        desired_holds: Vec<GlobalId>,
2515    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
2516        if desired_holds.is_empty() {
2517            return Ok(vec![]);
2518        }
2519
2520        let mut collections = self.collections.lock().expect("lock poisoned");
2521
2522        let mut advanced_holds = Vec::new();
2523        // We advance the holds by our current since frontier. Can't acquire
2524        // holds for times that have been compacted away!
2525        //
2526        // NOTE: We acquire read holds at the earliest possible time rather than
2527        // at the implied capability. This is so that, for example, adapter can
2528        // acquire a read hold to hold back the frontier, giving the COMPUTE
2529        // controller a chance to also acquire a read hold at that early
2530        // frontier. If/when we change the interplay between adapter and COMPUTE
2531        // to pass around ReadHold tokens, we might tighten this up and instead
2532        // acquire read holds at the implied capability.
2533        for id in desired_holds.iter() {
2534            let collection = collections
2535                .get(id)
2536                .ok_or(ReadHoldError::CollectionMissing(*id))?;
2537            let since = collection.read_capabilities.frontier().to_owned();
2538            advanced_holds.push((*id, since));
2539        }
2540
2541        let mut updates = advanced_holds
2542            .iter()
2543            .map(|(id, hold)| {
2544                let mut changes = ChangeBatch::new();
2545                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2546                (*id, changes)
2547            })
2548            .collect::<BTreeMap<_, _>>();
2549
2550        StorageCollectionsImpl::update_read_capabilities_inner(
2551            &self.cmd_tx,
2552            &mut collections,
2553            &mut updates,
2554        );
2555
2556        let acquired_holds = advanced_holds
2557            .into_iter()
2558            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2559            .collect_vec();
2560
2561        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2562
2563        Ok(acquired_holds)
2564    }
2565
    /// Determine time dependence information for the object.
    ///
    /// Returns `Ok(None)` for objects whose timestamps are not tied to
    /// wall-clock time: collections on a non-epoch timeline, load generators,
    /// sinks, and objects not managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
        use TimeDependenceError::CollectionMissing;
        let collections = self.collections.lock().expect("lock poisoned");
        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);

        let mut result = None;

        // Walk to the collection that determines the time dependence:
        // `collection` is only re-populated when following an
        // `IngestionExport` to its parent ingestion.
        while let Some(c) = collection.take() {
            use DataSource::*;
            if let Some(timeline) = &c.description.timeline {
                // Only the epoch timeline follows wall-clock.
                if *timeline != Timeline::EpochMilliseconds {
                    break;
                }
            }
            match &c.description.data_source {
                Ingestion(ingestion) => {
                    use GenericSourceConnection::*;
                    match ingestion.desc.connection {
                        // Kafka, Postgres, MySql, and SQL Server sources all
                        // follow wall clock.
                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
                            result = Some(TimeDependence::default())
                        }
                        // Load generators not further specified.
                        LoadGenerator(_) => {}
                    }
                }
                IngestionExport { ingestion_id, .. } => {
                    // Exports inherit the time dependence of their parent
                    // ingestion; continue the walk there.
                    let c = collections
                        .get(ingestion_id)
                        .ok_or(CollectionMissing(*ingestion_id))?;
                    collection = Some(c);
                }
                // Introspection, other, progress, table, and webhook sources follow wall clock.
                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
                    result = Some(TimeDependence::default())
                }
                // Materialized views, continual tasks, etc, aren't managed by storage.
                Other => {}
                Sink { .. } => {}
            };
        }
        Ok(result)
    }
2615}
2616
/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
///
/// When a [StorageCollections] is in read-only mode, we will only ever acquire
/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
/// since is considered a write. Conversely, when in read-write mode, we acquire
/// [SinceHandle].
#[derive(Debug)]
enum SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64,
{
    /// A critical since handle, used when in read-write mode.
    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
    /// A leased read handle, used when in read-only mode.
    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
}
2631
impl<T> SinceHandleWrapper<T>
where
    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    /// Returns the since frontier of the wrapped handle.
    pub fn since(&self) -> &Antichain<T> {
        match self {
            Self::Critical(handle) => handle.since(),
            Self::Leased(handle) => handle.since(),
        }
    }

    /// Returns the opaque value associated with the wrapped handle.
    ///
    /// Leased handles have no opaque, so for those a `PersistEpoch(None)` is
    /// returned.
    pub fn opaque(&self) -> PersistEpoch {
        match self {
            Self::Critical(handle) => handle.opaque().clone(),
            Self::Leased(_handle) => {
                // The opaque is expected to be used with
                // `compare_and_downgrade_since`, and the leased handle doesn't
                // have a notion of an opaque. We pretend here and in
                // `compare_and_downgrade_since`.
                PersistEpoch(None)
            }
        }
    }

    /// Downgrades the since of the wrapped handle to `new`.
    ///
    /// Critical handles delegate to the handle's
    /// `compare_and_downgrade_since`, comparing against `expected`. Leased
    /// handles ignore `expected`, require the new opaque to be
    /// `PersistEpoch(None)` (see [`Self::opaque`]), and apply the downgrade
    /// unconditionally.
    pub async fn compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Result<Antichain<T>, PersistEpoch> {
        match self {
            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.downgrade_since(since).await;

                Ok(since.clone())
            }
        }
    }

    /// Like [`Self::compare_and_downgrade_since`], but critical handles
    /// delegate to the handle's `maybe_compare_and_downgrade_since`, which
    /// may elect to not apply the downgrade and return `None`. Leased handles
    /// always attempt the (maybe-)downgrade.
    pub async fn maybe_compare_and_downgrade_since(
        &mut self,
        expected: &PersistEpoch,
        new: (&PersistEpoch, &Antichain<T>),
    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
        match self {
            Self::Critical(handle) => {
                handle
                    .maybe_compare_and_downgrade_since(expected, new)
                    .await
            }
            Self::Leased(handle) => {
                let (opaque, since) = new;
                assert_none!(opaque.0);

                handle.maybe_downgrade_since(since).await;

                Some(Ok(since.clone()))
            }
        }
    }

    /// Returns a future that computes the [SnapshotStats] of collection `id`
    /// at the given (optional) `as_of`.
    ///
    /// Any error from the underlying handle is reported as
    /// `StorageError::ReadBeforeSince(id)`.
    pub fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Option<Antichain<T>>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
            Self::Leased(handle) => {
                let res = handle
                    .snapshot_stats(as_of)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
                Box::pin(res)
            }
        }
    }

    /// Like [`Self::snapshot_stats`], but computes the stats through the
    /// given [DataSnapshot].
    pub fn snapshot_stats_from_txn(
        &self,
        id: GlobalId,
        data_snapshot: DataSnapshot<T>,
    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
        match self {
            Self::Critical(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_critical(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
            Self::Leased(handle) => Box::pin(
                data_snapshot
                    .snapshot_stats_from_leased(handle)
                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
            ),
        }
    }
}
2736
/// State maintained about individual collections.
#[derive(Debug, Clone)]
struct CollectionState<T> {
    /// Description with which the collection was created
    pub description: CollectionDescription<T>,

    /// Accumulation of read capabilities for the collection.
    ///
    /// This accumulation will always contain `self.implied_capability`, but may
    /// also contain capabilities held by others who have read dependencies on
    /// this collection.
    pub read_capabilities: MutableAntichain<T>,

    /// The implicit capability associated with collection creation.  This
    /// should never be less than the since of the associated persist
    /// collection.
    pub implied_capability: Antichain<T>,

    /// The policy to use to downgrade `self.implied_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Storage identifiers on which this collection depends.
    pub storage_dependencies: Vec<GlobalId>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,

    /// Metadata about the persist shard backing this collection (persist
    /// location, relation description, data shard, and optional txns shard).
    pub collection_metadata: CollectionMetadata,
}
2766
2767impl<T: TimelyTimestamp> CollectionState<T> {
2768    /// Creates a new collection state, with an initial read policy valid from
2769    /// `since`.
2770    pub fn new(
2771        description: CollectionDescription<T>,
2772        since: Antichain<T>,
2773        write_frontier: Antichain<T>,
2774        storage_dependencies: Vec<GlobalId>,
2775        metadata: CollectionMetadata,
2776    ) -> Self {
2777        let mut read_capabilities = MutableAntichain::new();
2778        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2779        Self {
2780            description,
2781            read_capabilities,
2782            implied_capability: since.clone(),
2783            read_policy: ReadPolicy::NoPolicy {
2784                initial_since: since,
2785            },
2786            storage_dependencies,
2787            write_frontier,
2788            collection_metadata: metadata,
2789        }
2790    }
2791
2792    /// Returns whether the collection was dropped.
2793    pub fn is_dropped(&self) -> bool {
2794        self.read_capabilities.is_empty()
2795    }
2796}
2797
/// A task that keeps persist handles, downgrades sinces when asked,
/// periodically gets recent uppers from them, and updates the shard collection
/// state when needed.
///
/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
#[derive(Debug)]
struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
    /// The storage configuration, shared via `Arc<Mutex<_>>`.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Sender for [BackgroundCmd]s.
    ///
    /// NOTE(review): kept alongside `cmds_rx`, presumably so the task can
    /// enqueue commands for itself — confirm against usages.
    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
    /// Receiver for [BackgroundCmd]s sent to this task.
    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
    /// Receiver for read-capability changes coming from handed-out
    /// [ReadHold]s.
    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
    /// Shards eligible for finalization, shared with the owning
    /// [StorageCollectionsImpl].
    finalizable_shards: Arc<ShardIdSet>,
    /// Per-collection state, shared with the owning [StorageCollectionsImpl].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
    // So we know what shard ID corresponds to what global ID, which we need
    // when re-enqueing futures for determining the next upper update.
    shard_by_id: BTreeMap<GlobalId, ShardId>,
    /// Since handles (critical or leased) for all registered collections.
    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
    /// Write handle for the txns shard, if any; used to watch for upper
    /// advancements of the txns shard.
    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
    /// Collections whose upper advances with the txns shard's upper, rather
    /// than being polled from their own write handles.
    txns_shards: BTreeSet<GlobalId>,
}
2818
/// Commands understood by [BackgroundTask].
#[derive(Debug)]
enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
    /// Registers write and since handles for collection `id`. When
    /// `is_in_txns` is true, the collection's upper is tracked via the txns
    /// shard instead of being polled from its own write handle.
    Register {
        id: GlobalId,
        is_in_txns: bool,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        since_handle: SinceHandleWrapper<T>,
    },
    /// Requests downgrading the given collections' sinces to the given
    /// frontiers.
    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
    /// Requests snapshot statistics for the given collection at the given
    /// as-of; the (unawaited) stats future is sent back on the channel.
    SnapshotStats(
        GlobalId,
        SnapshotStatsAsOf<T>,
        oneshot::Sender<SnapshotStatsRes<T>>,
    ),
}
2834
/// A newtype wrapper to hang a Debug impl off of.
///
/// Carries the boxed future that resolves to the requested [SnapshotStats],
/// or to a [StorageError] if they cannot be computed.
pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2837
2838impl<T> Debug for SnapshotStatsRes<T> {
2839    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2840        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2841    }
2842}
2843
2844impl<T> BackgroundTask<T>
2845where
2846    T: TimelyTimestamp
2847        + Lattice
2848        + Codec64
2849        + From<EpochMillis>
2850        + TimestampManipulation
2851        + Into<mz_repr::Timestamp>
2852        + Sync,
2853{
2854    async fn run(&mut self) {
2855        // Futures that fetch the recent upper from all other shards.
2856        let mut upper_futures: FuturesUnordered<
2857            std::pin::Pin<
2858                Box<
2859                    dyn Future<
2860                            Output = (
2861                                GlobalId,
2862                                WriteHandle<SourceData, (), T, StorageDiff>,
2863                                Antichain<T>,
2864                            ),
2865                        > + Send,
2866                >,
2867            >,
2868        > = FuturesUnordered::new();
2869
2870        let gen_upper_future =
2871            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2872                let fut = async move {
2873                    soft_assert_or_log!(
2874                        !prev_upper.is_empty(),
2875                        "cannot await progress when upper is already empty"
2876                    );
2877                    handle.wait_for_upper_past(&prev_upper).await;
2878                    let new_upper = handle.shared_upper();
2879                    (id, handle, new_upper)
2880                };
2881
2882                fut
2883            };
2884
2885        let mut txns_upper_future = match self.txns_handle.take() {
2886            Some(txns_handle) => {
2887                let upper = txns_handle.upper().clone();
2888                let txns_upper_future =
2889                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2890                txns_upper_future.boxed()
2891            }
2892            None => async { std::future::pending().await }.boxed(),
2893        };
2894
2895        loop {
2896            tokio::select! {
2897                (id, handle, upper) = &mut txns_upper_future => {
2898                    trace!("new upper from txns shard: {:?}", upper);
2899                    let mut uppers = Vec::new();
2900                    for id in self.txns_shards.iter() {
2901                        uppers.push((*id, &upper));
2902                    }
2903                    self.update_write_frontiers(&uppers).await;
2904
2905                    let fut = gen_upper_future(id, handle, upper);
2906                    txns_upper_future = fut.boxed();
2907                }
2908                Some((id, handle, upper)) = upper_futures.next() => {
2909                    if id.is_user() {
2910                        trace!("new upper for collection {id}: {:?}", upper);
2911                    }
2912                    let current_shard = self.shard_by_id.get(&id);
2913                    if let Some(shard_id) = current_shard {
2914                        if shard_id == &handle.shard_id() {
2915                            // Still current, so process the update and enqueue
2916                            // again!
2917                            let uppers = &[(id, &upper)];
2918                            self.update_write_frontiers(uppers).await;
2919                            if !upper.is_empty() {
2920                                let fut = gen_upper_future(id, handle, upper);
2921                                upper_futures.push(fut.boxed());
2922                            }
2923                        } else {
2924                            // Be polite and expire the write handle. This can
2925                            // happen when we get an upper update for a write
2926                            // handle that has since been replaced via Update.
2927                            handle.expire().await;
2928                        }
2929                    }
2930                }
2931                cmd = self.cmds_rx.recv() => {
2932                    let cmd = if let Some(cmd) = cmd {
2933                        cmd
2934                    } else {
2935                        // We're done!
2936                        break;
2937                    };
2938
2939                    match cmd {
2940                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2941                            debug!("registering handles for {}", id);
2942                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2943                            if previous.is_some() {
2944                                panic!("already registered a WriteHandle for collection {id}");
2945                            }
2946
2947                            let previous = self.since_handles.insert(id, since_handle);
2948                            if previous.is_some() {
2949                                panic!("already registered a SinceHandle for collection {id}");
2950                            }
2951
2952                            if is_in_txns {
2953                                self.txns_shards.insert(id);
2954                            } else {
2955                                let upper = write_handle.upper().clone();
2956                                if !upper.is_empty() {
2957                                    let fut = gen_upper_future(id, write_handle, upper);
2958                                    upper_futures.push(fut.boxed());
2959                                }
2960                            }
2961
2962                        }
2963                        BackgroundCmd::DowngradeSince(cmds) => {
2964                            self.downgrade_sinces(cmds).await;
2965                        }
2966                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2967                            // NB: The requested as_of could be arbitrarily far
2968                            // in the future. So, in order to avoid blocking
2969                            // this loop until it's available and the
2970                            // `snapshot_stats` call resolves, instead return
2971                            // the future to the caller and await it there.
2972                            let res = match self.since_handles.get(&id) {
2973                                Some(x) => {
2974                                    let fut: BoxFuture<
2975                                        'static,
2976                                        Result<SnapshotStats, StorageError<T>>,
2977                                    > = match as_of {
2978                                        SnapshotStatsAsOf::Direct(as_of) => {
2979                                            x.snapshot_stats(id, Some(as_of))
2980                                        }
2981                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
2982                                            x.snapshot_stats_from_txn(id, data_snapshot)
2983                                        }
2984                                    };
2985                                    SnapshotStatsRes(fut)
2986                                }
2987                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
2988                                    StorageError::IdentifierMissing(id),
2989                                )))),
2990                            };
2991                            // It's fine if the listener hung up.
2992                            let _ = tx.send(res);
2993                        }
2994                    }
2995                }
2996                Some(holds_changes) = self.holds_rx.recv() => {
2997                    let mut batched_changes = BTreeMap::new();
2998                    batched_changes.insert(holds_changes.0, holds_changes.1);
2999
3000                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
3001                        let entry = batched_changes.entry(holds_changes.0);
3002                        entry
3003                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
3004                            .or_insert_with(|| holds_changes.1);
3005                    }
3006
3007                    let mut collections = self.collections.lock().expect("lock poisoned");
3008
3009                    let user_changes = batched_changes
3010                        .iter()
3011                        .filter(|(id, _c)| id.is_user())
3012                        .map(|(id, c)| {
3013                            (id.clone(), c.clone())
3014                        })
3015                        .collect_vec();
3016
3017                    if !user_changes.is_empty() {
3018                        trace!(?user_changes, "applying holds changes from channel");
3019                    }
3020
3021                    StorageCollectionsImpl::update_read_capabilities_inner(
3022                        &self.cmds_tx,
3023                        &mut collections,
3024                        &mut batched_changes,
3025                    );
3026                }
3027            }
3028        }
3029
3030        warn!("BackgroundTask shutting down");
3031    }
3032
    /// Applies the given write frontier (upper) updates to our tracked
    /// collections and recomputes each affected collection's implied read
    /// capability from its read policy.
    ///
    /// All resulting capability changes are batched up and handed to
    /// `update_read_capabilities_inner` in one go at the end.
    #[instrument(level = "debug")]
    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
        // Net changes to implied capabilities, keyed by collection id.
        let mut read_capability_changes = BTreeMap::default();

        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, new_upper) in updates.iter() {
            let collection = if let Some(c) = self_collections.get_mut(id) {
                c
            } else {
                trace!(
                    "Reference to absent collection {id}, due to concurrent removal of that collection"
                );
                continue;
            };

            // The write frontier only ever ratchets forward; ignore stale
            // updates that are not beyond the recorded frontier.
            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
                collection.write_frontier.clone_from(new_upper);
            }

            // Ask the read policy what the implied capability should be for
            // the (possibly advanced) write frontier.
            let mut new_read_capability = collection
                .read_policy
                .frontier(collection.write_frontier.borrow());

            if id.is_user() {
                trace!(
                    %id,
                    implied_capability = ?collection.implied_capability,
                    policy = ?collection.read_policy,
                    write_frontier = ?collection.write_frontier,
                    ?new_read_capability,
                    "update_write_frontiers");
            }

            // Only allow the implied capability to advance, never to regress.
            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                // Record +1 for each element of the new capability, install
                // it, then record -1 for each element of the old capability
                // (which `new_read_capability` holds after the swap). The
                // resulting batch is empty iff nothing actually changed.
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));

                if !update.is_empty() {
                    read_capability_changes.insert(*id, update);
                }
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmds_tx,
                &mut self_collections,
                &mut read_capability_changes,
            );
        }
    }
3087
3088    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3089        for (id, new_since) in cmds {
3090            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3091                c
3092            } else {
3093                // This can happen when someone concurrently drops a collection.
3094                trace!("downgrade_sinces: reference to absent collection {id}");
3095                continue;
3096            };
3097
3098            if id.is_user() {
3099                trace!("downgrading since of {} to {:?}", id, new_since);
3100            }
3101
3102            let epoch = since_handle.opaque().clone();
3103            let result = if new_since.is_empty() {
3104                // A shard's since reaching the empty frontier is a prereq for
3105                // being able to finalize a shard, so the final downgrade should
3106                // never be rate-limited.
3107                let res = Some(
3108                    since_handle
3109                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3110                        .await,
3111                );
3112
3113                info!(%id, "removing persist handles because the since advanced to []!");
3114
3115                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3116                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3117                    shard_id
3118                } else {
3119                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3120                };
3121
3122                // We're not responsible for writes to tables, so we also don't
3123                // de-register them from the txn system. Whoever is responsible
3124                // will remove them. We only make sure to remove the table from
3125                // our tracking.
3126                self.txns_shards.remove(&id);
3127
3128                if !self
3129                    .config
3130                    .lock()
3131                    .expect("lock poisoned")
3132                    .parameters
3133                    .finalize_shards
3134                {
3135                    info!(
3136                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3137                    );
3138                    return;
3139                }
3140
3141                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");
3142
3143                self.finalizable_shards.lock().insert(dropped_shard_id);
3144
3145                res
3146            } else {
3147                since_handle
3148                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3149                    .await
3150            };
3151
3152            if let Some(Err(other_epoch)) = result {
3153                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3154            }
3155        }
3156    }
3157}
3158
/// Configuration for [`finalize_shards_task`].
struct FinalizeShardsTaskConfig {
    // Epoch of this process, used (via `PersistEpoch`) when force-downgrading
    // critical sinces during finalization.
    envd_epoch: NonZeroI64,
    // Shared storage configuration; consulted for the `finalize_shards`
    // parameter and dyncfgs on every tick.
    config: Arc<Mutex<StorageConfiguration>>,
    // Counters for started/succeeded/failed finalization attempts.
    metrics: StorageCollectionsMetrics,
    // Shards that are eligible for finalization; the task drains this set.
    finalizable_shards: Arc<ShardIdSet>,
    // Shards that have been successfully finalized; the task inserts here.
    finalized_shards: Arc<ShardIdSet>,
    // Where to open the persist client used for finalization.
    persist_location: PersistLocation,
    persist: Arc<PersistClientCache>,
    // When true, the task exits immediately and finalizes nothing.
    read_only: bool,
}
3169
3170async fn finalize_shards_task<T>(
3171    FinalizeShardsTaskConfig {
3172        envd_epoch,
3173        config,
3174        metrics,
3175        finalizable_shards,
3176        finalized_shards,
3177        persist_location,
3178        persist,
3179        read_only,
3180    }: FinalizeShardsTaskConfig,
3181) where
3182    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
3183{
3184    if read_only {
3185        info!("disabling shard finalization in read only mode");
3186        return;
3187    }
3188
3189    let mut interval = tokio::time::interval(Duration::from_secs(5));
3190    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3191    loop {
3192        interval.tick().await;
3193
3194        if !config
3195            .lock()
3196            .expect("lock poisoned")
3197            .parameters
3198            .finalize_shards
3199        {
3200            debug!(
3201                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3202            );
3203            continue;
3204        }
3205
3206        let current_finalizable_shards = {
3207            // We hold the lock for as short as possible and pull our cloned set
3208            // of shards.
3209            finalizable_shards.lock().iter().cloned().collect_vec()
3210        };
3211
3212        if current_finalizable_shards.is_empty() {
3213            debug!("no shards to finalize");
3214            continue;
3215        }
3216
3217        debug!(?current_finalizable_shards, "attempting to finalize shards");
3218
3219        // Open a persist client to delete unused shards.
3220        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3221
3222        let metrics = &metrics;
3223        let finalizable_shards = &finalizable_shards;
3224        let finalized_shards = &finalized_shards;
3225        let persist_client = &persist_client;
3226        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3227
3228        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3229            .get(config.lock().expect("lock poisoned").config_set());
3230
3231        let epoch = &PersistEpoch::from(envd_epoch);
3232
3233        futures::stream::iter(current_finalizable_shards.clone())
3234            .map(|shard_id| async move {
3235                let persist_client = persist_client.clone();
3236                let diagnostics = diagnostics.clone();
3237                let epoch = epoch.clone();
3238
3239                metrics.finalization_started.inc();
3240
3241                let is_finalized = persist_client
3242                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3243                    .await
3244                    .expect("invalid persist usage");
3245
3246                if is_finalized {
3247                    debug!(%shard_id, "shard is already finalized!");
3248                    Some(shard_id)
3249                } else {
3250                    debug!(%shard_id, "finalizing shard");
3251                        let finalize = || async move {
3252                        // TODO: thread the global ID into the shard finalization WAL
3253                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3254
3255                        let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3256                        let (key_schema, val_schema) = match schemas {
3257                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3258                            None => (RelationDesc::empty(), UnitSchema),
3259                        };
3260
3261                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
3262                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3263                            persist_client
3264                                .open_writer(
3265                                    shard_id,
3266                                    Arc::new(key_schema),
3267                                    Arc::new(val_schema),
3268                                    diagnostics,
3269                                )
3270                                .await
3271                                .expect("invalid persist usage");
3272
3273                        let upper = write_handle.upper();
3274
3275                        if !upper.is_empty() {
3276                            let append = write_handle
3277                                .append(empty_batch, upper.clone(), Antichain::new())
3278                                .await?;
3279
3280                            if let Err(e) = append {
3281                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3282                                return Ok(());
3283                            }
3284                        }
3285                        write_handle.expire().await;
3286
3287                        if force_downgrade_since {
3288                            let mut since_handle: SinceHandle<
3289                                SourceData,
3290                                (),
3291                                T,
3292                                StorageDiff,
3293                                PersistEpoch,
3294                            > = persist_client
3295                                .open_critical_since(
3296                                    shard_id,
3297                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3298                                    Diagnostics::from_purpose("finalizing shards"),
3299                                )
3300                                .await
3301                                .expect("invalid persist usage");
3302                            let handle_epoch = since_handle.opaque().clone();
3303                            let our_epoch = epoch.clone();
3304                            let epoch = if our_epoch.0 > handle_epoch.0 {
3305                                // We're newer, but it's fine to use the
3306                                // handle's old epoch to try and downgrade.
3307                                handle_epoch
3308                            } else {
3309                                // Good luck, buddy! The downgrade below will
3310                                // not succeed. There's a process with a newer
3311                                // epoch out there and someone at some juncture
3312                                // will fence out this process.
3313                                our_epoch
3314                            };
3315                            let new_since = Antichain::new();
3316                            let downgrade = since_handle
3317                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3318                                .await;
3319                            if let Err(e) = downgrade {
3320                                warn!(
3321                                    "tried to finalize a shard with an advancing epoch: {e:?}"
3322                                );
3323                                return Ok(());
3324                            }
3325                            // Not available now, so finalization is broken.
3326                            // since_handle.expire().await;
3327                        }
3328
3329                        persist_client
3330                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3331                                shard_id,
3332                                Diagnostics::from_purpose("finalizing shards"),
3333                            )
3334                            .await
3335                    };
3336
3337                    match finalize().await {
3338                        Err(e) => {
3339                            // Rather than error, just leave this shard as
3340                            // one to finalize later.
3341                            warn!("error during finalization of shard {shard_id}: {e:?}");
3342                            None
3343                        }
3344                        Ok(()) => {
3345                            debug!(%shard_id, "finalize success!");
3346                            Some(shard_id)
3347                        }
3348                    }
3349                }
3350            })
3351            // Poll each future for each collection concurrently, maximum of 10
3352            // at a time.
3353            // TODO(benesch): the concurrency here should be configurable
3354            // via LaunchDarkly.
3355            .buffer_unordered(10)
3356            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3357            // The closure passed to `for_each` must remain fast or we risk
3358            // starving the finalization futures of calls to `poll`.
3359            .for_each(|shard_id| async move {
3360                match shard_id {
3361                    None => metrics.finalization_failed.inc(),
3362                    Some(shard_id) => {
3363                        // We make successfully finalized shards available for
3364                        // removal from the finalization WAL one by one, so that
3365                        // a handful of stuck shards don't prevent us from
3366                        // removing the shards that have made progress. The
3367                        // overhead of repeatedly acquiring and releasing the
3368                        // locks is negligible.
3369                        {
3370                            let mut finalizable_shards = finalizable_shards.lock();
3371                            let mut finalized_shards = finalized_shards.lock();
3372                            finalizable_shards.remove(&shard_id);
3373                            finalized_shards.insert(shard_id);
3374                        }
3375
3376                        metrics.finalization_succeeded.inc();
3377                    }
3378                }
3379            })
3380            .await;
3381
3382        debug!("done finalizing shards");
3383    }
3384}
3385
/// The as-of frontier to use when computing snapshot stats for a collection,
/// distinguished by how the backing shard's upper advances.
#[derive(Debug)]
pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
    /// Stats for a shard with an "eager" upper (one that continually advances
    /// as time passes, even if no writes are coming in).
    Direct(Antichain<T>),
    /// Stats for a shard with a "lazy" upper (one that only physically advances
    /// in response to writes).
    Txns(DataSnapshot<T>),
}
3395
3396#[cfg(test)]
3397mod tests {
3398    use std::str::FromStr;
3399    use std::sync::Arc;
3400
3401    use mz_build_info::DUMMY_BUILD_INFO;
3402    use mz_dyncfg::ConfigSet;
3403    use mz_ore::assert_err;
3404    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3405    use mz_ore::now::SYSTEM_TIME;
3406    use mz_ore::url::SensitiveUrl;
3407    use mz_persist_client::cache::PersistClientCache;
3408    use mz_persist_client::cfg::PersistConfig;
3409    use mz_persist_client::rpc::PubSubClientConnection;
3410    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3411    use mz_persist_types::codec_impls::UnitSchema;
3412    use mz_repr::{RelationDesc, Row};
3413    use mz_secrets::InMemorySecretsController;
3414
3415    use super::*;
3416
3417    #[mz_ore::test(tokio::test)]
3418    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
3419    async fn test_snapshot_stats(&self) {
3420        let persist_location = PersistLocation {
3421            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3422            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3423        };
3424        let persist_client = PersistClientCache::new(
3425            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3426            &MetricsRegistry::new(),
3427            |_, _| PubSubClientConnection::noop(),
3428        );
3429        let persist_client = Arc::new(persist_client);
3430
3431        let (cmds_tx, mut background_task) =
3432            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3433        let background_task =
3434            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3435                background_task.run().await
3436            });
3437
3438        let persist = persist_client.open(persist_location).await.unwrap();
3439
3440        let shard_id = ShardId::new();
3441        let since_handle = persist
3442            .open_critical_since(
3443                shard_id,
3444                PersistClient::CONTROLLER_CRITICAL_SINCE,
3445                Diagnostics::for_tests(),
3446            )
3447            .await
3448            .unwrap();
3449        let write_handle = persist
3450            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3451                shard_id,
3452                Arc::new(RelationDesc::empty()),
3453                Arc::new(UnitSchema),
3454                Diagnostics::for_tests(),
3455            )
3456            .await
3457            .unwrap();
3458
3459        cmds_tx
3460            .send(BackgroundCmd::Register {
3461                id: GlobalId::User(1),
3462                is_in_txns: false,
3463                since_handle: SinceHandleWrapper::Critical(since_handle),
3464                write_handle,
3465            })
3466            .unwrap();
3467
3468        let mut write_handle = persist
3469            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3470                shard_id,
3471                Arc::new(RelationDesc::empty()),
3472                Arc::new(UnitSchema),
3473                Diagnostics::for_tests(),
3474            )
3475            .await
3476            .unwrap();
3477
3478        // No stats for unknown GlobalId.
3479        let stats =
3480            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3481        assert_err!(stats);
3482
3483        // Stats don't resolve for as_of past the upper.
3484        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3485        assert_none!(stats_fut.now_or_never());
3486
3487        // // Call it again because now_or_never consumed our future and it's not clone-able.
3488        let stats_ts1_fut =
3489            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3490
3491        // Write some data.
3492        let data = (
3493            (SourceData(Ok(Row::default())), ()),
3494            mz_repr::Timestamp::from(0),
3495            1i64,
3496        );
3497        let () = write_handle
3498            .compare_and_append(
3499                &[data],
3500                Antichain::from_elem(0.into()),
3501                Antichain::from_elem(1.into()),
3502            )
3503            .await
3504            .unwrap()
3505            .unwrap();
3506
3507        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
3508        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3509            .await
3510            .unwrap();
3511        assert_eq!(stats.num_updates, 1);
3512
3513        // Write more data and unblock the ts 1 call
3514        let data = (
3515            (SourceData(Ok(Row::default())), ()),
3516            mz_repr::Timestamp::from(1),
3517            1i64,
3518        );
3519        let () = write_handle
3520            .compare_and_append(
3521                &[data],
3522                Antichain::from_elem(1.into()),
3523                Antichain::from_elem(2.into()),
3524            )
3525            .await
3526            .unwrap()
3527            .unwrap();
3528
3529        let stats = stats_ts1_fut.await.unwrap();
3530        assert_eq!(stats.num_updates, 2);
3531
3532        // Make sure it runs until at least here.
3533        drop(background_task);
3534    }
3535
3536    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3537        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3538        id: GlobalId,
3539        as_of: Antichain<T>,
3540    ) -> Result<SnapshotStats, StorageError<T>> {
3541        let (tx, rx) = oneshot::channel();
3542        cmds_tx
3543            .send(BackgroundCmd::SnapshotStats(
3544                id,
3545                SnapshotStatsAsOf::Direct(as_of),
3546                tx,
3547            ))
3548            .unwrap();
3549        let res = rx.await.expect("BackgroundTask should be live").0;
3550
3551        res.await
3552    }
3553
    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Constructs a `BackgroundTask` wired to fresh in-memory channels and
        /// a default test `StorageConfiguration`, returning the command sender
        /// alongside the task.
        ///
        /// The holds sender (`_holds_tx`) is dropped when this function
        /// returns, so the task's holds channel never yields items. The
        /// persist arguments are currently unused.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                // The task keeps a clone of its own command sender (used when
                // forwarding capability changes); tests use the returned copy.
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
3585}