mz_storage_client/
storage_collections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction for dealing with storage collections.
11
12use std::cmp::Reverse;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::num::NonZeroI64;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use futures::future::BoxFuture;
22use futures::stream::{BoxStream, FuturesUnordered};
23use futures::{Future, FutureExt, StreamExt};
24use itertools::Itertools;
25
26use mz_ore::collections::CollectionExt;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::now::{EpochMillis, NowFn};
29use mz_ore::task::AbortOnDropHandle;
30use mz_ore::{assert_none, instrument, soft_assert_or_log};
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::read::{Cursor, ReadHandle};
35use mz_persist_client::schema::CaESchema;
36use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
37use mz_persist_client::write::WriteHandle;
38use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
39use mz_persist_types::Codec64;
40use mz_persist_types::codec_impls::UnitSchema;
41use mz_persist_types::txn::TxnsCodec;
42use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
43use mz_storage_types::StorageDiff;
44use mz_storage_types::configuration::StorageConfiguration;
45use mz_storage_types::connections::ConnectionContext;
46use mz_storage_types::connections::inline::InlinedConnection;
47use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
48use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
49use mz_storage_types::errors::CollectionMissing;
50use mz_storage_types::parameters::StorageParameters;
51use mz_storage_types::read_holds::ReadHold;
52use mz_storage_types::read_policy::ReadPolicy;
53use mz_storage_types::sources::{
54    GenericSourceConnection, SourceData, SourceDesc, SourceEnvelope, SourceExport,
55    SourceExportDataConfig, Timeline,
56};
57use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
58use mz_txn_wal::metrics::Metrics as TxnMetrics;
59use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
60use mz_txn_wal::txns::TxnsHandle;
61use timely::PartialOrder;
62use timely::order::TotalOrder;
63use timely::progress::frontier::MutableAntichain;
64use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
65use tokio::sync::{mpsc, oneshot};
66use tokio::time::MissedTickBehavior;
67use tracing::{debug, info, trace, warn};
68
69use crate::client::TimestamplessUpdateBuilder;
70use crate::controller::{
71    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
72};
73use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};
74
75mod metrics;
76
77/// An abstraction for keeping track of storage collections and managing access
78/// to them.
79///
80/// Responsibilities:
81///
82/// - Keeps a critical persist handle for holding the since of collections
83///   where it need to be.
84///
85/// - Drives the since forward based on the upper of a collection and a
86///   [ReadPolicy].
87///
88/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
89/// advancing while it needs to be read at a specific time.
90#[async_trait]
91pub trait StorageCollections: Debug + Sync {
92    type Timestamp: TimelyTimestamp;
93
94    /// On boot, reconcile this [StorageCollections] with outside state. We get
95    /// a [StorageTxn] where we can record any durable state that we need.
96    ///
97    /// We get `init_ids`, which tells us about all collections that currently
98    /// exist, so that we can record durable state for those that _we_ don't
99    /// know yet about.
100    async fn initialize_state(
101        &self,
102        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
103        init_ids: BTreeSet<GlobalId>,
104    ) -> Result<(), StorageError<Self::Timestamp>>;
105
106    /// Update storage configuration with new parameters.
107    fn update_parameters(&self, config_params: StorageParameters);
108
109    /// Returns the [CollectionMetadata] of the collection identified by `id`.
110    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;
111
112    /// Acquire an iterator over [CollectionMetadata] for all active
113    /// collections.
114    ///
115    /// A collection is "active" when it has a non empty frontier of read
116    /// capabilties.
117    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
118
119    /// Returns the frontiers of the identified collection.
120    fn collection_frontiers(
121        &self,
122        id: GlobalId,
123    ) -> Result<CollectionFrontiers<Self::Timestamp>, CollectionMissing> {
124        let frontiers = self
125            .collections_frontiers(vec![id])?
126            .expect_element(|| "known to exist");
127
128        Ok(frontiers)
129    }
130
131    /// Atomically gets and returns the frontiers of all the identified
132    /// collections.
133    fn collections_frontiers(
134        &self,
135        id: Vec<GlobalId>,
136    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing>;
137
138    /// Atomically gets and returns the frontiers of all active collections.
139    ///
140    /// A collection is "active" when it has a non-empty frontier of read
141    /// capabilities.
142    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;
143
144    /// Checks whether a collection exists under the given `GlobalId`. Returns
145    /// an error if the collection does not exist.
146    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
147
148    /// Returns aggregate statistics about the contents of the local input named
149    /// `id` at `as_of`.
150    async fn snapshot_stats(
151        &self,
152        id: GlobalId,
153        as_of: Antichain<Self::Timestamp>,
154    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;
155
156    /// Returns aggregate statistics about the contents of the local input named
157    /// `id` at `as_of`.
158    ///
159    /// Note that this async function itself returns a future. We may
160    /// need to block on the stats being available, but don't want to hold a reference
161    /// to the controller for too long... so the outer future holds a reference to the
162    /// controller but returns quickly, and the inner future is slow but does not
163    /// reference the controller.
164    async fn snapshot_parts_stats(
165        &self,
166        id: GlobalId,
167        as_of: Antichain<Self::Timestamp>,
168    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;
169
170    /// Returns a snapshot of the contents of collection `id` at `as_of`.
171    fn snapshot(
172        &self,
173        id: GlobalId,
174        as_of: Self::Timestamp,
175    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;
176
177    /// Returns a snapshot of the contents of collection `id` at the largest
178    /// readable `as_of`.
179    async fn snapshot_latest(
180        &self,
181        id: GlobalId,
182    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;
183
184    /// Returns a snapshot of the contents of collection `id` at `as_of`.
185    fn snapshot_cursor(
186        &self,
187        id: GlobalId,
188        as_of: Self::Timestamp,
189    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
190    where
191        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;
192
193    /// Generates a snapshot of the contents of collection `id` at `as_of` and
194    /// streams out all of the updates in bounded memory.
195    ///
196    /// The output is __not__ consolidated.
197    fn snapshot_and_stream(
198        &self,
199        id: GlobalId,
200        as_of: Self::Timestamp,
201    ) -> BoxFuture<
202        'static,
203        Result<
204            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
205            StorageError<Self::Timestamp>,
206        >,
207    >;
208
209    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
210    /// updates for the provided [`GlobalId`].
211    fn create_update_builder(
212        &self,
213        id: GlobalId,
214    ) -> BoxFuture<
215        'static,
216        Result<
217            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
218            StorageError<Self::Timestamp>,
219        >,
220    >
221    where
222        Self::Timestamp: Lattice + Codec64;
223
224    /// Update the given [`StorageTxn`] with the appropriate metadata given the
225    /// IDs to add and drop.
226    ///
227    /// The data modified in the `StorageTxn` must be made available in all
228    /// subsequent calls that require [`StorageMetadata`] as a parameter.
229    async fn prepare_state(
230        &self,
231        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
232        ids_to_add: BTreeSet<GlobalId>,
233        ids_to_drop: BTreeSet<GlobalId>,
234        ids_to_register: BTreeMap<GlobalId, ShardId>,
235    ) -> Result<(), StorageError<Self::Timestamp>>;
236
237    /// Create the collections described by the individual
238    /// [CollectionDescriptions](CollectionDescription).
239    ///
240    /// Each command carries the source id, the source description, and any
241    /// associated metadata needed to ingest the particular source.
242    ///
243    /// This command installs collection state for the indicated sources, and
244    /// they are now valid to use in queries at times beyond the initial `since`
245    /// frontiers. Each collection also acquires a read capability at this
246    /// frontier, which will need to be repeatedly downgraded with
247    /// `allow_compaction()` to permit compaction.
248    ///
249    /// This method is NOT idempotent; It can fail between processing of
250    /// different collections and leave the [StorageCollections] in an
251    /// inconsistent state. It is almost always wrong to do anything but abort
252    /// the process on `Err`.
253    ///
254    /// The `register_ts` is used as the initial timestamp that tables are
255    /// available for reads. (We might later give non-tables the same treatment,
256    /// but hold off on that initially.) Callers must provide a Some if any of
257    /// the collections is a table. A None may be given if none of the
258    /// collections are a table (i.e. all materialized views, sources, etc).
259    ///
260    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
261    /// from the txn-wal sub-system.
262    async fn create_collections_for_bootstrap(
263        &self,
264        storage_metadata: &StorageMetadata,
265        register_ts: Option<Self::Timestamp>,
266        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
267        migrated_storage_collections: &BTreeSet<GlobalId>,
268    ) -> Result<(), StorageError<Self::Timestamp>>;
269
270    /// Alters the identified ingestion to use the provided [`SourceDesc`].
271    ///
272    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
273    /// have to learn about changes such that when new subsources are created we
274    /// can correctly determine a since based on its depenencies' sinces. This
275    /// is really only relevant because newly created subsources depend on the
276    /// remap shard, and we can't just have them start at since 0.
277    async fn alter_ingestion_source_desc(
278        &self,
279        ingestion_id: GlobalId,
280        source_desc: SourceDesc,
281    ) -> Result<(), StorageError<Self::Timestamp>>;
282
283    /// Alters the data config for the specified source exports of the specified ingestions.
284    async fn alter_ingestion_export_data_configs(
285        &self,
286        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
287    ) -> Result<(), StorageError<Self::Timestamp>>;
288
289    /// Alters each identified collection to use the correlated
290    /// [`GenericSourceConnection`].
291    ///
292    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
293    async fn alter_ingestion_connections(
294        &self,
295        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
296    ) -> Result<(), StorageError<Self::Timestamp>>;
297
298    /// Updates the [`RelationDesc`] for the specified table.
299    async fn alter_table_desc(
300        &self,
301        existing_collection: GlobalId,
302        new_collection: GlobalId,
303        new_desc: RelationDesc,
304        expected_version: RelationVersion,
305    ) -> Result<(), StorageError<Self::Timestamp>>;
306
307    /// Drops the read capability for the sources and allows their resources to
308    /// be reclaimed.
309    ///
310    /// TODO(jkosh44): This method does not validate the provided identifiers.
311    /// Currently when the controller starts/restarts it has no durable state.
312    /// That means that it has no way of remembering any past commands sent. In
313    /// the future we plan on persisting state for the controller so that it is
314    /// aware of past commands. Therefore this method is for dropping sources
315    /// that we know to have been previously created, but have been forgotten by
316    /// the controller due to a restart. Once command history becomes durable we
317    /// can remove this method and use the normal `drop_sources`.
318    fn drop_collections_unvalidated(
319        &self,
320        storage_metadata: &StorageMetadata,
321        identifiers: Vec<GlobalId>,
322    );
323
324    /// Assigns a read policy to specific identifiers.
325    ///
326    /// The policies are assigned in the order presented, and repeated
327    /// identifiers should conclude with the last policy. Changing a policy will
328    /// immediately downgrade the read capability if appropriate, but it will
329    /// not "recover" the read capability if the prior capability is already
330    /// ahead of it.
331    ///
332    /// This [StorageCollections] may include its own overrides on these
333    /// policies.
334    ///
335    /// Identifiers not present in `policies` retain their existing read
336    /// policies.
337    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);
338
339    /// Acquires and returns the earliest possible read holds for the specified
340    /// collections.
341    fn acquire_read_holds(
342        &self,
343        desired_holds: Vec<GlobalId>,
344    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing>;
345
346    /// Get the time dependence for a storage collection. Returns no value if unknown or if
347    /// the object isn't managed by storage.
348    fn determine_time_dependence(
349        &self,
350        id: GlobalId,
351    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
352}
353
354/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
355/// consolidated form.
356pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
357    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
358    // least as long as the cursor itself, which holds part leases. Bundling them together!
359    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
360    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
361}
362
363impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
364    pub async fn next(
365        &mut self,
366    ) -> Option<
367        impl Iterator<
368            Item = (
369                (Result<SourceData, String>, Result<(), String>),
370                T,
371                StorageDiff,
372            ),
373        > + Sized
374        + '_,
375    > {
376        self.cursor.next().await
377    }
378}
379
380/// Frontiers of the collection identified by `id`.
381#[derive(Debug)]
382pub struct CollectionFrontiers<T> {
383    /// The [GlobalId] of the collection that these frontiers belong to.
384    pub id: GlobalId,
385
386    /// The upper/write frontier of the collection.
387    pub write_frontier: Antichain<T>,
388
389    /// The since frontier that is implied by the collection's existence,
390    /// disregarding any read holds.
391    ///
392    /// Concretely, it is the since frontier that is implied by the combination
393    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
394    /// derived from the write frontier using the [ReadPolicy].
395    pub implied_capability: Antichain<T>,
396
397    /// The frontier of all oustanding [ReadHolds](ReadHold). This includes the
398    /// implied capability.
399    pub read_capabilities: Antichain<T>,
400}
401
402/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
403/// background task for doing work concurrently, in the background.
404#[derive(Debug, Clone)]
405pub struct StorageCollectionsImpl<
406    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
407> {
408    /// The fencing token for this instance of [StorageCollections], and really
409    /// all of the controllers and Coordinator.
410    envd_epoch: NonZeroI64,
411
412    /// Whether or not this [StorageCollections] is in read-only mode.
413    ///
414    /// When in read-only mode, we are not allowed to affect changes to external
415    /// systems, including, for example, acquiring and downgrading critical
416    /// [SinceHandles](SinceHandle)
417    read_only: bool,
418
419    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
420    /// been persisted by the caller of [StorageCollections::prepare_state].
421    finalizable_shards: Arc<ShardIdSet>,
422
423    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
424    /// shards here until we are given a chance to let our callers know that
425    /// these have been finalized, for example via
426    /// [StorageCollections::prepare_state].
427    finalized_shards: Arc<ShardIdSet>,
428
429    /// Collections maintained by this [StorageCollections].
430    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
431
432    /// A shared TxnsCache running in a task and communicated with over a channel.
433    txns_read: TxnsRead<T>,
434
435    /// Storage configuration parameters.
436    config: Arc<Mutex<StorageConfiguration>>,
437
438    /// The upper of the txn shard as it was when we booted. We forward the
439    /// upper of created/registered tables to make sure that their uppers are at
440    /// least not less than the initially known txn upper.
441    ///
442    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
443    /// existing indexes when bootstrapping, where tables that have an upper
444    /// that is less than the initially known txn upper can lead to indexes that
445    /// cannot hydrate in read-only mode.
446    initial_txn_upper: Antichain<T>,
447
448    /// The persist location where all storage collections are being written to
449    persist_location: PersistLocation,
450
451    /// A persist client used to write to storage collections
452    persist: Arc<PersistClientCache>,
453
454    /// For sending commands to our internal task.
455    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
456
457    /// For sending updates about read holds to our internal task.
458    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,
459
460    /// Handles to tasks we own, making sure they're dropped when we are.
461    _background_task: Arc<AbortOnDropHandle<()>>,
462    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
463}
464
465// Supporting methods for implementing [StorageCollections].
466//
467// Almost all internal methods that are the backing implementation for a trait
468// method have the `_inner` suffix.
469//
470// We follow a pattern where `_inner` methods get a mutable reference to the
471// shared collections state, and it's the public-facing method that locks the
472// state for the duration of its invocation. This allows calling other `_inner`
473// methods from within `_inner` methods.
474impl<T> StorageCollectionsImpl<T>
475where
476    T: TimelyTimestamp
477        + Lattice
478        + Codec64
479        + From<EpochMillis>
480        + TimestampManipulation
481        + Into<mz_repr::Timestamp>
482        + Sync,
483{
484    /// Creates and returns a new [StorageCollections].
485    ///
486    /// Note that when creating a new [StorageCollections], you must also
487    /// reconcile it with the previous state using
488    /// [StorageCollections::initialize_state],
489    /// [StorageCollections::prepare_state], and
490    /// [StorageCollections::create_collections_for_bootstrap].
491    pub async fn new(
492        persist_location: PersistLocation,
493        persist_clients: Arc<PersistClientCache>,
494        metrics_registry: &MetricsRegistry,
495        _now: NowFn,
496        txns_metrics: Arc<TxnMetrics>,
497        envd_epoch: NonZeroI64,
498        read_only: bool,
499        connection_context: ConnectionContext,
500        txn: &dyn StorageTxn<T>,
501    ) -> Self {
502        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);
503
504        // This value must be already installed because we must ensure it's
505        // durably recorded before it is used, otherwise we risk leaking persist
506        // state.
507        let txns_id = txn
508            .get_txn_wal_shard()
509            .expect("must call prepare initialization before creating StorageCollections");
510
511        let txns_client = persist_clients
512            .open(persist_location.clone())
513            .await
514            .expect("location should be valid");
515
516        // We have to initialize, so that TxnsRead::start() below does not
517        // block.
518        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
519            TxnsHandle::open(
520                T::minimum(),
521                txns_client.clone(),
522                txns_client.dyncfgs().clone(),
523                Arc::clone(&txns_metrics),
524                txns_id,
525            )
526            .await;
527
528        // For handing to the background task, for listening to upper updates.
529        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
530        let mut txns_write = txns_client
531            .open_writer(
532                txns_id,
533                Arc::new(txns_key_schema),
534                Arc::new(txns_val_schema),
535                Diagnostics {
536                    shard_name: "txns".to_owned(),
537                    handle_purpose: "commit txns".to_owned(),
538                },
539            )
540            .await
541            .expect("txns schema shouldn't change");
542
543        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;
544
545        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
546        let finalizable_shards =
547            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
548        let finalized_shards =
549            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
550        let config = Arc::new(Mutex::new(StorageConfiguration::new(
551            connection_context,
552            mz_dyncfgs::all_dyncfgs(),
553        )));
554
555        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();
556
557        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
558        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
559        let mut background_task = BackgroundTask {
560            config: Arc::clone(&config),
561            cmds_tx: cmd_tx.clone(),
562            cmds_rx: cmd_rx,
563            holds_rx,
564            collections: Arc::clone(&collections),
565            finalizable_shards: Arc::clone(&finalizable_shards),
566            shard_by_id: BTreeMap::new(),
567            since_handles: BTreeMap::new(),
568            txns_handle: Some(txns_write),
569            txns_shards: Default::default(),
570        };
571
572        let background_task =
573            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
574                background_task.run().await
575            });
576
577        let finalize_shards_task = mz_ore::task::spawn(
578            || "storage_collections::finalize_shards_task",
579            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
580                envd_epoch: envd_epoch.clone(),
581                config: Arc::clone(&config),
582                metrics,
583                finalizable_shards: Arc::clone(&finalizable_shards),
584                finalized_shards: Arc::clone(&finalized_shards),
585                persist_location: persist_location.clone(),
586                persist: Arc::clone(&persist_clients),
587                read_only,
588            }),
589        );
590
591        Self {
592            finalizable_shards,
593            finalized_shards,
594            collections,
595            txns_read,
596            envd_epoch,
597            read_only,
598            config,
599            initial_txn_upper,
600            persist_location,
601            persist: persist_clients,
602            cmd_tx,
603            holds_tx,
604            _background_task: Arc::new(background_task.abort_on_drop()),
605            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
606        }
607    }
608
609    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
610    ///
611    /// `since` is an optional since that the read handle will be forwarded to
612    /// if it is less than its current since.
613    ///
614    /// This will `halt!` the process if we cannot successfully acquire a
615    /// critical handle with our current epoch.
616    async fn open_data_handles(
617        &self,
618        id: &GlobalId,
619        shard: ShardId,
620        since: Option<&Antichain<T>>,
621        relation_desc: RelationDesc,
622        persist_client: &PersistClient,
623    ) -> (
624        WriteHandle<SourceData, (), T, StorageDiff>,
625        SinceHandleWrapper<T>,
626    ) {
627        let since_handle = if self.read_only {
628            let read_handle = self
629                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
630                .await;
631            SinceHandleWrapper::Leased(read_handle)
632        } else {
633            // We're managing the data for this shard in read-write mode, which would fence out other
634            // processes in read-only mode; it's safe to upgrade the metadata version.
635            persist_client
636                .upgrade_version::<SourceData, (), T, StorageDiff>(
637                    shard,
638                    Diagnostics {
639                        shard_name: id.to_string(),
640                        handle_purpose: format!("controller data for {}", id),
641                    },
642                )
643                .await
644                .expect("invalid persist usage");
645
646            let since_handle = self
647                .open_critical_handle(id, shard, since, persist_client)
648                .await;
649
650            SinceHandleWrapper::Critical(since_handle)
651        };
652
653        let mut write_handle = self
654            .open_write_handle(id, shard, relation_desc, persist_client)
655            .await;
656
657        // N.B.
658        // Fetch the most recent upper for the write handle. Otherwise, this may
659        // be behind the since of the since handle. Its vital this happens AFTER
660        // we create the since handle as it needs to be linearized with that
661        // operation. It may be true that creating the write handle after the
662        // since handle already ensures this, but we do this out of an abundance
663        // of caution.
664        //
665        // Note that this returns the upper, but also sets it on the handle to
666        // be fetched later.
667        write_handle.fetch_recent_upper().await;
668
669        (write_handle, since_handle)
670    }
671
672    /// Opens a write handle for the given `shard`.
673    async fn open_write_handle(
674        &self,
675        id: &GlobalId,
676        shard: ShardId,
677        relation_desc: RelationDesc,
678        persist_client: &PersistClient,
679    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
680        let diagnostics = Diagnostics {
681            shard_name: id.to_string(),
682            handle_purpose: format!("controller data for {}", id),
683        };
684
685        let write = persist_client
686            .open_writer(
687                shard,
688                Arc::new(relation_desc),
689                Arc::new(UnitSchema),
690                diagnostics.clone(),
691            )
692            .await
693            .expect("invalid persist usage");
694
695        write
696    }
697
698    /// Opens a critical since handle for the given `shard`.
699    ///
700    /// `since` is an optional since that the read handle will be forwarded to
701    /// if it is less than its current since.
702    ///
703    /// This will `halt!` the process if we cannot successfully acquire a
704    /// critical handle with our current epoch.
705    async fn open_critical_handle(
706        &self,
707        id: &GlobalId,
708        shard: ShardId,
709        since: Option<&Antichain<T>>,
710        persist_client: &PersistClient,
711    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
712        tracing::debug!(%id, ?since, "opening critical handle");
713
714        assert!(
715            !self.read_only,
716            "attempting to open critical SinceHandle in read-only mode"
717        );
718
719        let diagnostics = Diagnostics {
720            shard_name: id.to_string(),
721            handle_purpose: format!("controller data for {}", id),
722        };
723
724        // Construct the handle in a separate block to ensure all error paths
725        // are diverging
726        let since_handle = {
727            // This block's aim is to ensure the handle is in terms of our epoch
728            // by the time we return it.
729            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
730                .open_critical_since(
731                    shard,
732                    PersistClient::CONTROLLER_CRITICAL_SINCE,
733                    diagnostics.clone(),
734                )
735                .await
736                .expect("invalid persist usage");
737
738            // Take the join of the handle's since and the provided `since`;
739            // this lets materialized views express the since at which their
740            // read handles "start."
741            let provided_since = match since {
742                Some(since) => since,
743                None => &Antichain::from_elem(T::minimum()),
744            };
745            let since = handle.since().join(provided_since);
746
747            let our_epoch = self.envd_epoch;
748
749            loop {
750                let current_epoch: PersistEpoch = handle.opaque().clone();
751
752                // Ensure the current epoch is <= our epoch.
753                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);
754
755                if unchecked_success {
756                    // Update the handle's state so that it is in terms of our
757                    // epoch.
758                    let checked_success = handle
759                        .compare_and_downgrade_since(
760                            &current_epoch,
761                            (&PersistEpoch::from(our_epoch), &since),
762                        )
763                        .await
764                        .is_ok();
765                    if checked_success {
766                        break handle;
767                    }
768                } else {
769                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
770                }
771            }
772        };
773
774        since_handle
775    }
776
777    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
778    /// for the given `shard`.
779    ///
780    /// `since` is an optional since that the read handle will be forwarded to
781    /// if it is less than its current since.
782    async fn open_leased_handle(
783        &self,
784        id: &GlobalId,
785        shard: ShardId,
786        relation_desc: RelationDesc,
787        since: Option<&Antichain<T>>,
788        persist_client: &PersistClient,
789    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
790        tracing::debug!(%id, ?since, "opening leased handle");
791
792        let diagnostics = Diagnostics {
793            shard_name: id.to_string(),
794            handle_purpose: format!("controller data for {}", id),
795        };
796
797        let use_critical_since = false;
798        let mut handle: ReadHandle<_, _, _, _> = persist_client
799            .open_leased_reader(
800                shard,
801                Arc::new(relation_desc),
802                Arc::new(UnitSchema),
803                diagnostics.clone(),
804                use_critical_since,
805            )
806            .await
807            .expect("invalid persist usage");
808
809        // Take the join of the handle's since and the provided `since`;
810        // this lets materialized views express the since at which their
811        // read handles "start."
812        let provided_since = match since {
813            Some(since) => since,
814            None => &Antichain::from_elem(T::minimum()),
815        };
816        let since = handle.since().join(provided_since);
817
818        handle.downgrade_since(&since).await;
819
820        handle
821    }
822
823    fn register_handles(
824        &self,
825        id: GlobalId,
826        is_in_txns: bool,
827        since_handle: SinceHandleWrapper<T>,
828        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
829    ) {
830        self.send(BackgroundCmd::Register {
831            id,
832            is_in_txns,
833            since_handle,
834            write_handle,
835        });
836    }
837
838    fn send(&self, cmd: BackgroundCmd<T>) {
839        let _ = self.cmd_tx.send(cmd);
840    }
841
842    async fn snapshot_stats_inner(
843        &self,
844        id: GlobalId,
845        as_of: SnapshotStatsAsOf<T>,
846    ) -> Result<SnapshotStats, StorageError<T>> {
847        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
848        // caller of this one drives it to completion.
849        //
850        // We'd need to either share the critical handle somehow or maybe have
851        // two instances around, one in the worker and one in the
852        // StorageCollections.
853        let (tx, rx) = oneshot::channel();
854        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
855        rx.await.expect("BackgroundTask should be live").0.await
856    }
857
858    /// If this identified collection has a dependency, install a read hold on
859    /// it.
860    ///
861    /// This is necessary to ensure that the dependency's since does not advance
862    /// beyond its dependents'.
863    fn install_collection_dependency_read_holds_inner(
864        &self,
865        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
866        id: GlobalId,
867    ) -> Result<(), StorageError<T>> {
868        let (deps, collection_implied_capability) = match self_collections.get(&id) {
869            Some(CollectionState {
870                storage_dependencies: deps,
871                implied_capability,
872                ..
873            }) => (deps.clone(), implied_capability),
874            _ => return Ok(()),
875        };
876
877        for dep in deps.iter() {
878            let dep_collection = self_collections
879                .get(dep)
880                .ok_or(StorageError::IdentifierMissing(id))?;
881
882            mz_ore::soft_assert_or_log!(
883                PartialOrder::less_equal(
884                    &dep_collection.implied_capability,
885                    collection_implied_capability
886                ),
887                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
888                dep_collection.implied_capability,
889                collection_implied_capability,
890            );
891        }
892
893        self.install_read_capabilities_inner(
894            self_collections,
895            id,
896            &deps,
897            collection_implied_capability.clone(),
898        )?;
899
900        Ok(())
901    }
902
903    /// Returns the given collection's dependencies.
904    fn determine_collection_dependencies(
905        &self,
906        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
907        source_id: GlobalId,
908        collection_desc: &CollectionDescription<T>,
909    ) -> Result<Vec<GlobalId>, StorageError<T>> {
910        let mut dependencies = Vec::new();
911
912        if let Some(id) = collection_desc.primary {
913            dependencies.push(id);
914        }
915
916        match &collection_desc.data_source {
917            DataSource::Introspection(_)
918            | DataSource::Webhook
919            | DataSource::Table
920            | DataSource::Progress
921            | DataSource::Other => (),
922            DataSource::IngestionExport {
923                ingestion_id,
924                data_config,
925                ..
926            } => {
927                // Ingestion exports depend on their primary source's remap
928                // collection, except when they use a CDCv2 envelope.
929                let source = self_collections
930                    .get(ingestion_id)
931                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
932                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
933                    panic!("SourceExport must refer to a primary source that already exists");
934                };
935
936                match data_config.envelope {
937                    SourceEnvelope::CdcV2 => (),
938                    _ => dependencies.push(ingestion.remap_collection_id),
939                }
940            }
941            // Ingestions depend on their remap collection.
942            DataSource::Ingestion(ingestion) => {
943                if ingestion.remap_collection_id != source_id {
944                    dependencies.push(ingestion.remap_collection_id);
945                }
946            }
947            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
948        }
949
950        Ok(dependencies)
951    }
952
953    /// Install read capabilities on the given `storage_dependencies`.
954    #[instrument(level = "debug")]
955    fn install_read_capabilities_inner(
956        &self,
957        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
958        from_id: GlobalId,
959        storage_dependencies: &[GlobalId],
960        read_capability: Antichain<T>,
961    ) -> Result<(), StorageError<T>> {
962        let mut changes = ChangeBatch::new();
963        for time in read_capability.iter() {
964            changes.update(time.clone(), 1);
965        }
966
967        if tracing::span_enabled!(tracing::Level::TRACE) {
968            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
969            let user_capabilities = self_collections
970                .iter_mut()
971                .filter(|(id, _c)| id.is_user())
972                .map(|(id, c)| {
973                    let updates = c.read_capabilities.updates().cloned().collect_vec();
974                    (*id, c.implied_capability.clone(), updates)
975                })
976                .collect_vec();
977
978            trace!(
979                %from_id,
980                ?storage_dependencies,
981                ?read_capability,
982                ?user_capabilities,
983                "install_read_capabilities_inner");
984        }
985
986        let mut storage_read_updates = storage_dependencies
987            .iter()
988            .map(|id| (*id, changes.clone()))
989            .collect();
990
991        StorageCollectionsImpl::update_read_capabilities_inner(
992            &self.cmd_tx,
993            self_collections,
994            &mut storage_read_updates,
995        );
996
997        if tracing::span_enabled!(tracing::Level::TRACE) {
998            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
999            let user_capabilities = self_collections
1000                .iter_mut()
1001                .filter(|(id, _c)| id.is_user())
1002                .map(|(id, c)| {
1003                    let updates = c.read_capabilities.updates().cloned().collect_vec();
1004                    (*id, c.implied_capability.clone(), updates)
1005                })
1006                .collect_vec();
1007
1008            trace!(
1009                %from_id,
1010                ?storage_dependencies,
1011                ?read_capability,
1012                ?user_capabilities,
1013                "after install_read_capabilities_inner!");
1014        }
1015
1016        Ok(())
1017    }
1018
1019    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
1020        let metadata = &self.collection_metadata(id)?;
1021        let persist_client = self
1022            .persist
1023            .open(metadata.persist_location.clone())
1024            .await
1025            .unwrap();
1026        // Duplicate part of open_data_handles here because we don't need the
1027        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
1028        let diagnostics = Diagnostics {
1029            shard_name: id.to_string(),
1030            handle_purpose: format!("controller data for {}", id),
1031        };
1032        // NB: Opening a WriteHandle is cheap if it's never used in a
1033        // compare_and_append operation.
1034        let write = persist_client
1035            .open_writer::<SourceData, (), T, StorageDiff>(
1036                metadata.data_shard,
1037                Arc::new(metadata.relation_desc.clone()),
1038                Arc::new(UnitSchema),
1039                diagnostics.clone(),
1040            )
1041            .await
1042            .expect("invalid persist usage");
1043        Ok(write.shared_upper())
1044    }
1045
1046    async fn read_handle_for_snapshot(
1047        persist: Arc<PersistClientCache>,
1048        metadata: &CollectionMetadata,
1049        id: GlobalId,
1050    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
1051        let persist_client = persist
1052            .open(metadata.persist_location.clone())
1053            .await
1054            .unwrap();
1055
1056        // We create a new read handle every time someone requests a snapshot
1057        // and then immediately expire it instead of keeping a read handle
1058        // permanently in our state to avoid having it heartbeat continually.
1059        // The assumption is that calls to snapshot are rare and therefore worth
1060        // it to always create a new handle.
1061        let read_handle = persist_client
1062            .open_leased_reader::<SourceData, (), _, _>(
1063                metadata.data_shard,
1064                Arc::new(metadata.relation_desc.clone()),
1065                Arc::new(UnitSchema),
1066                Diagnostics {
1067                    shard_name: id.to_string(),
1068                    handle_purpose: format!("snapshot {}", id),
1069                },
1070                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
1071            )
1072            .await
1073            .expect("invalid persist usage");
1074        Ok(read_handle)
1075    }
1076
1077    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1078    // where the as_of frontier might have multiple elements. In the current form the mutually
1079    // incomparable updates will be accumulated together to a state of the collection that never
1080    // actually existed. We should include the original time in the updates advanced by the as_of
1081    // frontier in the result and let the caller decide what to do with the information.
1082    fn snapshot(
1083        &self,
1084        id: GlobalId,
1085        as_of: T,
1086        txns_read: &TxnsRead<T>,
1087    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
1088    where
1089        T: Codec64 + From<EpochMillis> + TimestampManipulation,
1090    {
1091        let metadata = match self.collection_metadata(id) {
1092            Ok(metadata) => metadata.clone(),
1093            Err(e) => return async { Err(e.into()) }.boxed(),
1094        };
1095        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1096            assert_eq!(txns_id, txns_read.txns_id());
1097            txns_read.clone()
1098        });
1099        let persist = Arc::clone(&self.persist);
1100        async move {
1101            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1102            let contents = match txns_read {
1103                None => {
1104                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1105                    read_handle
1106                        .snapshot_and_fetch(Antichain::from_elem(as_of))
1107                        .await
1108                }
1109                Some(txns_read) => {
1110                    // We _are_ using txn-wal for tables. It advances the physical upper of the
1111                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
1112                    // unblocked.
1113                    //
1114                    // Consider the following scenario:
1115                    // - Table A is written to via txns at time 5
1116                    // - Tables other than A are written to via txns consuming timestamps up to 10
1117                    // - We'd like to read A at 7
1118                    // - The application process of A's txn has advanced the upper to 5+1, but we need
1119                    //   it to be past 7, but the txns shard knows that (5,10) is empty of writes to A
1120                    // - This branch allows it to handle that advancing the physical upper of Table A to
1121                    //   10 (NB but only once we see it get past the write at 5!)
1122                    // - Then we can read it normally.
1123                    txns_read.update_gt(as_of.clone()).await;
1124                    let data_snapshot = txns_read
1125                        .data_snapshot(metadata.data_shard, as_of.clone())
1126                        .await;
1127                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
1128                }
1129            };
1130            match contents {
1131                Ok(contents) => {
1132                    let mut snapshot = Vec::with_capacity(contents.len());
1133                    for ((data, _), _, diff) in contents {
1134                        // TODO(petrosagg): We should accumulate the errors too and let the user
1135                        // interprret the result
1136                        let row = data.expect("invalid protobuf data").0?;
1137                        snapshot.push((row, diff));
1138                    }
1139                    Ok(snapshot)
1140                }
1141                Err(_) => Err(StorageError::ReadBeforeSince(id)),
1142            }
1143        }
1144        .boxed()
1145    }
1146
1147    fn snapshot_and_stream(
1148        &self,
1149        id: GlobalId,
1150        as_of: T,
1151        txns_read: &TxnsRead<T>,
1152    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
1153    {
1154        use futures::stream::StreamExt;
1155
1156        let metadata = match self.collection_metadata(id) {
1157            Ok(metadata) => metadata.clone(),
1158            Err(e) => return async { Err(e.into()) }.boxed(),
1159        };
1160        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1161            assert_eq!(txns_id, txns_read.txns_id());
1162            txns_read.clone()
1163        });
1164        let persist = Arc::clone(&self.persist);
1165
1166        async move {
1167            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1168            let stream = match txns_read {
1169                None => {
1170                    // We're not using txn-wal for tables, so we can take a snapshot directly.
1171                    read_handle
1172                        .snapshot_and_stream(Antichain::from_elem(as_of))
1173                        .await
1174                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1175                        .boxed()
1176                }
1177                Some(txns_read) => {
1178                    txns_read.update_gt(as_of.clone()).await;
1179                    let data_snapshot = txns_read
1180                        .data_snapshot(metadata.data_shard, as_of.clone())
1181                        .await;
1182                    data_snapshot
1183                        .snapshot_and_stream(&mut read_handle)
1184                        .await
1185                        .map_err(|_| StorageError::ReadBeforeSince(id))?
1186                        .boxed()
1187                }
1188            };
1189
1190            // Map our stream, unwrapping Persist internal errors.
1191            let stream = stream
1192                .map(|((k, _v), t, d)| {
1193                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
1194                    // caller.
1195                    let data = k.expect("error while streaming from Persist");
1196                    (data, t, d)
1197                })
1198                .boxed();
1199            Ok(stream)
1200        }
1201        .boxed()
1202    }
1203
1204    fn set_read_policies_inner(
1205        &self,
1206        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1207        policies: Vec<(GlobalId, ReadPolicy<T>)>,
1208    ) {
1209        trace!("set_read_policies: {:?}", policies);
1210
1211        let mut read_capability_changes = BTreeMap::default();
1212
1213        for (id, policy) in policies.into_iter() {
1214            let collection = match collections.get_mut(&id) {
1215                Some(c) => c,
1216                None => {
1217                    panic!("Reference to absent collection {id}");
1218                }
1219            };
1220
1221            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());
1222
1223            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
1224                let mut update = ChangeBatch::new();
1225                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
1226                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
1227                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
1228                if !update.is_empty() {
1229                    read_capability_changes.insert(id, update);
1230                }
1231            }
1232
1233            collection.read_policy = policy;
1234        }
1235
1236        for (id, changes) in read_capability_changes.iter() {
1237            if id.is_user() {
1238                trace!(%id, ?changes, "in set_read_policies, capability changes");
1239            }
1240        }
1241
1242        if !read_capability_changes.is_empty() {
1243            StorageCollectionsImpl::update_read_capabilities_inner(
1244                &self.cmd_tx,
1245                collections,
1246                &mut read_capability_changes,
1247            );
1248        }
1249    }
1250
1251    // This is not an associated function so that we can share it with the task
1252    // that updates the persist handles and also has a reference to the shared
1253    // collections state.
1254    fn update_read_capabilities_inner(
1255        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
1256        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
1257        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
1258    ) {
1259        // Location to record consequences that we need to act on.
1260        let mut collections_net = BTreeMap::new();
1261
1262        // We must not rely on any specific relative ordering of `GlobalId`s.
1263        // That said, it is reasonable to assume that collections generally have
1264        // greater IDs than their dependencies, so starting with the largest is
1265        // a useful optimization.
1266        while let Some(id) = updates.keys().rev().next().cloned() {
1267            let mut update = updates.remove(&id).unwrap();
1268
1269            if id.is_user() {
1270                trace!(id = ?id, update = ?update, "update_read_capabilities");
1271            }
1272
1273            let collection = if let Some(c) = collections.get_mut(&id) {
1274                c
1275            } else {
1276                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
1277                if has_positive_updates {
1278                    panic!(
1279                        "reference to absent collection {id} but we have positive updates: {:?}",
1280                        update
1281                    );
1282                } else {
1283                    // Continue purely negative updates. Someone has probably
1284                    // already dropped this collection!
1285                    continue;
1286                }
1287            };
1288
1289            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
1290            for (time, diff) in update.iter() {
1291                assert!(
1292                    collection.read_capabilities.count_for(time) + diff >= 0,
1293                    "update {:?} for collection {id} would lead to negative \
1294                        read capabilities, read capabilities before applying: {:?}",
1295                    update,
1296                    collection.read_capabilities
1297                );
1298
1299                if collection.read_capabilities.count_for(time) + diff > 0 {
1300                    assert!(
1301                        current_read_capabilities.less_equal(time),
1302                        "update {:?} for collection {id} is trying to \
1303                            install read capabilities before the current \
1304                            frontier of read capabilities, read capabilities before applying: {:?}",
1305                        update,
1306                        collection.read_capabilities
1307                    );
1308                }
1309            }
1310
1311            let changes = collection.read_capabilities.update_iter(update.drain());
1312            update.extend(changes);
1313
1314            if id.is_user() {
1315                trace!(
1316                %id,
1317                ?collection.storage_dependencies,
1318                ?update,
1319                "forwarding update to storage dependencies");
1320            }
1321
1322            for id in collection.storage_dependencies.iter() {
1323                updates
1324                    .entry(*id)
1325                    .or_insert_with(ChangeBatch::new)
1326                    .extend(update.iter().cloned());
1327            }
1328
1329            let (changes, frontier) = collections_net
1330                .entry(id)
1331                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));
1332
1333            changes.extend(update.drain());
1334            *frontier = collection.read_capabilities.frontier().to_owned();
1335        }
1336
1337        // Translate our net compute actions into downgrades of persist sinces.
1338        // The actual downgrades are performed by a Tokio task asynchronously.
1339        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
1340        for (key, (mut changes, frontier)) in collections_net {
1341            if !changes.is_empty() {
1342                // If the collection has a "primary" collection, let that primary drive compaction.
1343                let collection = collections.get(&key).expect("must still exist");
1344                let should_emit_persist_compaction = collection.description.primary.is_none();
1345
1346                if frontier.is_empty() {
1347                    info!(id = %key, "removing collection state because the since advanced to []!");
1348                    collections.remove(&key).expect("must still exist");
1349                }
1350
1351                if should_emit_persist_compaction {
1352                    persist_compaction_commands.push((key, frontier));
1353                }
1354            }
1355        }
1356
1357        if !persist_compaction_commands.is_empty() {
1358            cmd_tx
1359                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
1360                .expect("cannot fail to send");
1361        }
1362    }
1363
1364    /// Remove any shards that we know are finalized
1365    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1366        self.finalized_shards
1367            .lock()
1368            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1369    }
1370}
1371
1372// See comments on the above impl for StorageCollectionsImpl.
1373#[async_trait]
1374impl<T> StorageCollections for StorageCollectionsImpl<T>
1375where
1376    T: TimelyTimestamp
1377        + Lattice
1378        + Codec64
1379        + From<EpochMillis>
1380        + TimestampManipulation
1381        + Into<mz_repr::Timestamp>
1382        + Sync,
1383{
1384    type Timestamp = T;
1385
1386    async fn initialize_state(
1387        &self,
1388        txn: &mut (dyn StorageTxn<T> + Send),
1389        init_ids: BTreeSet<GlobalId>,
1390    ) -> Result<(), StorageError<T>> {
1391        let metadata = txn.get_collection_metadata();
1392        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1393
1394        // Determine which collections we do not yet have metadata for.
1395        let new_collections: BTreeSet<GlobalId> =
1396            init_ids.difference(&existing_metadata).cloned().collect();
1397
1398        self.prepare_state(
1399            txn,
1400            new_collections,
1401            BTreeSet::default(),
1402            BTreeMap::default(),
1403        )
1404        .await?;
1405
1406        // All shards that belong to collections dropped in the last epoch are
1407        // eligible for finalization.
1408        //
1409        // n.b. this introduces an unlikely race condition: if a collection is
1410        // dropped from the catalog, but the dataflow is still running on a
1411        // worker, assuming the shard is safe to finalize on reboot may cause
1412        // the cluster to panic.
1413        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1414
1415        info!(?unfinalized_shards, "initializing finalizable_shards");
1416
1417        self.finalizable_shards.lock().extend(unfinalized_shards);
1418
1419        Ok(())
1420    }
1421
1422    fn update_parameters(&self, config_params: StorageParameters) {
1423        // We serialize the dyncfg updates in StorageParameters, but configure
1424        // persist separately.
1425        config_params.dyncfg_updates.apply(self.persist.cfg());
1426
1427        self.config
1428            .lock()
1429            .expect("lock poisoned")
1430            .update(config_params);
1431    }
1432
1433    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing> {
1434        let collections = self.collections.lock().expect("lock poisoned");
1435
1436        collections
1437            .get(&id)
1438            .map(|c| c.collection_metadata.clone())
1439            .ok_or(CollectionMissing(id))
1440    }
1441
1442    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1443        let collections = self.collections.lock().expect("lock poisoned");
1444
1445        collections
1446            .iter()
1447            .filter(|(_id, c)| !c.is_dropped())
1448            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1449            .collect()
1450    }
1451
1452    fn collections_frontiers(
1453        &self,
1454        ids: Vec<GlobalId>,
1455    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, CollectionMissing> {
1456        if ids.is_empty() {
1457            return Ok(vec![]);
1458        }
1459
1460        let collections = self.collections.lock().expect("lock poisoned");
1461
1462        let res = ids
1463            .into_iter()
1464            .map(|id| {
1465                collections
1466                    .get(&id)
1467                    .map(|c| CollectionFrontiers {
1468                        id: id.clone(),
1469                        write_frontier: c.write_frontier.clone(),
1470                        implied_capability: c.implied_capability.clone(),
1471                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1472                    })
1473                    .ok_or(CollectionMissing(id))
1474            })
1475            .collect::<Result<Vec<_>, _>>()?;
1476
1477        Ok(res)
1478    }
1479
1480    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1481        let collections = self.collections.lock().expect("lock poisoned");
1482
1483        let res = collections
1484            .iter()
1485            .filter(|(_id, c)| !c.is_dropped())
1486            .map(|(id, c)| CollectionFrontiers {
1487                id: id.clone(),
1488                write_frontier: c.write_frontier.clone(),
1489                implied_capability: c.implied_capability.clone(),
1490                read_capabilities: c.read_capabilities.frontier().to_owned(),
1491            })
1492            .collect_vec();
1493
1494        res
1495    }
1496
1497    async fn snapshot_stats(
1498        &self,
1499        id: GlobalId,
1500        as_of: Antichain<Self::Timestamp>,
1501    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1502        let metadata = self.collection_metadata(id)?;
1503
1504        // See the comments in StorageController::snapshot for what's going on
1505        // here.
1506        let as_of = match metadata.txns_shard.as_ref() {
1507            None => SnapshotStatsAsOf::Direct(as_of),
1508            Some(txns_id) => {
1509                assert_eq!(txns_id, self.txns_read.txns_id());
1510                let as_of = as_of
1511                    .into_option()
1512                    .expect("cannot read as_of the empty antichain");
1513                self.txns_read.update_gt(as_of.clone()).await;
1514                let data_snapshot = self
1515                    .txns_read
1516                    .data_snapshot(metadata.data_shard, as_of.clone())
1517                    .await;
1518                SnapshotStatsAsOf::Txns(data_snapshot)
1519            }
1520        };
1521        self.snapshot_stats_inner(id, as_of).await
1522    }
1523
    /// Returns a future resolving to per-part persist statistics for the
    /// snapshot of `id` at `as_of`.
    ///
    /// The metadata lookup, the read-handle creation and, for collections
    /// with a `txns_shard` and a non-empty `as_of`, the txns data snapshot
    /// are all awaited here, before the returned future is built. The
    /// returned future computes the stats and then expires the read handle.
    ///
    /// # Errors
    ///
    /// The returned future yields `IdentifierMissing` if `id` is unknown,
    /// propagates any error from opening the read handle, and maps a stats
    /// failure to `ReadBeforeSince`.
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
        // Scope the lock guard so it is released before any `.await` below.
        let metadata = {
            let self_collections = self.collections.lock().expect("lock poisoned");

            let collection_metadata = self_collections
                .get(&id)
                .ok_or(StorageError::IdentifierMissing(id))
                .map(|c| c.collection_metadata.clone());

            match collection_metadata {
                Ok(m) => m,
                Err(e) => return Box::pin(async move { Err(e) }),
            }
        };

        // See the comments in StorageController::snapshot for what's going on
        // here.
        let persist = Arc::clone(&self.persist);
        // Note: an error here is only surfaced inside the returned future, so
        // the txns data snapshot below is still taken even if this failed.
        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;

        // Only collections with a txns shard, read at a non-empty `as_of`, go
        // through the txns handle; everything else reads the data shard
        // directly inside the returned future.
        let data_snapshot = match (metadata, as_of.as_option()) {
            (
                CollectionMetadata {
                    txns_shard: Some(txns_id),
                    data_shard,
                    ..
                },
                Some(as_of),
            ) => {
                assert_eq!(txns_id, *self.txns_read.txns_id());
                self.txns_read.update_gt(as_of.clone()).await;
                let data_snapshot = self
                    .txns_read
                    .data_snapshot(data_shard, as_of.clone())
                    .await;
                Some(data_snapshot)
            }
            _ => None,
        };

        Box::pin(async move {
            let read_handle = read_handle?;
            let result = match data_snapshot {
                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
                None => read_handle.snapshot_parts_stats(as_of).await,
            };
            // Expire the handle regardless of whether computing stats succeeded.
            read_handle.expire().await;
            result.map_err(|_| StorageError::ReadBeforeSince(id))
        })
    }
1578
1579    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1580    // where the as_of frontier might have multiple elements. In the current form the mutually
1581    // incomparable updates will be accumulated together to a state of the collection that never
1582    // actually existed. We should include the original time in the updates advanced by the as_of
1583    // frontier in the result and let the caller decide what to do with the information.
    /// Returns a future resolving to the `(Row, diff)` updates of `id` as of
    /// `as_of`.
    ///
    /// Delegates to the inherent `snapshot` method, passing this instance's
    /// `txns_read` handle (used for collections managed by txn-wal).
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
        self.snapshot(id, as_of, &self.txns_read)
    }
1591
1592    async fn snapshot_latest(
1593        &self,
1594        id: GlobalId,
1595    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1596        let upper = self.recent_upper(id).await?;
1597        let res = match upper.as_option() {
1598            Some(f) if f > &T::minimum() => {
1599                let as_of = f.step_back().unwrap();
1600
1601                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1602                snapshot
1603                    .into_iter()
1604                    .map(|(row, diff)| {
1605                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1606                        row
1607                    })
1608                    .collect()
1609            }
1610            Some(_min) => {
1611                // The collection must be empty!
1612                Vec::new()
1613            }
1614            // The collection is closed, we cannot determine a latest read
1615            // timestamp based on the upper.
1616            _ => {
1617                return Err(StorageError::InvalidUsage(
1618                    "collection closed, cannot determine a read timestamp based on the upper"
1619                        .to_string(),
1620                ));
1621            }
1622        };
1623
1624        Ok(res)
1625    }
1626
1627    fn snapshot_cursor(
1628        &self,
1629        id: GlobalId,
1630        as_of: Self::Timestamp,
1631    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1632    where
1633        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1634    {
1635        let metadata = match self.collection_metadata(id) {
1636            Ok(metadata) => metadata.clone(),
1637            Err(e) => return async { Err(e.into()) }.boxed(),
1638        };
1639        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1640            // Ensure the txn's shard the controller has is the same that this
1641            // collection is registered to.
1642            assert_eq!(txns_id, self.txns_read.txns_id());
1643            self.txns_read.clone()
1644        });
1645        let persist = Arc::clone(&self.persist);
1646
1647        // See the comments in Self::snapshot for what's going on here.
1648        async move {
1649            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1650            let cursor = match txns_read {
1651                None => {
1652                    let cursor = handle
1653                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1654                        .await
1655                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1656                    SnapshotCursor {
1657                        _read_handle: handle,
1658                        cursor,
1659                    }
1660                }
1661                Some(txns_read) => {
1662                    txns_read.update_gt(as_of.clone()).await;
1663                    let data_snapshot = txns_read
1664                        .data_snapshot(metadata.data_shard, as_of.clone())
1665                        .await;
1666                    let cursor = data_snapshot
1667                        .snapshot_cursor(&mut handle, |_| true)
1668                        .await
1669                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1670                    SnapshotCursor {
1671                        _read_handle: handle,
1672                        cursor,
1673                    }
1674                }
1675            };
1676
1677            Ok(cursor)
1678        }
1679        .boxed()
1680    }
1681
    /// Returns a future resolving to a stream of `(SourceData, time, diff)`
    /// updates for `id` starting from `as_of`.
    ///
    /// Delegates to the inherent `snapshot_and_stream` method, passing this
    /// instance's `txns_read` handle (used for collections managed by
    /// txn-wal).
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
    {
        self.snapshot_and_stream(id, as_of, &self.txns_read)
    }
1698
1699    fn create_update_builder(
1700        &self,
1701        id: GlobalId,
1702    ) -> BoxFuture<
1703        'static,
1704        Result<
1705            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1706            StorageError<Self::Timestamp>,
1707        >,
1708    > {
1709        let metadata = match self.collection_metadata(id) {
1710            Ok(m) => m,
1711            Err(e) => return Box::pin(async move { Err(e.into()) }),
1712        };
1713        let persist = Arc::clone(&self.persist);
1714
1715        async move {
1716            let persist_client = persist
1717                .open(metadata.persist_location.clone())
1718                .await
1719                .expect("invalid persist usage");
1720            let write_handle = persist_client
1721                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1722                    metadata.data_shard,
1723                    Arc::new(metadata.relation_desc.clone()),
1724                    Arc::new(UnitSchema),
1725                    Diagnostics {
1726                        shard_name: id.to_string(),
1727                        handle_purpose: format!("create write batch {}", id),
1728                    },
1729                )
1730                .await
1731                .expect("invalid persist usage");
1732            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1733
1734            Ok(builder)
1735        }
1736        .boxed()
1737    }
1738
1739    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1740        let collections = self.collections.lock().expect("lock poisoned");
1741
1742        if collections.contains_key(&id) {
1743            Ok(())
1744        } else {
1745            Err(StorageError::IdentifierMissing(id))
1746        }
1747    }
1748
1749    async fn prepare_state(
1750        &self,
1751        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1752        ids_to_add: BTreeSet<GlobalId>,
1753        ids_to_drop: BTreeSet<GlobalId>,
1754        ids_to_register: BTreeMap<GlobalId, ShardId>,
1755    ) -> Result<(), StorageError<T>> {
1756        txn.insert_collection_metadata(
1757            ids_to_add
1758                .into_iter()
1759                .map(|id| (id, ShardId::new()))
1760                .collect(),
1761        )?;
1762        txn.insert_collection_metadata(ids_to_register)?;
1763
1764        // Delete the metadata for any dropped collections.
1765        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1766
1767        let dropped_shards = dropped_mappings
1768            .into_iter()
1769            .map(|(_id, shard)| shard)
1770            .collect();
1771
1772        txn.insert_unfinalized_shards(dropped_shards)?;
1773
1774        // Reconcile any shards we've successfully finalized with the shard
1775        // finalization collection.
1776        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1777        txn.mark_shards_as_finalized(finalized_shards);
1778
1779        Ok(())
1780    }
1781
    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
    // a method/move individual parts to their own methods.
    /// Registers `collections` with this instance during bootstrap.
    ///
    /// For each collection this opens its persist write and since handles.
    /// For a table whose since is still the minimum timestamp (and which was
    /// not migrated), it advances the since to `register_ts`. It then derives
    /// the initial since from the collection's storage dependency, if any, and
    /// installs the resulting `CollectionState`, adding ingestion exports to
    /// their ingestion's `source_exports`. Finally it registers the persist
    /// handles and installs read holds on dependencies. Collections are
    /// installed with tables first, then other collections, then sinks.
    /// Finalized shards are synchronized against `storage_metadata` at the
    /// end.
    ///
    /// # Errors
    ///
    /// Returns `CollectionIdReused` if the same id is passed with conflicting
    /// descriptions, or if an already-known id is passed with a description
    /// that differs from the known one. Errors from shard lookup, dependency
    /// resolution, and read-hold installation are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the persist client cannot be opened, if a table is
    /// registered without a `register_ts`, if a collection has more than one
    /// storage dependency, or if an ingestion export refers to an id that is
    /// not a registered ingestion.
    #[instrument(level = "debug")]
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        // A collection is treated as living in txn-wal if it has a txns shard,
        // except for migrated collections while this instance is read-only.
        let is_in_txns = |id, metadata: &CollectionMetadata| {
            metadata.txns_shard.is_some()
                && !(self.read_only && migrated_storage_collections.contains(&id))
        };

        // Validate first, to avoid corrupting state. After sorting by id and
        // removing exact duplicates, any id that still appears more than once
        // was passed with conflicting descriptions.
        collections.sort_by_key(|(id, _)| *id);
        collections.dedup();
        for pos in 1..collections.len() {
            if collections[pos - 1].0 == collections[pos].0 {
                return Err(StorageError::CollectionIdReused(collections[pos].0));
            }
        }

        {
            // Early sanity check: if we knew about a collection already, its
            // description must match!
            //
            // NOTE: There could be concurrent modifications to
            // `self.collections`, but this sanity check is better than nothing.
            let self_collections = self.collections.lock().expect("lock poisoned");
            for (id, description) in collections.iter() {
                if let Some(existing_collection) = self_collections.get(id) {
                    if &existing_collection.description != description {
                        return Err(StorageError::CollectionIdReused(*id));
                    }
                }
            }
        }

        // We first enrich each collection description with some additional
        // metadata...
        let enriched_with_metadata = collections
            .into_iter()
            .map(|(id, description)| {
                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;

                // If the shard is being managed by txn-wal (initially,
                // tables), then we need to pass along the shard id for the txns
                // shard to dataflow rendering.
                let txns_shard = description
                    .data_source
                    .in_txns()
                    .then(|| *self.txns_read.txns_id());

                let metadata = CollectionMetadata {
                    persist_location: self.persist_location.clone(),
                    data_shard,
                    relation_desc: description.desc.clone(),
                    txns_shard,
                };

                Ok((id, description, metadata))
            })
            .collect_vec();

        // Open a single persist client up front so that the per-collection
        // futures below can share it while opening their handles concurrently.
        let persist_client = self
            .persist
            .open(self.persist_location.clone())
            .await
            .unwrap();
        let persist_client = &persist_client;
        // Reborrow `self` so that all the futures processed concurrently in
        // this stream can share it.
        use futures::stream::{StreamExt, TryStreamExt};
        let this = &*self;
        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
                let register_ts = register_ts.clone();
                async move {
                    let (id, description, metadata) = data?;

                    // should be replaced with real introspection
                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
                    // but for now, it's helpful to have this mapping written down
                    // somewhere
                    debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);

                    let (write, mut since_handle) = this
                        .open_data_handles(
                            &id,
                            metadata.data_shard,
                            description.since.as_ref(),
                            metadata.relation_desc.clone(),
                            persist_client,
                        )
                        .await;

                    // Present tables as springing into existence at the register_ts
                    // by advancing the since. Otherwise, we could end up in a
                    // situation where a table with a long compaction window appears
                    // to exist before the environment (and thus the table) existed.
                    //
                    // We could potentially also do the same thing for other
                    // sources, in particular storage's internal sources and perhaps
                    // others, but leave them for now.
                    match description.data_source {
                        DataSource::Introspection(_)
                        | DataSource::IngestionExport { .. }
                        | DataSource::Webhook
                        | DataSource::Ingestion(_)
                        | DataSource::Progress
                        | DataSource::Other => {}
                        DataSource::Sink { .. } => {}
                        DataSource::Table => {
                            let register_ts = register_ts.expect(
                                "caller should have provided a register_ts when creating a table",
                            );
                            if since_handle.since().elements() == &[T::minimum()]
                                && !migrated_storage_collections.contains(&id)
                            {
                                debug!("advancing {} to initial since of {:?}", id, register_ts);
                                let token = since_handle.opaque();
                                // The result of the downgrade is deliberately
                                // ignored here.
                                let _ = since_handle
                                    .compare_and_downgrade_since(
                                        &token,
                                        (&token, &Antichain::from_elem(register_ts.clone())),
                                    )
                                    .await;
                            }
                        }
                    }

                    Ok::<_, StorageError<Self::Timestamp>>((
                        id,
                        description,
                        write,
                        since_handle,
                        metadata,
                    ))
                }
            })
            // Poll each future for each collection concurrently, maximum of 50 at a time.
            .buffer_unordered(50)
            // HERE BE DRAGONS:
            //
            // There are at least 2 subtleties in using `FuturesUnordered`
            // (which `buffer_unordered` uses underneath):
            // - One is captured here
            //   <https://github.com/rust-lang/futures-rs/issues/2387>
            // - And the other is deadlocking if processing an OUTPUT of a
            //   `FuturesUnordered` stream attempts to obtain an async mutex that
            //   is also obtained in the futures being polled.
            //
            // Both of these could potentially be issues in all usages of
            // `buffer_unordered` in this method, so we stick to the standard
            // advice: only use `try_collect` or `collect`!
            .try_collect()
            .await?;

        // Reorder in dependency order.
        #[derive(Ord, PartialOrd, Eq, PartialEq)]
        enum DependencyOrder {
            /// Tables should always be registered first, and large ids before small ones.
            Table(Reverse<GlobalId>),
            /// For most collections the id order is the correct one.
            Collection(GlobalId),
            /// Sinks should always be registered last.
            Sink(GlobalId),
        }
        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
            _ => DependencyOrder::Collection(*id),
        });

        // We hold this lock for a very short amount of time, just doing some
        // hashmap inserts and unbounded channel sends.
        let mut self_collections = self.collections.lock().expect("lock poisoned");

        for (id, description, write_handle, since_handle, metadata) in to_register {
            let write_frontier = write_handle.upper();
            let data_shard_since = since_handle.since().clone();

            // Determine if this collection has any dependencies.
            let storage_dependencies =
                self.determine_collection_dependencies(&*self_collections, id, &description)?;

            // Determine the initial since of the collection.
            let initial_since = match storage_dependencies
                .iter()
                .at_most_one()
                .expect("should have at most one dependency")
            {
                Some(dep) => {
                    let dependency_collection = self_collections
                        .get(dep)
                        .ok_or(StorageError::IdentifierMissing(*dep))?;
                    let dependency_since = dependency_collection.implied_capability.clone();

                    // If an item has a dependency, its initial since must be
                    // advanced as far as its dependency, i.e. a dependency's
                    // since may never be in advance of its dependents.
                    //
                    // We have to do this every time we initialize the
                    // collection, though––the invariant might have been upheld
                    // correctly in the previous epoch, but the
                    // `data_shard_since` might not have compacted and, on
                    // establishing a new persist connection, still have data we
                    // said _could_ be compacted.
                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
                        // The dependency since cannot be beyond the dependent
                        // (our) upper unless the collection is new. In
                        // practice, the dependency is the remap shard of a
                        // source (export), and if the since is allowed to
                        // "catch up" to the upper, that is `upper <= since`, a
                        // restarting ingestion cannot differentiate between
                        // updates that have already been written out to the
                        // backing persist shard and updates that have yet to be
                        // written. We would write duplicate updates.
                        //
                        // If this check fails, it means that the read hold
                        // installed on the dependency was probably not upheld
                        // –– if it were, the dependency's since could not have
                        // advanced as far as the dependent's upper.
                        //
                        // We don't care about the dependency since when the
                        // write frontier is empty. In that case, no-one can
                        // write down any more updates.
                        mz_ore::soft_assert_or_log!(
                            write_frontier.elements() == &[T::minimum()]
                                || write_frontier.is_empty()
                                || PartialOrder::less_than(&dependency_since, write_frontier),
                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
                            dependent ({id}): since {:?}, upper {:?} \n
                            dependency ({dep}): since {:?}",
                            data_shard_since,
                            write_frontier,
                            dependency_since
                        );

                        dependency_since
                    } else {
                        data_shard_since
                    }
                }
                None => data_shard_since,
            };

            let mut collection_state = CollectionState::new(
                description,
                initial_since,
                write_frontier.clone(),
                storage_dependencies,
                metadata.clone(),
            );

            // Install the collection state in the appropriate spot.
            match &collection_state.description.data_source {
                DataSource::Introspection(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Webhook => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::IngestionExport {
                    ingestion_id,
                    details,
                    data_config,
                } => {
                    // Adjust the source to contain this export. Sorting above
                    // puts ingestions (smaller ids) before their exports, so
                    // the ingestion is presumably already installed here.
                    let source_collection = self_collections
                        .get_mut(ingestion_id)
                        .expect("known to exist");
                    match &mut source_collection.description {
                        CollectionDescription {
                            data_source: DataSource::Ingestion(ingestion_desc),
                            ..
                        } => ingestion_desc.source_exports.insert(
                            id,
                            SourceExport {
                                storage_metadata: (),
                                details: details.clone(),
                                data_config: data_config.clone(),
                            },
                        ),
                        _ => unreachable!(
                            "SourceExport must only refer to primary sources that already exist"
                        ),
                    };

                    self_collections.insert(id, collection_state);
                }
                DataSource::Table => {
                    // See comment on self.initial_txn_upper on why we're doing
                    // this.
                    if is_in_txns(id, &metadata)
                        && PartialOrder::less_than(
                            &collection_state.write_frontier,
                            &self.initial_txn_upper,
                        )
                    {
                        // We could try and be cute and use the join of the txn
                        // upper and the table upper. But that has more
                        // complicated reasoning for why it is or isn't correct,
                        // and we're only dealing with totally ordered times
                        // here.
                        collection_state
                            .write_frontier
                            .clone_from(&self.initial_txn_upper);
                    }
                    self_collections.insert(id, collection_state);
                }
                DataSource::Progress | DataSource::Other => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Ingestion(_) => {
                    self_collections.insert(id, collection_state);
                }
                DataSource::Sink { .. } => {
                    self_collections.insert(id, collection_state);
                }
            }

            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);

            // If this collection has a dependency, install a read hold on it.
            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
        }

        // Release the collections lock before synchronizing finalized shards.
        drop(self_collections);

        self.synchronize_finalized_shards(storage_metadata);

        Ok(())
    }
2122
2123    async fn alter_ingestion_source_desc(
2124        &self,
2125        ingestion_id: GlobalId,
2126        source_desc: SourceDesc,
2127    ) -> Result<(), StorageError<Self::Timestamp>> {
2128        // The StorageController checks the validity of these. And we just
2129        // accept them.
2130
2131        let mut self_collections = self.collections.lock().expect("lock poisoned");
2132        let collection = self_collections
2133            .get_mut(&ingestion_id)
2134            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2135
2136        let curr_ingestion = match &mut collection.description.data_source {
2137            DataSource::Ingestion(active_ingestion) => active_ingestion,
2138            _ => unreachable!("verified collection refers to ingestion"),
2139        };
2140
2141        curr_ingestion.desc = source_desc;
2142        debug!("altered {ingestion_id}'s SourceDesc");
2143
2144        Ok(())
2145    }
2146
2147    async fn alter_ingestion_export_data_configs(
2148        &self,
2149        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2150    ) -> Result<(), StorageError<Self::Timestamp>> {
2151        let mut self_collections = self.collections.lock().expect("lock poisoned");
2152
2153        for (source_export_id, new_data_config) in source_exports {
2154            // We need to adjust the data config on the CollectionState for
2155            // the source export collection directly
2156            let source_export_collection = self_collections
2157                .get_mut(&source_export_id)
2158                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2159            let ingestion_id = match &mut source_export_collection.description.data_source {
2160                DataSource::IngestionExport {
2161                    ingestion_id,
2162                    details: _,
2163                    data_config,
2164                } => {
2165                    *data_config = new_data_config.clone();
2166                    *ingestion_id
2167                }
2168                o => {
2169                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2170                    Err(StorageError::IdentifierInvalid(source_export_id))?
2171                }
2172            };
2173            // We also need to adjust the data config on the CollectionState of the
2174            // Ingestion that the export is associated with.
2175            let ingestion_collection = self_collections
2176                .get_mut(&ingestion_id)
2177                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2178
2179            match &mut ingestion_collection.description.data_source {
2180                DataSource::Ingestion(ingestion_desc) => {
2181                    let source_export = ingestion_desc
2182                        .source_exports
2183                        .get_mut(&source_export_id)
2184                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2185
2186                    if source_export.data_config != new_data_config {
2187                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2188                        source_export.data_config = new_data_config;
2189                    } else {
2190                        tracing::warn!(
2191                            "alter_ingestion_export_data_configs called on \
2192                                    export {source_export_id} of {ingestion_id} but \
2193                                    the data config was the same"
2194                        );
2195                    }
2196                }
2197                o => {
2198                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2199                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
2200                }
2201            }
2202        }
2203
2204        Ok(())
2205    }
2206
2207    async fn alter_ingestion_connections(
2208        &self,
2209        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2210    ) -> Result<(), StorageError<Self::Timestamp>> {
2211        let mut self_collections = self.collections.lock().expect("lock poisoned");
2212
2213        for (id, conn) in source_connections {
2214            let collection = self_collections
2215                .get_mut(&id)
2216                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2217
2218            match &mut collection.description.data_source {
2219                DataSource::Ingestion(ingestion) => {
2220                    // If the connection hasn't changed, there's no sense in
2221                    // re-rendering the dataflow.
2222                    if ingestion.desc.connection != conn {
2223                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2224                        ingestion.desc.connection = conn;
2225                    } else {
2226                        warn!(
2227                            "update_source_connection called on {id} but the \
2228                            connection was the same"
2229                        );
2230                    }
2231                }
2232                o => {
2233                    warn!("update_source_connection called on {:?}", o);
2234                    Err(StorageError::IdentifierInvalid(id))?;
2235                }
2236            }
2237        }
2238
2239        Ok(())
2240    }
2241
2242    async fn alter_table_desc(
2243        &self,
2244        existing_collection: GlobalId,
2245        new_collection: GlobalId,
2246        new_desc: RelationDesc,
2247        expected_version: RelationVersion,
2248    ) -> Result<(), StorageError<Self::Timestamp>> {
2249        let data_shard = {
2250            let self_collections = self.collections.lock().expect("lock poisoned");
2251            let existing = self_collections
2252                .get(&existing_collection)
2253                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2254
2255            // TODO(alter_table): Support changes to sources.
2256            if existing.description.data_source != DataSource::Table {
2257                return Err(StorageError::IdentifierInvalid(existing_collection));
2258            }
2259
2260            existing.collection_metadata.data_shard
2261        };
2262
2263        let persist_client = self
2264            .persist
2265            .open(self.persist_location.clone())
2266            .await
2267            .unwrap();
2268
2269        // Evolve the schema of this shard.
2270        let diagnostics = Diagnostics {
2271            shard_name: existing_collection.to_string(),
2272            handle_purpose: "alter_table_desc".to_string(),
2273        };
2274        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2275        let expected_schema = expected_version.into();
2276        let schema_result = persist_client
2277            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2278                data_shard,
2279                expected_schema,
2280                &new_desc,
2281                &UnitSchema,
2282                diagnostics,
2283            )
2284            .await
2285            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2286        tracing::info!(
2287            ?existing_collection,
2288            ?new_collection,
2289            ?new_desc,
2290            "evolved schema"
2291        );
2292
2293        match schema_result {
2294            CaESchema::Ok(id) => id,
2295            // TODO(alter_table): If we get an expected mismatch we should retry.
2296            CaESchema::ExpectedMismatch {
2297                schema_id,
2298                key,
2299                val,
2300            } => {
2301                mz_ore::soft_panic_or_log!(
2302                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2303                );
2304                return Err(StorageError::Generic(anyhow::anyhow!(
2305                    "schema expected mismatch, {existing_collection:?}",
2306                )));
2307            }
2308            CaESchema::Incompatible => {
2309                mz_ore::soft_panic_or_log!(
2310                    "incompatible schema! {existing_collection} {new_desc:?}"
2311                );
2312                return Err(StorageError::Generic(anyhow::anyhow!(
2313                    "schema incompatible, {existing_collection:?}"
2314                )));
2315            }
2316        };
2317
2318        // Once the new schema is registered we can open new data handles.
2319        let (write_handle, since_handle) = self
2320            .open_data_handles(
2321                &new_collection,
2322                data_shard,
2323                None,
2324                new_desc.clone(),
2325                &persist_client,
2326            )
2327            .await;
2328
2329        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2330        // new version was registered with txn-wal?
2331
2332        // Great! Our new schema is registered with Persist, now we need to update our internal
2333        // data structures.
2334        {
2335            let mut self_collections = self.collections.lock().expect("lock poisoned");
2336
2337            // Update the existing collection so we know it's a "projection" of this new one.
2338            let existing = self_collections
2339                .get_mut(&existing_collection)
2340                .expect("existing collection missing");
2341
2342            // A higher level should already be asserting this, but let's make sure.
2343            assert_eq!(existing.description.data_source, DataSource::Table);
2344            assert_none!(existing.description.primary);
2345
2346            // The existing version of the table will depend on the new version.
2347            existing.description.primary = Some(new_collection);
2348            existing.storage_dependencies.push(new_collection);
2349
2350            // Copy over the frontiers from the previous version.
2351            // The new table starts with two holds - the implied capability, and the hold from
2352            // the previous version - both at the previous version's read frontier.
2353            let implied_capability = existing.read_capabilities.frontier().to_owned();
2354            let write_frontier = existing.write_frontier.clone();
2355
2356            // Determine the relevant read capabilities on the new collection.
2357            //
2358            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2359            // here, but that only installed a ReadHold on the new collection for the implied
2360            // capability of the existing collection. This would cause runtime panics because it
2361            // would eventually result in negative read capabilities.
2362            let mut changes = ChangeBatch::new();
2363            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2364
2365            // Note: The new collection is now the "primary collection".
2366            let collection_desc = CollectionDescription::for_table(new_desc.clone());
2367            let collection_meta = CollectionMetadata {
2368                persist_location: self.persist_location.clone(),
2369                relation_desc: collection_desc.desc.clone(),
2370                data_shard,
2371                txns_shard: Some(self.txns_read.txns_id().clone()),
2372            };
2373            let collection_state = CollectionState::new(
2374                collection_desc,
2375                implied_capability,
2376                write_frontier,
2377                Vec::new(),
2378                collection_meta,
2379            );
2380
2381            // Add a record of the new collection.
2382            self_collections.insert(new_collection, collection_state);
2383
2384            let mut updates = BTreeMap::from([(new_collection, changes)]);
2385            StorageCollectionsImpl::update_read_capabilities_inner(
2386                &self.cmd_tx,
2387                &mut *self_collections,
2388                &mut updates,
2389            );
2390        };
2391
2392        // TODO(alter_table): Support changes to sources.
2393        self.register_handles(new_collection, true, since_handle, write_handle);
2394
2395        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2396
2397        Ok(())
2398    }
2399
2400    fn drop_collections_unvalidated(
2401        &self,
2402        storage_metadata: &StorageMetadata,
2403        identifiers: Vec<GlobalId>,
2404    ) {
2405        debug!(?identifiers, "drop_collections_unvalidated");
2406
2407        let mut self_collections = self.collections.lock().expect("lock poisoned");
2408
2409        for id in identifiers.iter() {
2410            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2411            mz_ore::soft_assert_or_log!(
2412                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2413                "dropping {id}, but drop was not synchronized with storage \
2414                controller via `synchronize_collections`"
2415            );
2416
2417            let dropped_data_source = match self_collections.get(id) {
2418                Some(col) => col.description.data_source.clone(),
2419                None => continue,
2420            };
2421
2422            // If we are dropping source exports, we need to modify the
2423            // ingestion that it runs on.
2424            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2425                // Adjust the source to remove this export.
2426                let ingestion = match self_collections.get_mut(&ingestion_id) {
2427                    Some(ingestion) => ingestion,
2428                    // Primary ingestion already dropped.
2429                    None => {
2430                        tracing::error!(
2431                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
2432                        );
2433                        continue;
2434                    }
2435                };
2436
2437                match &mut ingestion.description {
2438                    CollectionDescription {
2439                        data_source: DataSource::Ingestion(ingestion_desc),
2440                        ..
2441                    } => {
2442                        let removed = ingestion_desc.source_exports.remove(id);
2443                        mz_ore::soft_assert_or_log!(
2444                            removed.is_some(),
2445                            "dropped subsource {id} already removed from source exports"
2446                        );
2447                    }
2448                    _ => unreachable!(
2449                        "SourceExport must only refer to primary sources that already exist"
2450                    ),
2451                };
2452            }
2453        }
2454
2455        // Policies that advance the since to the empty antichain. We do still
2456        // honor outstanding read holds, and collections will only be dropped
2457        // once those are removed as well.
2458        //
2459        // We don't explicitly remove read capabilities! Downgrading the
2460        // frontier of the source to `[]` (the empty Antichain), will propagate
2461        // to the storage dependencies.
2462        let mut finalized_policies = Vec::new();
2463
2464        for id in identifiers {
2465            // Make sure it's still there, might already have been deleted.
2466            if self_collections.contains_key(&id) {
2467                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2468            }
2469        }
2470        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2471
2472        drop(self_collections);
2473
2474        self.synchronize_finalized_shards(storage_metadata);
2475    }
2476
2477    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2478        let mut collections = self.collections.lock().expect("lock poisoned");
2479
2480        if tracing::enabled!(tracing::Level::TRACE) {
2481            let user_capabilities = collections
2482                .iter_mut()
2483                .filter(|(id, _c)| id.is_user())
2484                .map(|(id, c)| {
2485                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2486                    (*id, c.implied_capability.clone(), updates)
2487                })
2488                .collect_vec();
2489
2490            trace!(?policies, ?user_capabilities, "set_read_policies");
2491        }
2492
2493        self.set_read_policies_inner(&mut collections, policies);
2494
2495        if tracing::enabled!(tracing::Level::TRACE) {
2496            let user_capabilities = collections
2497                .iter_mut()
2498                .filter(|(id, _c)| id.is_user())
2499                .map(|(id, c)| {
2500                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2501                    (*id, c.implied_capability.clone(), updates)
2502                })
2503                .collect_vec();
2504
2505            trace!(?user_capabilities, "after! set_read_policies");
2506        }
2507    }
2508
2509    fn acquire_read_holds(
2510        &self,
2511        desired_holds: Vec<GlobalId>,
2512    ) -> Result<Vec<ReadHold<Self::Timestamp>>, CollectionMissing> {
2513        if desired_holds.is_empty() {
2514            return Ok(vec![]);
2515        }
2516
2517        let mut collections = self.collections.lock().expect("lock poisoned");
2518
2519        let mut advanced_holds = Vec::new();
2520        // We advance the holds by our current since frontier. Can't acquire
2521        // holds for times that have been compacted away!
2522        //
2523        // NOTE: We acquire read holds at the earliest possible time rather than
2524        // at the implied capability. This is so that, for example, adapter can
2525        // acquire a read hold to hold back the frontier, giving the COMPUTE
2526        // controller a chance to also acquire a read hold at that early
2527        // frontier. If/when we change the interplay between adapter and COMPUTE
2528        // to pass around ReadHold tokens, we might tighten this up and instead
2529        // acquire read holds at the implied capability.
2530        for id in desired_holds.iter() {
2531            let collection = collections.get(id).ok_or(CollectionMissing(*id))?;
2532            let since = collection.read_capabilities.frontier().to_owned();
2533            advanced_holds.push((*id, since));
2534        }
2535
2536        let mut updates = advanced_holds
2537            .iter()
2538            .map(|(id, hold)| {
2539                let mut changes = ChangeBatch::new();
2540                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2541                (*id, changes)
2542            })
2543            .collect::<BTreeMap<_, _>>();
2544
2545        StorageCollectionsImpl::update_read_capabilities_inner(
2546            &self.cmd_tx,
2547            &mut collections,
2548            &mut updates,
2549        );
2550
2551        let acquired_holds = advanced_holds
2552            .into_iter()
2553            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2554            .collect_vec();
2555
2556        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2557
2558        Ok(acquired_holds)
2559    }
2560
2561    /// Determine time dependence information for the object.
2562    fn determine_time_dependence(
2563        &self,
2564        id: GlobalId,
2565    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2566        use TimeDependenceError::CollectionMissing;
2567        let collections = self.collections.lock().expect("lock poisoned");
2568        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2569
2570        let mut result = None;
2571
2572        while let Some(c) = collection.take() {
2573            use DataSource::*;
2574            if let Some(timeline) = &c.description.timeline {
2575                // Only the epoch timeline follows wall-clock.
2576                if *timeline != Timeline::EpochMilliseconds {
2577                    break;
2578                }
2579            }
2580            match &c.description.data_source {
2581                Ingestion(ingestion) => {
2582                    use GenericSourceConnection::*;
2583                    match ingestion.desc.connection {
2584                        // Kafka, Postgres, MySql, and SQL Server sources all
2585                        // follow wall clock.
2586                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2587                            result = Some(TimeDependence::default())
2588                        }
2589                        // Load generators not further specified.
2590                        LoadGenerator(_) => {}
2591                    }
2592                }
2593                IngestionExport { ingestion_id, .. } => {
2594                    let c = collections
2595                        .get(ingestion_id)
2596                        .ok_or(CollectionMissing(*ingestion_id))?;
2597                    collection = Some(c);
2598                }
2599                // Introspection, other, progress, table, and webhook sources follow wall clock.
2600                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2601                    result = Some(TimeDependence::default())
2602                }
2603                // Materialized views, continual tasks, etc, aren't managed by storage.
2604                Other => {}
2605                Sink { .. } => {}
2606            };
2607        }
2608        Ok(result)
2609    }
2610}
2611
2612/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
2613///
2614/// When a [StorageCollections] is in read-only mode, we will only ever acquire
2615/// [ReadHandle], because acquiring the [SinceHandle] and driving forward its
2616/// since is considered a write. Conversely, when in read-write mode, we acquire
2617/// [SinceHandle].
2618#[derive(Debug)]
2619enum SinceHandleWrapper<T>
2620where
2621    T: TimelyTimestamp + Lattice + Codec64,
2622{
2623    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2624    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2625}
2626
2627impl<T> SinceHandleWrapper<T>
2628where
2629    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2630{
2631    pub fn since(&self) -> &Antichain<T> {
2632        match self {
2633            Self::Critical(handle) => handle.since(),
2634            Self::Leased(handle) => handle.since(),
2635        }
2636    }
2637
2638    pub fn opaque(&self) -> PersistEpoch {
2639        match self {
2640            Self::Critical(handle) => handle.opaque().clone(),
2641            Self::Leased(_handle) => {
2642                // The opaque is expected to be used with
2643                // `compare_and_downgrade_since`, and the leased handle doesn't
2644                // have a notion of an opaque. We pretend here and in
2645                // `compare_and_downgrade_since`.
2646                PersistEpoch(None)
2647            }
2648        }
2649    }
2650
2651    pub async fn compare_and_downgrade_since(
2652        &mut self,
2653        expected: &PersistEpoch,
2654        new: (&PersistEpoch, &Antichain<T>),
2655    ) -> Result<Antichain<T>, PersistEpoch> {
2656        match self {
2657            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2658            Self::Leased(handle) => {
2659                let (opaque, since) = new;
2660                assert_none!(opaque.0);
2661
2662                handle.downgrade_since(since).await;
2663
2664                Ok(since.clone())
2665            }
2666        }
2667    }
2668
2669    pub async fn maybe_compare_and_downgrade_since(
2670        &mut self,
2671        expected: &PersistEpoch,
2672        new: (&PersistEpoch, &Antichain<T>),
2673    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2674        match self {
2675            Self::Critical(handle) => {
2676                handle
2677                    .maybe_compare_and_downgrade_since(expected, new)
2678                    .await
2679            }
2680            Self::Leased(handle) => {
2681                let (opaque, since) = new;
2682                assert_none!(opaque.0);
2683
2684                handle.maybe_downgrade_since(since).await;
2685
2686                Some(Ok(since.clone()))
2687            }
2688        }
2689    }
2690
2691    pub fn snapshot_stats(
2692        &self,
2693        id: GlobalId,
2694        as_of: Option<Antichain<T>>,
2695    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2696        match self {
2697            Self::Critical(handle) => {
2698                let res = handle
2699                    .snapshot_stats(as_of)
2700                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2701                Box::pin(res)
2702            }
2703            Self::Leased(handle) => {
2704                let res = handle
2705                    .snapshot_stats(as_of)
2706                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2707                Box::pin(res)
2708            }
2709        }
2710    }
2711
2712    pub fn snapshot_stats_from_txn(
2713        &self,
2714        id: GlobalId,
2715        data_snapshot: DataSnapshot<T>,
2716    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2717        match self {
2718            Self::Critical(handle) => Box::pin(
2719                data_snapshot
2720                    .snapshot_stats_from_critical(handle)
2721                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2722            ),
2723            Self::Leased(handle) => Box::pin(
2724                data_snapshot
2725                    .snapshot_stats_from_leased(handle)
2726                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2727            ),
2728        }
2729    }
2730}
2731
2732/// State maintained about individual collections.
2733#[derive(Debug, Clone)]
2734struct CollectionState<T> {
2735    /// Description with which the collection was created
2736    pub description: CollectionDescription<T>,
2737
2738    /// Accumulation of read capabilities for the collection.
2739    ///
2740    /// This accumulation will always contain `self.implied_capability`, but may
2741    /// also contain capabilities held by others who have read dependencies on
2742    /// this collection.
2743    pub read_capabilities: MutableAntichain<T>,
2744
2745    /// The implicit capability associated with collection creation.  This
2746    /// should never be less than the since of the associated persist
2747    /// collection.
2748    pub implied_capability: Antichain<T>,
2749
2750    /// The policy to use to downgrade `self.implied_capability`.
2751    pub read_policy: ReadPolicy<T>,
2752
2753    /// Storage identifiers on which this collection depends.
2754    pub storage_dependencies: Vec<GlobalId>,
2755
2756    /// Reported write frontier.
2757    pub write_frontier: Antichain<T>,
2758
2759    pub collection_metadata: CollectionMetadata,
2760}
2761
2762impl<T: TimelyTimestamp> CollectionState<T> {
2763    /// Creates a new collection state, with an initial read policy valid from
2764    /// `since`.
2765    pub fn new(
2766        description: CollectionDescription<T>,
2767        since: Antichain<T>,
2768        write_frontier: Antichain<T>,
2769        storage_dependencies: Vec<GlobalId>,
2770        metadata: CollectionMetadata,
2771    ) -> Self {
2772        let mut read_capabilities = MutableAntichain::new();
2773        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2774        Self {
2775            description,
2776            read_capabilities,
2777            implied_capability: since.clone(),
2778            read_policy: ReadPolicy::NoPolicy {
2779                initial_since: since,
2780            },
2781            storage_dependencies,
2782            write_frontier,
2783            collection_metadata: metadata,
2784        }
2785    }
2786
2787    /// Returns whether the collection was dropped.
2788    pub fn is_dropped(&self) -> bool {
2789        self.read_capabilities.is_empty()
2790    }
2791}
2792
2793/// A task that keeps persist handles, downgrades sinces when asked,
2794/// periodically gets recent uppers from them, and updates the shard collection
2795/// state when needed.
2796///
2797/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
2798#[derive(Debug)]
2799struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2800    config: Arc<Mutex<StorageConfiguration>>,
2801    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2802    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2803    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2804    finalizable_shards: Arc<ShardIdSet>,
2805    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2806    // So we know what shard ID corresponds to what global ID, which we need
2807    // when re-enqueing futures for determining the next upper update.
2808    shard_by_id: BTreeMap<GlobalId, ShardId>,
2809    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2810    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2811    txns_shards: BTreeSet<GlobalId>,
2812}
2813
2814#[derive(Debug)]
2815enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2816    Register {
2817        id: GlobalId,
2818        is_in_txns: bool,
2819        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2820        since_handle: SinceHandleWrapper<T>,
2821    },
2822    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2823    SnapshotStats(
2824        GlobalId,
2825        SnapshotStatsAsOf<T>,
2826        oneshot::Sender<SnapshotStatsRes<T>>,
2827    ),
2828}
2829
2830/// A newtype wrapper to hang a Debug impl off of.
2831pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2832
2833impl<T> Debug for SnapshotStatsRes<T> {
2834    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2835        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2836    }
2837}
2838
2839impl<T> BackgroundTask<T>
2840where
2841    T: TimelyTimestamp
2842        + Lattice
2843        + Codec64
2844        + From<EpochMillis>
2845        + TimestampManipulation
2846        + Into<mz_repr::Timestamp>
2847        + Sync,
2848{
2849    async fn run(&mut self) {
2850        // Futures that fetch the recent upper from all other shards.
2851        let mut upper_futures: FuturesUnordered<
2852            std::pin::Pin<
2853                Box<
2854                    dyn Future<
2855                            Output = (
2856                                GlobalId,
2857                                WriteHandle<SourceData, (), T, StorageDiff>,
2858                                Antichain<T>,
2859                            ),
2860                        > + Send,
2861                >,
2862            >,
2863        > = FuturesUnordered::new();
2864
2865        let gen_upper_future =
2866            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2867                let fut = async move {
2868                    soft_assert_or_log!(
2869                        !prev_upper.is_empty(),
2870                        "cannot await progress when upper is already empty"
2871                    );
2872                    handle.wait_for_upper_past(&prev_upper).await;
2873                    let new_upper = handle.shared_upper();
2874                    (id, handle, new_upper)
2875                };
2876
2877                fut
2878            };
2879
2880        let mut txns_upper_future = match self.txns_handle.take() {
2881            Some(txns_handle) => {
2882                let upper = txns_handle.upper().clone();
2883                let txns_upper_future =
2884                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2885                txns_upper_future.boxed()
2886            }
2887            None => async { std::future::pending().await }.boxed(),
2888        };
2889
2890        loop {
2891            tokio::select! {
2892                (id, handle, upper) = &mut txns_upper_future => {
2893                    trace!("new upper from txns shard: {:?}", upper);
2894                    let mut uppers = Vec::new();
2895                    for id in self.txns_shards.iter() {
2896                        uppers.push((*id, &upper));
2897                    }
2898                    self.update_write_frontiers(&uppers).await;
2899
2900                    let fut = gen_upper_future(id, handle, upper);
2901                    txns_upper_future = fut.boxed();
2902                }
2903                Some((id, handle, upper)) = upper_futures.next() => {
2904                    if id.is_user() {
2905                        trace!("new upper for collection {id}: {:?}", upper);
2906                    }
2907                    let current_shard = self.shard_by_id.get(&id);
2908                    if let Some(shard_id) = current_shard {
2909                        if shard_id == &handle.shard_id() {
2910                            // Still current, so process the update and enqueue
2911                            // again!
2912                            let uppers = &[(id, &upper)];
2913                            self.update_write_frontiers(uppers).await;
2914                            if !upper.is_empty() {
2915                                let fut = gen_upper_future(id, handle, upper);
2916                                upper_futures.push(fut.boxed());
2917                            }
2918                        } else {
2919                            // Be polite and expire the write handle. This can
2920                            // happen when we get an upper update for a write
2921                            // handle that has since been replaced via Update.
2922                            handle.expire().await;
2923                        }
2924                    }
2925                }
2926                cmd = self.cmds_rx.recv() => {
2927                    let cmd = if let Some(cmd) = cmd {
2928                        cmd
2929                    } else {
2930                        // We're done!
2931                        break;
2932                    };
2933
2934                    match cmd {
2935                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2936                            debug!("registering handles for {}", id);
2937                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2938                            if previous.is_some() {
2939                                panic!("already registered a WriteHandle for collection {id}");
2940                            }
2941
2942                            let previous = self.since_handles.insert(id, since_handle);
2943                            if previous.is_some() {
2944                                panic!("already registered a SinceHandle for collection {id}");
2945                            }
2946
2947                            if is_in_txns {
2948                                self.txns_shards.insert(id);
2949                            } else {
2950                                let upper = write_handle.upper().clone();
2951                                if !upper.is_empty() {
2952                                    let fut = gen_upper_future(id, write_handle, upper);
2953                                    upper_futures.push(fut.boxed());
2954                                }
2955                            }
2956
2957                        }
2958                        BackgroundCmd::DowngradeSince(cmds) => {
2959                            self.downgrade_sinces(cmds).await;
2960                        }
2961                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2962                            // NB: The requested as_of could be arbitrarily far
2963                            // in the future. So, in order to avoid blocking
2964                            // this loop until it's available and the
2965                            // `snapshot_stats` call resolves, instead return
2966                            // the future to the caller and await it there.
2967                            let res = match self.since_handles.get(&id) {
2968                                Some(x) => {
2969                                    let fut: BoxFuture<
2970                                        'static,
2971                                        Result<SnapshotStats, StorageError<T>>,
2972                                    > = match as_of {
2973                                        SnapshotStatsAsOf::Direct(as_of) => {
2974                                            x.snapshot_stats(id, Some(as_of))
2975                                        }
2976                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
2977                                            x.snapshot_stats_from_txn(id, data_snapshot)
2978                                        }
2979                                    };
2980                                    SnapshotStatsRes(fut)
2981                                }
2982                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
2983                                    StorageError::IdentifierMissing(id),
2984                                )))),
2985                            };
2986                            // It's fine if the listener hung up.
2987                            let _ = tx.send(res);
2988                        }
2989                    }
2990                }
2991                Some(holds_changes) = self.holds_rx.recv() => {
2992                    let mut batched_changes = BTreeMap::new();
2993                    batched_changes.insert(holds_changes.0, holds_changes.1);
2994
2995                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
2996                        let entry = batched_changes.entry(holds_changes.0);
2997                        entry
2998                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
2999                            .or_insert_with(|| holds_changes.1);
3000                    }
3001
3002                    let mut collections = self.collections.lock().expect("lock poisoned");
3003
3004                    let user_changes = batched_changes
3005                        .iter()
3006                        .filter(|(id, _c)| id.is_user())
3007                        .map(|(id, c)| {
3008                            (id.clone(), c.clone())
3009                        })
3010                        .collect_vec();
3011
3012                    if !user_changes.is_empty() {
3013                        trace!(?user_changes, "applying holds changes from channel");
3014                    }
3015
3016                    StorageCollectionsImpl::update_read_capabilities_inner(
3017                        &self.cmds_tx,
3018                        &mut collections,
3019                        &mut batched_changes,
3020                    );
3021                }
3022            }
3023        }
3024
3025        warn!("BackgroundTask shutting down");
3026    }
3027
3028    #[instrument(level = "debug")]
3029    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
3030        let mut read_capability_changes = BTreeMap::default();
3031
3032        let mut self_collections = self.collections.lock().expect("lock poisoned");
3033
3034        for (id, new_upper) in updates.iter() {
3035            let collection = if let Some(c) = self_collections.get_mut(id) {
3036                c
3037            } else {
3038                trace!(
3039                    "Reference to absent collection {id}, due to concurrent removal of that collection"
3040                );
3041                continue;
3042            };
3043
3044            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3045                collection.write_frontier.clone_from(new_upper);
3046            }
3047
3048            let mut new_read_capability = collection
3049                .read_policy
3050                .frontier(collection.write_frontier.borrow());
3051
3052            if id.is_user() {
3053                trace!(
3054                    %id,
3055                    implied_capability = ?collection.implied_capability,
3056                    policy = ?collection.read_policy,
3057                    write_frontier = ?collection.write_frontier,
3058                    ?new_read_capability,
3059                    "update_write_frontiers");
3060            }
3061
3062            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
3063                let mut update = ChangeBatch::new();
3064                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3065                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3066                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3067
3068                if !update.is_empty() {
3069                    read_capability_changes.insert(*id, update);
3070                }
3071            }
3072        }
3073
3074        if !read_capability_changes.is_empty() {
3075            StorageCollectionsImpl::update_read_capabilities_inner(
3076                &self.cmds_tx,
3077                &mut self_collections,
3078                &mut read_capability_changes,
3079            );
3080        }
3081    }
3082
3083    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3084        for (id, new_since) in cmds {
3085            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3086                c
3087            } else {
3088                // This can happen when someone concurrently drops a collection.
3089                trace!("downgrade_sinces: reference to absent collection {id}");
3090                continue;
3091            };
3092
3093            if id.is_user() {
3094                trace!("downgrading since of {} to {:?}", id, new_since);
3095            }
3096
3097            let epoch = since_handle.opaque().clone();
3098            let result = if new_since.is_empty() {
3099                // A shard's since reaching the empty frontier is a prereq for
3100                // being able to finalize a shard, so the final downgrade should
3101                // never be rate-limited.
3102                let res = Some(
3103                    since_handle
3104                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3105                        .await,
3106                );
3107
3108                info!(%id, "removing persist handles because the since advanced to []!");
3109
3110                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3111                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3112                    shard_id
3113                } else {
3114                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3115                };
3116
3117                // We're not responsible for writes to tables, so we also don't
3118                // de-register them from the txn system. Whoever is responsible
3119                // will remove them. We only make sure to remove the table from
3120                // our tracking.
3121                self.txns_shards.remove(&id);
3122
3123                if !self
3124                    .config
3125                    .lock()
3126                    .expect("lock poisoned")
3127                    .parameters
3128                    .finalize_shards
3129                {
3130                    info!(
3131                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3132                    );
3133                    return;
3134                }
3135
3136                info!(%id, %dropped_shard_id, "enqueing shard finalization due to dropped collection and dropped persist handle");
3137
3138                self.finalizable_shards.lock().insert(dropped_shard_id);
3139
3140                res
3141            } else {
3142                since_handle
3143                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3144                    .await
3145            };
3146
3147            if let Some(Err(other_epoch)) = result {
3148                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3149            }
3150        }
3151    }
3152}
3153
/// Inputs for the `finalize_shards_task` background loop.
struct FinalizeShardsTaskConfig {
    /// This environment's epoch. It is converted to a `PersistEpoch` and used
    /// when force-downgrading a shard's critical since during finalization.
    envd_epoch: NonZeroI64,
    /// Storage configuration. It supplies the `finalize_shards` parameter and
    /// the `STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION` dyncfg.
    config: Arc<Mutex<StorageConfiguration>>,
    /// Counters for started, succeeded, and failed finalization attempts.
    metrics: StorageCollectionsMetrics,
    /// Shards waiting to be finalized; shards reported as finalized are
    /// removed from this set.
    finalizable_shards: Arc<ShardIdSet>,
    /// Shards reported as finalized, including ones found already finalized.
    finalized_shards: Arc<ShardIdSet>,
    /// Location of the persist backend that holds the shards.
    persist_location: PersistLocation,
    /// Cache used to open a persist client for `persist_location`.
    persist: Arc<PersistClientCache>,
    /// When true, the task returns immediately without finalizing anything.
    read_only: bool,
}
3164
/// Background loop that finalizes the persist shards enqueued in
/// `finalizable_shards`.
///
/// Returns immediately when `read_only` is set. Otherwise, on every 5-second
/// tick (missed ticks are delayed, not burst), it does the following:
/// 1. Checks the `finalize_shards` storage parameter.
/// 2. If the parameter is enabled, snapshots the currently enqueued shards.
/// 3. Tries to finalize them, with at most 10 in flight at once.
///
/// A shard is finalized in these steps:
/// 1. Advance its upper to the empty antichain.
/// 2. Optionally force its critical since to the empty antichain. This is
///    gated by the `STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION` dyncfg.
/// 3. Call persist's `finalize_shard`.
///
/// Shards that are already finalized, or whose finalization reports success,
/// are moved from `finalizable_shards` to `finalized_shards`. Failures are
/// counted in `metrics.finalization_failed` and stay enqueued for a later
/// tick.
///
/// # Panics
///
/// Panics if the persist client cannot be opened, if persist reports invalid
/// usage, or if the config mutex is poisoned.
async fn finalize_shards_task<T>(
    FinalizeShardsTaskConfig {
        envd_epoch,
        config,
        metrics,
        finalizable_shards,
        finalized_shards,
        persist_location,
        persist,
        read_only,
    }: FinalizeShardsTaskConfig,
) where
    T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    if read_only {
        info!("disabling shard finalization in read only mode");
        return;
    }

    let mut interval = tokio::time::interval(Duration::from_secs(5));
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;

        if !config
            .lock()
            .expect("lock poisoned")
            .parameters
            .finalize_shards
        {
            debug!(
                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
            );
            continue;
        }

        let current_finalizable_shards = {
            // We hold the lock for as short as possible and pull our cloned set
            // of shards.
            finalizable_shards.lock().iter().cloned().collect_vec()
        };

        if current_finalizable_shards.is_empty() {
            debug!("no shards to finalize");
            continue;
        }

        debug!(?current_finalizable_shards, "attempting to finalize shards");

        // Open a persist client to delete unused shards.
        let persist_client = persist.open(persist_location.clone()).await.unwrap();

        // Re-bind shared state as references so the per-shard `async move`
        // blocks below capture borrows instead of taking ownership.
        let metrics = &metrics;
        let finalizable_shards = &finalizable_shards;
        let finalized_shards = &finalized_shards;
        let persist_client = &persist_client;
        let diagnostics = &Diagnostics::from_purpose("finalizing shards");

        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
            .get(config.lock().expect("lock poisoned").config_set());

        let epoch = &PersistEpoch::from(envd_epoch);

        // Each per-shard future resolves to `Some(shard_id)` when the shard is
        // (or already was) finalized, and `None` on failure.
        futures::stream::iter(current_finalizable_shards.clone())
            .map(|shard_id| async move {
                let persist_client = persist_client.clone();
                let diagnostics = diagnostics.clone();
                let epoch = epoch.clone();

                metrics.finalization_started.inc();

                let is_finalized = persist_client
                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
                    .await
                    .expect("invalid persist usage");

                if is_finalized {
                    debug!(%shard_id, "shard is already finalized!");
                    Some(shard_id)
                } else {
                    debug!(%shard_id, "finalizing shard");
                    let finalize = || async move {
                        // TODO: thread the global ID into the shard finalization WAL
                        let diagnostics = Diagnostics::from_purpose("finalizing shards");

                        // We only use the writer to advance the upper, so using a dummy schema is
                        // fine.
                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
                            persist_client
                                .open_writer(
                                    shard_id,
                                    Arc::new(RelationDesc::empty()),
                                    Arc::new(UnitSchema),
                                    diagnostics,
                                )
                                .await
                                .expect("invalid persist usage");
                        write_handle.advance_upper(&Antichain::new()).await;
                        write_handle.expire().await;

                        if force_downgrade_since {
                            let mut since_handle: SinceHandle<
                                SourceData,
                                (),
                                T,
                                StorageDiff,
                                PersistEpoch,
                            > = persist_client
                                .open_critical_since(
                                    shard_id,
                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                                    Diagnostics::from_purpose("finalizing shards"),
                                )
                                .await
                                .expect("invalid persist usage");
                            let handle_epoch = since_handle.opaque().clone();
                            let our_epoch = epoch.clone();
                            let epoch = if our_epoch.0 > handle_epoch.0 {
                                // We're newer, but it's fine to use the
                                // handle's old epoch to try and downgrade.
                                handle_epoch
                            } else {
                                // Good luck, buddy! The downgrade below will
                                // not succeed. There's a process with a newer
                                // epoch out there and someone at some juncture
                                // will fence out this process.
                                our_epoch
                            };
                            let new_since = Antichain::new();
                            let downgrade = since_handle
                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
                                .await;
                            if let Err(e) = downgrade {
                                warn!("tried to finalize a shard with an advancing epoch: {e:?}");
                                // NOTE(review): this `Ok(())` takes the
                                // "finalize success!" path below. The shard is
                                // therefore moved out of `finalizable_shards`
                                // without `finalize_shard` ever having run.
                                // Confirm this is intended (e.g. because a
                                // newer process now owns finalization).
                                return Ok(());
                            }
                            // Not available now, so finalization is broken.
                            // since_handle.expire().await;
                        }

                        persist_client
                            .finalize_shard::<SourceData, (), T, StorageDiff>(
                                shard_id,
                                Diagnostics::from_purpose("finalizing shards"),
                            )
                            .await
                    };

                    match finalize().await {
                        Err(e) => {
                            // Rather than error, just leave this shard as
                            // one to finalize later.
                            warn!("error during finalization of shard {shard_id}: {e:?}");
                            None
                        }
                        Ok(()) => {
                            debug!(%shard_id, "finalize success!");
                            Some(shard_id)
                        }
                    }
                }
            })
            // Poll each future for each collection concurrently, maximum of 10
            // at a time.
            // TODO(benesch): the concurrency here should be configurable
            // via LaunchDarkly.
            .buffer_unordered(10)
            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
            // The closure passed to `for_each` must remain fast or we risk
            // starving the finalization futures of calls to `poll`.
            .for_each(|shard_id| async move {
                match shard_id {
                    None => metrics.finalization_failed.inc(),
                    Some(shard_id) => {
                        // We make successfully finalized shards available for
                        // removal from the finalization WAL one by one, so that
                        // a handful of stuck shards don't prevent us from
                        // removing the shards that have made progress. The
                        // overhead of repeatedly acquiring and releasing the
                        // locks is negligible.
                        {
                            let mut finalizable_shards = finalizable_shards.lock();
                            let mut finalized_shards = finalized_shards.lock();
                            finalizable_shards.remove(&shard_id);
                            finalized_shards.insert(shard_id);
                        }

                        metrics.finalization_succeeded.inc();
                    }
                }
            })
            .await;

        debug!("done finalizing shards");
    }
}
3361
3362#[derive(Debug)]
3363pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
3364    /// Stats for a shard with an "eager" upper (one that continually advances
3365    /// as time passes, even if no writes are coming in).
3366    Direct(Antichain<T>),
3367    /// Stats for a shard with a "lazy" upper (one that only physically advances
3368    /// in response to writes).
3369    Txns(DataSnapshot<T>),
3370}
3371
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigSet;
    use mz_ore::assert_err;
    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{RelationDesc, Row};
    use mz_secrets::InMemorySecretsController;

    use super::*;

    /// Exercises `BackgroundCmd::SnapshotStats`: unknown ids error, requests
    /// for an as_of beyond the upper stay pending without blocking the
    /// background task, and they resolve once the upper advances.
    // A test function is a free function, so it must not take a `self`
    // parameter.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
    async fn test_snapshot_stats() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };
        let persist_client = PersistClientCache::new(
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        );
        let persist_client = Arc::new(persist_client);

        let (cmds_tx, mut background_task) =
            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let persist = persist_client.open(persist_location).await.unwrap();

        let shard_id = ShardId::new();
        let since_handle = persist
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();
        let write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        cmds_tx
            .send(BackgroundCmd::Register {
                id: GlobalId::User(1),
                is_in_txns: false,
                since_handle: SinceHandleWrapper::Critical(since_handle),
                write_handle,
            })
            .unwrap();

        let mut write_handle = persist
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(RelationDesc::empty()),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await
            .unwrap();

        // No stats for unknown GlobalId.
        let stats =
            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
        assert_err!(stats);

        // Stats don't resolve for as_of past the upper.
        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
        assert_none!(stats_fut.now_or_never());

        // Call it again because now_or_never consumed our future and it's not clone-able.
        let stats_ts1_fut =
            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));

        // Write some data.
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(0),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(0.into()),
                Antichain::from_elem(1.into()),
            )
            .await
            .unwrap()
            .unwrap();

        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
            .await
            .unwrap();
        assert_eq!(stats.num_updates, 1);

        // Write more data and unblock the ts 1 call
        let data = (
            (SourceData(Ok(Row::default())), ()),
            mz_repr::Timestamp::from(1),
            1i64,
        );
        let () = write_handle
            .compare_and_append(
                &[data],
                Antichain::from_elem(1.into()),
                Antichain::from_elem(2.into()),
            )
            .await
            .unwrap()
            .unwrap();

        let stats = stats_ts1_fut.await.unwrap();
        assert_eq!(stats.num_updates, 2);

        // Make sure it runs until at least here.
        drop(background_task);
    }

    /// Sends a `SnapshotStats` request for `id` at `as_of` to the background
    /// task and awaits the returned stats future.
    ///
    /// # Panics
    ///
    /// Panics if the command channel is closed or the background task drops
    /// the response sender.
    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        id: GlobalId,
        as_of: Antichain<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        let (tx, rx) = oneshot::channel();
        cmds_tx
            .send(BackgroundCmd::SnapshotStats(
                id,
                SnapshotStatsAsOf::Direct(as_of),
                tx,
            ))
            .unwrap();
        let res = rx.await.expect("BackgroundTask should be live").0;

        res.await
    }

    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
        /// Builds a `BackgroundTask` with empty state and in-memory test
        /// configuration, returning it along with a sender for its command
        /// channel.
        ///
        /// The holds sender is dropped immediately, so the task never
        /// receives hold changes.
        fn new_for_test(
            _persist_location: PersistLocation,
            _persist_client: Arc<PersistClientCache>,
        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
            let connection_context =
                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));

            let task = Self {
                config: Arc::new(Mutex::new(StorageConfiguration::new(
                    connection_context,
                    ConfigSet::default(),
                ))),
                cmds_tx: cmds_tx.clone(),
                cmds_rx,
                holds_rx,
                finalizable_shards: Arc::new(ShardIdSet::new(
                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
                )),
                collections: Arc::new(Mutex::new(BTreeMap::new())),
                shard_by_id: BTreeMap::new(),
                since_handles: BTreeMap::new(),
                txns_handle: None,
                txns_shards: BTreeSet::new(),
            };

            (cmds_tx, task)
        }
    }
}