mz_storage_client/storage_collections.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An abstraction for dealing with storage collections.

use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroI64;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{Future, FutureExt, StreamExt};
use itertools::Itertools;

use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::task::AbortOnDropHandle;
use mz_ore::{assert_none, instrument, soft_assert_or_log};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, ReadHandle};
use mz_persist_client::schema::CaESchema;
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::txn::TxnsCodec;
use mz_repr::{GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError, TxnsCodecRow};
use mz_storage_types::dyncfgs::STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::{ReadHold, ReadHoldError};
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceData, SourceDesc, SourceEnvelope,
    SourceExport, SourceExportDataConfig, Timeline,
};
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::{DataSnapshot, TxnsRead};
use mz_txn_wal::txns::TxnsHandle;
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
use tokio::sync::{mpsc, oneshot};
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::client::TimestamplessUpdateBuilder;
use crate::controller::{
    CollectionDescription, DataSource, PersistEpoch, StorageMetadata, StorageTxn,
};
use crate::storage_collections::metrics::{ShardIdSet, StorageCollectionsMetrics};

mod metrics;

/// An abstraction for keeping track of storage collections and managing access
/// to them.
///
/// Responsibilities:
///
/// - Keeps a critical persist handle for holding the since of collections
///   where it needs to be.
///
/// - Drives the since forward based on the upper of a collection and a
///   [ReadPolicy].
///
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
///   advancing while it needs to be read at a specific time.
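///
/// A hypothetical usage sketch (the `collections`, `txn`, and id values are
/// placeholders, not part of this trait):
///
/// ```ignore
/// // On boot, reconcile with durable state before creating collections.
/// collections.initialize_state(&mut txn, init_ids).await?;
/// collections
///     .prepare_state(&mut txn, ids_to_add, ids_to_drop, ids_to_register)
///     .await?;
/// collections
///     .create_collections_for_bootstrap(&metadata, register_ts, descriptions, &migrated)
///     .await?;
///
/// // Prevent a collection's since from advancing while we read it.
/// let hold = collections.acquire_read_holds(vec![id])?.into_element();
/// let frontiers = collections.collection_frontiers(id)?;
/// // Dropping the hold releases the read capability again.
/// drop(hold);
/// ```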
#[async_trait]
pub trait StorageCollections: Debug {
    type Timestamp: TimelyTimestamp;

    /// On boot, reconcile this [StorageCollections] with outside state. We get
    /// a [StorageTxn] where we can record any durable state that we need.
    ///
    /// We get `init_ids`, which tells us about all collections that currently
    /// exist, so that we can record durable state for those that _we_ don't
    /// yet know about.
    async fn initialize_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        init_ids: BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Update storage configuration with new parameters.
    fn update_parameters(&self, config_params: StorageParameters);

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<CollectionFrontiers<Self::Timestamp>, StorageError<Self::Timestamp>> {
        let frontiers = self
            .collections_frontiers(vec![id])?
            .expect_element(|| "known to exist");

        Ok(frontiers)
    }

    /// Atomically gets and returns the frontiers of all the identified
    /// collections.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>>;

    /// Atomically gets and returns the frontiers of all active collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    async fn snapshot_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>>;

    /// Returns aggregate statistics about the contents of the local input named
    /// `id` at `as_of`.
    ///
    /// Note that this async function itself returns a future. We may need to
    /// block on the stats being available, but we don't want to hold a reference
    /// to the controller for too long, so the outer future (which holds a
    /// reference to the controller) returns quickly, and the inner future is
    /// slow but does not reference the controller.
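    ///
    /// A hypothetical call-site sketch (names are placeholders, not part of
    /// this API):
    ///
    /// ```ignore
    /// // The outer await returns quickly and drops the borrow of the controller...
    /// let stats_fut = collections.snapshot_parts_stats(id, as_of).await;
    /// // ...while the inner await may block until the stats are available.
    /// let stats = stats_fut.await?;
    /// ```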
    async fn snapshot_parts_stats(
        &self,
        id: GlobalId,
        as_of: Antichain<Self::Timestamp>,
    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>>;

    /// Returns a snapshot of the contents of collection `id` at the largest
    /// readable `as_of`.
    async fn snapshot_latest(
        &self,
        id: GlobalId,
    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>>;

    /// Returns a snapshot of the contents of collection `id` at `as_of`.
    fn snapshot_cursor(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
    where
        Self::Timestamp: Codec64 + TimelyTimestamp + Lattice;

    /// Generates a snapshot of the contents of collection `id` at `as_of` and
    /// streams out all of the updates in bounded memory.
    ///
    /// The output is __not__ consolidated.
    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: Self::Timestamp,
    ) -> BoxFuture<
        'static,
        Result<
            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
            StorageError<Self::Timestamp>,
        >,
    >;

    /// Create a [`TimestamplessUpdateBuilder`] that can be used to stage
    /// updates for the provided [`GlobalId`].
    fn create_update_builder(
        &self,
        id: GlobalId,
    ) -> BoxFuture<
        'static,
        Result<
            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
            StorageError<Self::Timestamp>,
        >,
    >
    where
        Self::Timestamp: Lattice + Codec64;

    /// Update the given [`StorageTxn`] with the appropriate metadata given the
    /// IDs to add and drop.
    ///
    /// The data modified in the `StorageTxn` must be made available in all
    /// subsequent calls that require [`StorageMetadata`] as a parameter.
    async fn prepare_state(
        &self,
        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
        ids_to_add: BTreeSet<GlobalId>,
        ids_to_drop: BTreeSet<GlobalId>,
        ids_to_register: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the collections described by the individual
    /// [CollectionDescriptions](CollectionDescription).
    ///
    /// Each command carries the source id, the source description, and any
    /// associated metadata needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and
    /// they are now valid to use in queries at times beyond the initial `since`
    /// frontiers. Each collection also acquires a read capability at this
    /// frontier, which will need to be repeatedly downgraded with
    /// `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; it can fail between processing of
    /// different collections and leave the [StorageCollections] in an
    /// inconsistent state. It is almost always wrong to do anything but abort
    /// the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are
    /// available for reads. (We might later give non-tables the same treatment,
    /// but hold off on that initially.) Callers must provide `Some` if any of
    /// the collections is a table; `None` may be given if none of the
    /// collections are tables (i.e. all materialized views, sources, etc).
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections
    /// to be excluded from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the identified ingestion to use the provided [`SourceDesc`].
    ///
    /// NOTE: Ideally, [StorageCollections] would not care about these, but we
    /// have to learn about changes such that when new subsources are created we
    /// can correctly determine a since based on its dependencies' sinces. This
    /// is really only relevant because newly created subsources depend on the
    /// remap shard, and we can't just have them start at since 0.
    async fn alter_ingestion_source_desc(
        &self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated
    /// [`GenericSourceConnection`].
    ///
    /// See NOTE on [StorageCollections::alter_ingestion_source_desc].
    async fn alter_ingestion_connections(
        &self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Updates the [`RelationDesc`] for the specified table.
    async fn alter_table_desc(
        &self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to
    /// be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers.
    /// Currently when the controller starts/restarts it has no durable state.
    /// That means that it has no way of remembering any past commands sent. In
    /// the future we plan on persisting state for the controller so that it is
    /// aware of past commands. Therefore this method is for dropping sources
    /// that we know to have been previously created, but have been forgotten by
    /// the controller due to a restart. Once command history becomes durable we
    /// can remove this method and use the normal `drop_sources`.
    fn drop_collections_unvalidated(
        &self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Assigns a read policy to specific identifiers.
    ///
    /// The policies are assigned in the order presented, and repeated
    /// identifiers should conclude with the last policy. Changing a policy will
    /// immediately downgrade the read capability if appropriate, but it will
    /// not "recover" the read capability if the prior capability is already
    /// ahead of it.
    ///
    /// This [StorageCollections] may include its own overrides on these
    /// policies.
    ///
    /// Identifiers not present in `policies` retain their existing read
    /// policies.
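    ///
    /// A hypothetical call-site sketch (the id and policy are placeholders,
    /// not part of this API):
    ///
    /// ```ignore
    /// // Replace the policy for a single collection; other collections keep
    /// // their existing read policies.
    /// let policy: ReadPolicy<Timestamp> = new_policy_for(id);
    /// collections.set_read_policies(vec![(id, policy)]);
    /// ```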
    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>);

    /// Acquires and returns the earliest possible read holds for the specified
    /// collections.
    fn acquire_read_holds(
        &self,
        desired_holds: Vec<GlobalId>,
    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError>;

    /// Get the time dependence for a storage collection. Returns no value if unknown or if
    /// the object isn't managed by storage.
    fn determine_time_dependence(
        &self,
        id: GlobalId,
    ) -> Result<Option<TimeDependence>, TimeDependenceError>;
}

/// A cursor over a snapshot, allowing us to read just part of a snapshot in its
/// consolidated form.
pub struct SnapshotCursor<T: Codec64 + TimelyTimestamp + Lattice> {
    // We allocate a temporary read handle for each snapshot, and that handle needs to live at
    // least as long as the cursor itself, which holds part leases. Bundling them together!
    pub _read_handle: ReadHandle<SourceData, (), T, StorageDiff>,
    pub cursor: Cursor<SourceData, (), T, StorageDiff>,
}

impl<T: Codec64 + TimelyTimestamp + Lattice + Sync> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
    ) -> Option<
        impl Iterator<
            Item = (
                (Result<SourceData, String>, Result<(), String>),
                T,
                StorageDiff,
            ),
        > + Sized
        + '_,
    > {
        self.cursor.next().await
    }
}

/// Frontiers of the collection identified by `id`.
#[derive(Debug)]
pub struct CollectionFrontiers<T> {
    /// The [GlobalId] of the collection that these frontiers belong to.
    pub id: GlobalId,

    /// The upper/write frontier of the collection.
    pub write_frontier: Antichain<T>,

    /// The since frontier that is implied by the collection's existence,
    /// disregarding any read holds.
    ///
    /// Concretely, it is the since frontier that is implied by the combination
    /// of the `write_frontier` and a [ReadPolicy]. The implied capability is
    /// derived from the write frontier using the [ReadPolicy].
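    ///
    /// For example, under a [ReadPolicy] that lags the write frontier by some
    /// duration `d`, a write frontier of `[t]` yields an implied capability of
    /// roughly `[t - d]` (an illustrative sketch, not a guarantee about any
    /// particular policy).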
    pub implied_capability: Antichain<T>,

    /// The frontier of all outstanding [ReadHolds](ReadHold). This includes the
    /// implied capability.
    pub read_capabilities: Antichain<T>,
}

/// Implementation of [StorageCollections] that is shallow-cloneable and uses a
/// background task for doing work concurrently.
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
    T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
    /// The fencing token for this instance of [StorageCollections], and really
    /// all of the controllers and Coordinator.
    envd_epoch: NonZeroI64,

    /// Whether or not this [StorageCollections] is in read-only mode.
    ///
    /// When in read-only mode, we are not allowed to effect changes to external
    /// systems, including, for example, acquiring and downgrading critical
    /// [SinceHandles](SinceHandle).
    read_only: bool,

    /// The set of [ShardIds](ShardId) that we have to finalize. These will have
    /// been persisted by the caller of [StorageCollections::prepare_state].
    finalizable_shards: Arc<ShardIdSet>,

    /// The set of [ShardIds](ShardId) that we have finalized. We keep track of
    /// shards here until we are given a chance to let our callers know that
    /// these have been finalized, for example via
    /// [StorageCollections::prepare_state].
    finalized_shards: Arc<ShardIdSet>,

    /// Collections maintained by this [StorageCollections].
    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,

    /// A shared TxnsCache running in a task and communicated with over a channel.
    txns_read: TxnsRead<T>,

    /// Storage configuration parameters.
    config: Arc<Mutex<StorageConfiguration>>,

    /// The upper of the txn shard as it was when we booted. We forward the
    /// upper of created/registered tables to make sure that their uppers are at
    /// least not less than the initially known txn upper.
    ///
    /// NOTE: This works around a quirk in how the adapter chooses the as_of of
    /// existing indexes when bootstrapping, where tables that have an upper
    /// that is less than the initially known txn upper can lead to indexes that
    /// cannot hydrate in read-only mode.
    initial_txn_upper: Antichain<T>,

    /// The persist location where all storage collections are being written to.
    persist_location: PersistLocation,

    /// A persist client used to write to storage collections.
    persist: Arc<PersistClientCache>,

    /// For sending commands to our internal task.
    cmd_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,

    /// For sending updates about read holds to our internal task.
    holds_tx: mpsc::UnboundedSender<(GlobalId, ChangeBatch<T>)>,

    /// Handles to tasks we own, making sure they're dropped when we are.
    _background_task: Arc<AbortOnDropHandle<()>>,
    _finalize_shards_task: Arc<AbortOnDropHandle<()>>,
}

// Supporting methods for implementing [StorageCollections].
//
// Almost all internal methods that are the backing implementation for a trait
// method have the `_inner` suffix.
//
// We follow a pattern where `_inner` methods get a mutable reference to the
// shared collections state, and it's the public-facing method that locks the
// state for the duration of its invocation. This allows calling other `_inner`
// methods from within `_inner` methods.
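//
// A minimal sketch of that pattern (simplified; the real public methods live
// in the trait impl further down):
//
//     fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<T>)>) {
//         // Lock the shared state once, at the outermost layer...
//         let mut self_collections = self.collections.lock().expect("lock poisoned");
//         // ...and pass the &mut to the `_inner` method, which can in turn
//         // call other `_inner` methods without re-locking.
//         self.set_read_policies_inner(&mut self_collections, policies);
//     }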
impl<T> StorageCollectionsImpl<T>
where
    T: TimelyTimestamp
        + Lattice
        + Codec64
        + From<EpochMillis>
        + TimestampManipulation
        + Into<mz_repr::Timestamp>
        + Sync,
{
    /// Creates and returns a new [StorageCollections].
    ///
    /// Note that when creating a new [StorageCollections], you must also
    /// reconcile it with the previous state using
    /// [StorageCollections::initialize_state],
    /// [StorageCollections::prepare_state], and
    /// [StorageCollections::create_collections_for_bootstrap].
    pub async fn new(
        persist_location: PersistLocation,
        persist_clients: Arc<PersistClientCache>,
        metrics_registry: &MetricsRegistry,
        _now: NowFn,
        txns_metrics: Arc<TxnMetrics>,
        envd_epoch: NonZeroI64,
        read_only: bool,
        connection_context: ConnectionContext,
        txn: &dyn StorageTxn<T>,
    ) -> Self {
        let metrics = StorageCollectionsMetrics::register_into(metrics_registry);

        // This value must already be installed because we must ensure it's
        // durably recorded before it is used, otherwise we risk leaking persist
        // state.
        let txns_id = txn
            .get_txn_wal_shard()
            .expect("must call prepare initialization before creating StorageCollections");

        let txns_client = persist_clients
            .open(persist_location.clone())
            .await
            .expect("location should be valid");

        // We have to initialize, so that TxnsRead::start() below does not
        // block.
        let _txns_handle: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow> =
            TxnsHandle::open(
                T::minimum(),
                txns_client.clone(),
                txns_client.dyncfgs().clone(),
                Arc::clone(&txns_metrics),
                txns_id,
            )
            .await;

        // For handing to the background task, for listening to upper updates.
        let (txns_key_schema, txns_val_schema) = TxnsCodecRow::schemas();
        let mut txns_write = txns_client
            .open_writer(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");

        let txns_read = TxnsRead::start::<TxnsCodecRow>(txns_client.clone(), txns_id).await;

        let collections = Arc::new(std::sync::Mutex::new(BTreeMap::default()));
        let finalizable_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_outstanding.clone()));
        let finalized_shards =
            Arc::new(ShardIdSet::new(metrics.finalization_pending_commit.clone()));
        let config = Arc::new(Mutex::new(StorageConfiguration::new(
            connection_context,
            mz_dyncfgs::all_dyncfgs(),
        )));

        let initial_txn_upper = txns_write.fetch_recent_upper().await.to_owned();

        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (holds_tx, holds_rx) = mpsc::unbounded_channel();
        let mut background_task = BackgroundTask {
            config: Arc::clone(&config),
            cmds_tx: cmd_tx.clone(),
            cmds_rx: cmd_rx,
            holds_rx,
            collections: Arc::clone(&collections),
            finalizable_shards: Arc::clone(&finalizable_shards),
            shard_by_id: BTreeMap::new(),
            since_handles: BTreeMap::new(),
            txns_handle: Some(txns_write),
            txns_shards: Default::default(),
        };

        let background_task =
            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
                background_task.run().await
            });

        let finalize_shards_task = mz_ore::task::spawn(
            || "storage_collections::finalize_shards_task",
            finalize_shards_task::<T>(FinalizeShardsTaskConfig {
                envd_epoch: envd_epoch.clone(),
                config: Arc::clone(&config),
                metrics,
                finalizable_shards: Arc::clone(&finalizable_shards),
                finalized_shards: Arc::clone(&finalized_shards),
                persist_location: persist_location.clone(),
                persist: Arc::clone(&persist_clients),
                read_only,
            }),
        );

        Self {
            finalizable_shards,
            finalized_shards,
            collections,
            txns_read,
            envd_epoch,
            read_only,
            config,
            initial_txn_upper,
            persist_location,
            persist: persist_clients,
            cmd_tx,
            holds_tx,
            _background_task: Arc::new(background_task.abort_on_drop()),
            _finalize_shards_task: Arc::new(finalize_shards_task.abort_on_drop()),
        }
    }

    /// Opens a [WriteHandle] and a [SinceHandleWrapper], for holding back the since.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_data_handles(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> (
        WriteHandle<SourceData, (), T, StorageDiff>,
        SinceHandleWrapper<T>,
    ) {
        let since_handle = if self.read_only {
            let read_handle = self
                .open_leased_handle(id, shard, relation_desc.clone(), since, persist_client)
                .await;
            SinceHandleWrapper::Leased(read_handle)
        } else {
            let since_handle = self
                .open_critical_handle(id, shard, since, persist_client)
                .await;

            SinceHandleWrapper::Critical(since_handle)
        };

        let mut write_handle = self
            .open_write_handle(id, shard, relation_desc, persist_client)
            .await;

        // N.B.
        // Fetch the most recent upper for the write handle. Otherwise, this may
        // be behind the since of the since handle. It's vital this happens AFTER
        // we create the since handle as it needs to be linearized with that
        // operation. It may be true that creating the write handle after the
        // since handle already ensures this, but we do this out of an abundance
        // of caution.
        //
        // Note that this returns the upper, but also sets it on the handle to
        // be fetched later.
        write_handle.fetch_recent_upper().await;

        (write_handle, since_handle)
    }

    /// Opens a write handle for the given `shard`.
    async fn open_write_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        persist_client: &PersistClient,
    ) -> WriteHandle<SourceData, (), T, StorageDiff> {
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let write = persist_client
            .open_writer(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");

        write
    }

    /// Opens a critical since handle for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    ///
    /// This will `halt!` the process if we cannot successfully acquire a
    /// critical handle with our current epoch.
    async fn open_critical_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch> {
        tracing::debug!(%id, ?since, "opening critical handle");

        assert!(
            !self.read_only,
            "attempting to open critical SinceHandle in read-only mode"
        );

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        // Construct the handle in a separate block to ensure all error paths
        // are diverging
        let since_handle = {
            // This block's aim is to ensure the handle is in terms of our epoch
            // by the time we return it.
            let mut handle: SinceHandle<_, _, _, _, PersistEpoch> = persist_client
                .open_critical_since(
                    shard,
                    PersistClient::CONTROLLER_CRITICAL_SINCE,
                    diagnostics.clone(),
                )
                .await
                .expect("invalid persist usage");

            // Take the join of the handle's since and the provided `since`;
            // this lets materialized views express the since at which their
            // read handles "start."
            let provided_since = match since {
                Some(since) => since,
                None => &Antichain::from_elem(T::minimum()),
            };
            let since = handle.since().join(provided_since);

            let our_epoch = self.envd_epoch;

            loop {
                let current_epoch: PersistEpoch = handle.opaque().clone();

                // Ensure the current epoch is <= our epoch.
                let unchecked_success = current_epoch.0.map(|e| e <= our_epoch).unwrap_or(true);

                if unchecked_success {
                    // Update the handle's state so that it is in terms of our
                    // epoch.
                    let checked_success = handle
                        .compare_and_downgrade_since(
                            &current_epoch,
                            (&PersistEpoch::from(our_epoch), &since),
                        )
                        .await
                        .is_ok();
                    if checked_success {
                        break handle;
                    }
                } else {
                    mz_ore::halt!("fenced by envd @ {current_epoch:?}. ours = {our_epoch}");
                }
            }
        };

        since_handle
    }

    /// Opens a leased [ReadHandle], for the purpose of holding back a since,
    /// for the given `shard`.
    ///
    /// `since` is an optional since that the read handle will be forwarded to
    /// if it is less than its current since.
    async fn open_leased_handle(
        &self,
        id: &GlobalId,
        shard: ShardId,
        relation_desc: RelationDesc,
        since: Option<&Antichain<T>>,
        persist_client: &PersistClient,
    ) -> ReadHandle<SourceData, (), T, StorageDiff> {
        tracing::debug!(%id, ?since, "opening leased handle");

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };

        let use_critical_since = false;
        let mut handle: ReadHandle<_, _, _, _> = persist_client
            .open_leased_reader(
                shard,
                Arc::new(relation_desc),
                Arc::new(UnitSchema),
                diagnostics.clone(),
                use_critical_since,
            )
            .await
            .expect("invalid persist usage");

        // Take the join of the handle's since and the provided `since`;
        // this lets materialized views express the since at which their
        // read handles "start."
        let provided_since = match since {
            Some(since) => since,
            None => &Antichain::from_elem(T::minimum()),
        };
        let since = handle.since().join(provided_since);

        handle.downgrade_since(&since).await;

        handle
    }

    fn register_handles(
        &self,
        id: GlobalId,
        is_in_txns: bool,
        since_handle: SinceHandleWrapper<T>,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    ) {
        self.send(BackgroundCmd::Register {
            id,
            is_in_txns,
            since_handle,
            write_handle,
        });
    }

    fn send(&self, cmd: BackgroundCmd<T>) {
        let _ = self.cmd_tx.send(cmd);
    }

    async fn snapshot_stats_inner(
        &self,
        id: GlobalId,
        as_of: SnapshotStatsAsOf<T>,
    ) -> Result<SnapshotStats, StorageError<T>> {
        // TODO: Pull this out of BackgroundTask. Unlike the other methods, the
        // caller of this one drives it to completion.
        //
        // We'd need to either share the critical handle somehow or maybe have
        // two instances around, one in the worker and one in the
        // StorageCollections.
        let (tx, rx) = oneshot::channel();
        self.send(BackgroundCmd::SnapshotStats(id, as_of, tx));
        rx.await.expect("BackgroundTask should be live").0.await
    }

    /// If this identified collection has a dependency, install a read hold on
    /// it.
    ///
    /// This is necessary to ensure that the dependency's since does not advance
    /// beyond its dependents'.
    fn install_collection_dependency_read_holds_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        id: GlobalId,
    ) -> Result<(), StorageError<T>> {
        let (deps, collection_implied_capability) = match self_collections.get(&id) {
            Some(CollectionState {
                storage_dependencies: deps,
                implied_capability,
                ..
            }) => (deps.clone(), implied_capability),
            _ => return Ok(()),
        };

        for dep in deps.iter() {
            let dep_collection = self_collections
                .get(dep)
                .ok_or(StorageError::IdentifierMissing(id))?;

            mz_ore::soft_assert_or_log!(
                PartialOrder::less_equal(
                    &dep_collection.implied_capability,
                    collection_implied_capability
                ),
                "dependency since ({dep}@{:?}) cannot be in advance of dependent's since ({id}@{:?})",
                dep_collection.implied_capability,
                collection_implied_capability,
            );
        }

        self.install_read_capabilities_inner(
            self_collections,
            id,
            &deps,
            collection_implied_capability.clone(),
        )?;

        Ok(())
    }

    /// Determine if this collection has another dependency.
    ///
    /// Currently, collections have either 0 or 1 dependencies.
    fn determine_collection_dependencies(
        &self,
        self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
        data_source: &DataSource<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
        let dependencies = match &data_source {
            DataSource::Introspection(_)
            | DataSource::Webhook
            | DataSource::Table { primary: None }
            | DataSource::Progress
            | DataSource::Other => Vec::new(),
            DataSource::Table {
                primary: Some(primary),
            } => vec![*primary],
            DataSource::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Ingestion exports depend on their primary source's remap
                // collection, except when they use a CDCv2 envelope.
                let source = self_collections
                    .get(ingestion_id)
                    .ok_or(StorageError::IdentifierMissing(*ingestion_id))?;
                let DataSource::Ingestion(ingestion) = &source.description.data_source else {
                    panic!("SourceExport must refer to a primary source that already exists");
                };

                match data_config.envelope {
                    SourceEnvelope::CdcV2 => Vec::new(),
                    _ => vec![ingestion.remap_collection_id],
                }
            }
            // Ingestions depend on their remap collection.
            DataSource::Ingestion(ingestion) => vec![ingestion.remap_collection_id],
            DataSource::Sink { desc } => vec![desc.sink.from],
        };

        Ok(dependencies)
    }

    /// Install read capabilities on the given `storage_dependencies`.
    #[instrument(level = "debug")]
    fn install_read_capabilities_inner(
        &self,
        self_collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        from_id: GlobalId,
        storage_dependencies: &[GlobalId],
        read_capability: Antichain<T>,
    ) -> Result<(), StorageError<T>> {
        let mut changes = ChangeBatch::new();
        for time in read_capability.iter() {
            changes.update(time.clone(), 1);
        }

        if tracing::span_enabled!(tracing::Level::TRACE) {
            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "install_read_capabilities_inner");
        }

        let mut storage_read_updates = storage_dependencies
            .iter()
            .map(|id| (*id, changes.clone()))
            .collect();

        StorageCollectionsImpl::update_read_capabilities_inner(
            &self.cmd_tx,
            self_collections,
            &mut storage_read_updates,
        );

        if tracing::span_enabled!(tracing::Level::TRACE) {
            // Collecting `user_capabilities` is potentially slow, thus only do it when needed.
            let user_capabilities = self_collections
                .iter_mut()
                .filter(|(id, _c)| id.is_user())
                .map(|(id, c)| {
                    let updates = c.read_capabilities.updates().cloned().collect_vec();
                    (*id, c.implied_capability.clone(), updates)
                })
                .collect_vec();

            trace!(
                %from_id,
                ?storage_dependencies,
                ?read_capability,
                ?user_capabilities,
                "after install_read_capabilities_inner!");
        }

        Ok(())
    }

    async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
        let metadata = &self.collection_metadata(id)?;
        let persist_client = self
            .persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();
        // Duplicate part of open_data_handles here because we don't need the
        // fetch_recent_upper call. The pubsub-updated shared_upper is enough.
        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: format!("controller data for {}", id),
        };
        // NB: Opening a WriteHandle is cheap if it's never used in a
        // compare_and_append operation.
        let write = persist_client
            .open_writer::<SourceData, (), T, StorageDiff>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                diagnostics.clone(),
            )
            .await
            .expect("invalid persist usage");
        Ok(write.shared_upper())
    }

    async fn read_handle_for_snapshot(
        persist: Arc<PersistClientCache>,
        metadata: &CollectionMetadata,
        id: GlobalId,
    ) -> Result<ReadHandle<SourceData, (), T, StorageDiff>, StorageError<T>> {
        let persist_client = persist
            .open(metadata.persist_location.clone())
            .await
            .unwrap();

        // We create a new read handle every time someone requests a snapshot
        // and then immediately expire it instead of keeping a read handle
        // permanently in our state to avoid having it heartbeat continually.
        // The assumption is that calls to snapshot are rare and therefore worth
        // it to always create a new handle.
        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), _, _>(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: id.to_string(),
                    handle_purpose: format!("snapshot {}", id),
                },
                USE_CRITICAL_SINCE_SNAPSHOT.get(&persist.cfg),
            )
            .await
            .expect("invalid persist usage");
        Ok(read_handle)
    }

    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
    // where the as_of frontier might have multiple elements. In the current form the mutually
    // incomparable updates will be accumulated together to a state of the collection that never
    // actually existed. We should include the original time in the updates advanced by the as_of
    // frontier in the result and let the caller decide what to do with the information.
    fn snapshot(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<T>>>
    where
        T: Codec64 + From<EpochMillis> + TimestampManipulation,
    {
        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);
        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let contents = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_fetch(Antichain::from_elem(as_of))
                        .await
                }
                Some(txns_read) => {
                    // We _are_ using txn-wal for tables. It advances the physical upper of the
                    // shard lazily, so we need to ask it for the snapshot to ensure the read is
                    // unblocked.
                    //
                    // Consider the following scenario:
                    // - Table A is written to via txns at time 5
                    // - Tables other than A are written to via txns consuming timestamps up to 10
                    // - We'd like to read A at 7
                    // - The application process of A's txn has advanced the upper to 5+1, but we
                    //   need it to be past 7, and the txns shard knows that (5,10) is empty of
                    //   writes to A
                    // - This branch allows it to handle that, advancing the physical upper of
                    //   Table A to 10 (NB but only once we see it get past the write at 5!)
                    // - Then we can read it normally.
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot.snapshot_and_fetch(&mut read_handle).await
                }
            };
            match contents {
                Ok(contents) => {
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
                        // interpret the result
                        let row = data.expect("invalid protobuf data").0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
                }
                Err(_) => Err(StorageError::ReadBeforeSince(id)),
            }
        }
        .boxed()
    }

    fn snapshot_and_stream(
        &self,
        id: GlobalId,
        as_of: T,
        txns_read: &TxnsRead<T>,
    ) -> BoxFuture<'static, Result<BoxStream<'static, (SourceData, T, StorageDiff)>, StorageError<T>>>
    {
        use futures::stream::StreamExt;

        let metadata = match self.collection_metadata(id) {
            Ok(metadata) => metadata.clone(),
            Err(e) => return async { Err(e) }.boxed(),
        };
        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
            assert_eq!(txns_id, txns_read.txns_id());
            txns_read.clone()
        });
        let persist = Arc::clone(&self.persist);

        async move {
            let mut read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
            let stream = match txns_read {
                None => {
                    // We're not using txn-wal for tables, so we can take a snapshot directly.
                    read_handle
                        .snapshot_and_stream(Antichain::from_elem(as_of))
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
                Some(txns_read) => {
                    txns_read.update_gt(as_of.clone()).await;
                    let data_snapshot = txns_read
                        .data_snapshot(metadata.data_shard, as_of.clone())
                        .await;
                    data_snapshot
                        .snapshot_and_stream(&mut read_handle)
                        .await
                        .map_err(|_| StorageError::ReadBeforeSince(id))?
                        .boxed()
                }
            };

            // Map our stream, unwrapping Persist internal errors.
            let stream = stream
                .map(|((k, _v), t, d)| {
                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
                    // caller.
                    let data = k.expect("error while streaming from Persist");
                    (data, t, d)
                })
                .boxed();
            Ok(stream)
        }
        .boxed()
    }

    fn set_read_policies_inner(
        &self,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        policies: Vec<(GlobalId, ReadPolicy<T>)>,
    ) {
        trace!("set_read_policies: {:?}", policies);

        let mut read_capability_changes = BTreeMap::default();

        for (id, policy) in policies.into_iter() {
            let collection = match collections.get_mut(&id) {
                Some(c) => c,
                None => {
                    panic!("Reference to absent collection {id}");
                }
            };

            let mut new_read_capability = policy.frontier(collection.write_frontier.borrow());

            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
                let mut update = ChangeBatch::new();
                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
                if !update.is_empty() {
                    read_capability_changes.insert(id, update);
                }
            }

            collection.read_policy = policy;
        }

        for (id, changes) in read_capability_changes.iter() {
            if id.is_user() {
                trace!(%id, ?changes, "in set_read_policies, capability changes");
            }
        }

        if !read_capability_changes.is_empty() {
            StorageCollectionsImpl::update_read_capabilities_inner(
                &self.cmd_tx,
                collections,
                &mut read_capability_changes,
            );
        }
    }

    // This is not a method (it does not take `&self`) so that we can share it
    // with the task that updates the persist handles and also has a reference
    // to the shared collections state.
    fn update_read_capabilities_inner(
        cmd_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
        collections: &mut BTreeMap<GlobalId, CollectionState<T>>,
        updates: &mut BTreeMap<GlobalId, ChangeBatch<T>>,
    ) {
        // Location to record consequences that we need to act on.
        let mut collections_net = BTreeMap::new();

        // We must not rely on any specific relative ordering of `GlobalId`s.
        // That said, it is reasonable to assume that collections generally have
        // greater IDs than their dependencies, so starting with the largest is
        // a useful optimization.
        while let Some(id) = updates.keys().rev().next().cloned() {
            let mut update = updates.remove(&id).unwrap();

            if id.is_user() {
                trace!(id = ?id, update = ?update, "update_read_capabilities");
            }

            let collection = if let Some(c) = collections.get_mut(&id) {
                c
            } else {
                let has_positive_updates = update.iter().any(|(_ts, diff)| *diff > 0);
                if has_positive_updates {
                    panic!(
                        "reference to absent collection {id} but we have positive updates: {:?}",
                        update
                    );
                } else {
                    // Continue purely negative updates. Someone has probably
                    // already dropped this collection!
                    continue;
                }
            };

            let current_read_capabilities = collection.read_capabilities.frontier().to_owned();
            for (time, diff) in update.iter() {
                assert!(
                    collection.read_capabilities.count_for(time) + diff >= 0,
                    "update {:?} for collection {id} would lead to negative \
                        read capabilities, read capabilities before applying: {:?}",
                    update,
                    collection.read_capabilities
                );

                if collection.read_capabilities.count_for(time) + diff > 0 {
                    assert!(
                        current_read_capabilities.less_equal(time),
                        "update {:?} for collection {id} is trying to \
                            install read capabilities before the current \
                            frontier of read capabilities, read capabilities before applying: {:?}",
                        update,
                        collection.read_capabilities
                    );
                }
            }

            let changes = collection.read_capabilities.update_iter(update.drain());
            update.extend(changes);

            if id.is_user() {
                trace!(
                %id,
                ?collection.storage_dependencies,
                ?update,
                "forwarding update to storage dependencies");
            }

            for id in collection.storage_dependencies.iter() {
                updates
                    .entry(*id)
                    .or_insert_with(ChangeBatch::new)
                    .extend(update.iter().cloned());
            }

            let (changes, frontier) = collections_net
                .entry(id)
                .or_insert_with(|| (<ChangeBatch<_>>::new(), Antichain::new()));

            changes.extend(update.drain());
            *frontier = collection.read_capabilities.frontier().to_owned();
        }

        // Translate our net compute actions into downgrades of persist sinces.
        // The actual downgrades are performed by a Tokio task asynchronously.
        let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
        for (key, (mut changes, frontier)) in collections_net {
            if !changes.is_empty() {
                // If the table has a "primary" collection, let that collection drive compaction.
                let collection = collections.get(&key).expect("must still exist");
                let should_emit_persist_compaction = !matches!(
                    collection.description.data_source,
                    DataSource::Table { primary: Some(_) }
                );

                if frontier.is_empty() {
                    info!(id = %key, "removing collection state because the since advanced to []!");
                    collections.remove(&key).expect("must still exist");
                }

                if should_emit_persist_compaction {
                    persist_compaction_commands.push((key, frontier));
                }
            }
        }

        if !persist_compaction_commands.is_empty() {
            cmd_tx
                .send(BackgroundCmd::DowngradeSince(persist_compaction_commands))
                .expect("cannot fail to send");
        }
    }

1350    /// Remove from our set of finalized shards any that are no longer listed as unfinalized in `storage_metadata`, i.e. whose finalization has already been durably recorded.
1351    fn synchronize_finalized_shards(&self, storage_metadata: &StorageMetadata) {
1352        self.finalized_shards
1353            .lock()
1354            .retain(|shard| storage_metadata.unfinalized_shards.contains(shard));
1355    }
1356}
1357
1358// See comments on the above impl for StorageCollectionsImpl.
1359#[async_trait]
1360impl<T> StorageCollections for StorageCollectionsImpl<T>
1361where
1362    T: TimelyTimestamp
1363        + Lattice
1364        + Codec64
1365        + From<EpochMillis>
1366        + TimestampManipulation
1367        + Into<mz_repr::Timestamp>
1368        + Sync,
1369{
1370    type Timestamp = T;
1371
1372    async fn initialize_state(
1373        &self,
1374        txn: &mut (dyn StorageTxn<T> + Send),
1375        init_ids: BTreeSet<GlobalId>,
1376    ) -> Result<(), StorageError<T>> {
1377        let metadata = txn.get_collection_metadata();
1378        let existing_metadata: BTreeSet<_> = metadata.into_iter().map(|(id, _)| id).collect();
1379
1380        // Determine which collections we do not yet have metadata for.
1381        let new_collections: BTreeSet<GlobalId> =
1382            init_ids.difference(&existing_metadata).cloned().collect();
1383
1384        self.prepare_state(
1385            txn,
1386            new_collections,
1387            BTreeSet::default(),
1388            BTreeMap::default(),
1389        )
1390        .await?;
1391
1392        // All shards that belong to collections dropped in the last epoch are
1393        // eligible for finalization.
1394        //
1395        // n.b. this introduces an unlikely race condition: if a collection is
1396        // dropped from the catalog, but the dataflow is still running on a
1397        // worker, assuming the shard is safe to finalize on reboot may cause
1398        // the cluster to panic.
1399        let unfinalized_shards = txn.get_unfinalized_shards().into_iter().collect_vec();
1400
1401        info!(?unfinalized_shards, "initializing finalizable_shards");
1402
1403        self.finalizable_shards.lock().extend(unfinalized_shards);
1404
1405        Ok(())
1406    }
1407
1408    fn update_parameters(&self, config_params: StorageParameters) {
1409        // We serialize the dyncfg updates in StorageParameters, but configure
1410        // persist separately.
1411        config_params.dyncfg_updates.apply(self.persist.cfg());
1412
1413        self.config
1414            .lock()
1415            .expect("lock poisoned")
1416            .update(config_params);
1417    }
1418
1419    fn collection_metadata(
1420        &self,
1421        id: GlobalId,
1422    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>> {
1423        let collections = self.collections.lock().expect("lock poisoned");
1424
1425        collections
1426            .get(&id)
1427            .map(|c| c.collection_metadata.clone())
1428            .ok_or(StorageError::IdentifierMissing(id))
1429    }
1430
1431    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)> {
1432        let collections = self.collections.lock().expect("lock poisoned");
1433
1434        collections
1435            .iter()
1436            .filter(|(_id, c)| !c.is_dropped())
1437            .map(|(id, c)| (*id, c.collection_metadata.clone()))
1438            .collect()
1439    }
1440
1441    fn collections_frontiers(
1442        &self,
1443        ids: Vec<GlobalId>,
1444    ) -> Result<Vec<CollectionFrontiers<Self::Timestamp>>, StorageError<Self::Timestamp>> {
1445        let collections = self.collections.lock().expect("lock poisoned");
1446
1447        let res = ids
1448            .into_iter()
1449            .map(|id| {
1450                collections
1451                    .get(&id)
1452                    .map(|c| CollectionFrontiers {
1453                        id: id.clone(),
1454                        write_frontier: c.write_frontier.clone(),
1455                        implied_capability: c.implied_capability.clone(),
1456                        read_capabilities: c.read_capabilities.frontier().to_owned(),
1457                    })
1458                    .ok_or(StorageError::IdentifierMissing(id))
1459            })
1460            .collect::<Result<Vec<_>, _>>()?;
1461
1462        Ok(res)
1463    }
1464
1465    fn active_collection_frontiers(&self) -> Vec<CollectionFrontiers<Self::Timestamp>> {
1466        let collections = self.collections.lock().expect("lock poisoned");
1467
1468        let res = collections
1469            .iter()
1470            .filter(|(_id, c)| !c.is_dropped())
1471            .map(|(id, c)| CollectionFrontiers {
1472                id: id.clone(),
1473                write_frontier: c.write_frontier.clone(),
1474                implied_capability: c.implied_capability.clone(),
1475                read_capabilities: c.read_capabilities.frontier().to_owned(),
1476            })
1477            .collect_vec();
1478
1479        res
1480    }
1481
1482    async fn snapshot_stats(
1483        &self,
1484        id: GlobalId,
1485        as_of: Antichain<Self::Timestamp>,
1486    ) -> Result<SnapshotStats, StorageError<Self::Timestamp>> {
1487        let metadata = self.collection_metadata(id)?;
1488
1489        // See the comments in StorageController::snapshot for what's going on
1490        // here.
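        //
        // In short: for txn-backed collections we first let the txns shard's
        // read state advance past the requested `as_of` and then compute the
        // stats via a `DataSnapshot`; for all other collections we read the
        // data shard directly at the `as_of`.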
1491        let as_of = match metadata.txns_shard.as_ref() {
1492            None => SnapshotStatsAsOf::Direct(as_of),
1493            Some(txns_id) => {
1494                assert_eq!(txns_id, self.txns_read.txns_id());
1495                let as_of = as_of
1496                    .into_option()
1497                    .expect("cannot read as_of the empty antichain");
1498                self.txns_read.update_gt(as_of.clone()).await;
1499                let data_snapshot = self
1500                    .txns_read
1501                    .data_snapshot(metadata.data_shard, as_of.clone())
1502                    .await;
1503                SnapshotStatsAsOf::Txns(data_snapshot)
1504            }
1505        };
1506        self.snapshot_stats_inner(id, as_of).await
1507    }
1508
1509    async fn snapshot_parts_stats(
1510        &self,
1511        id: GlobalId,
1512        as_of: Antichain<Self::Timestamp>,
1513    ) -> BoxFuture<'static, Result<SnapshotPartsStats, StorageError<Self::Timestamp>>> {
1514        let metadata = {
1515            let self_collections = self.collections.lock().expect("lock poisoned");
1516
1517            let collection_metadata = self_collections
1518                .get(&id)
1519                .ok_or(StorageError::IdentifierMissing(id))
1520                .map(|c| c.collection_metadata.clone());
1521
1522            match collection_metadata {
1523                Ok(m) => m,
1524                Err(e) => return Box::pin(async move { Err(e) }),
1525            }
1526        };
1527
1528        // See the comments in StorageController::snapshot for what's going on
1529        // here.
1530        let persist = Arc::clone(&self.persist);
1531        let read_handle = Self::read_handle_for_snapshot(persist, &metadata, id).await;
1532
1533        let data_snapshot = match (metadata, as_of.as_option()) {
1534            (
1535                CollectionMetadata {
1536                    txns_shard: Some(txns_id),
1537                    data_shard,
1538                    ..
1539                },
1540                Some(as_of),
1541            ) => {
1542                assert_eq!(txns_id, *self.txns_read.txns_id());
1543                self.txns_read.update_gt(as_of.clone()).await;
1544                let data_snapshot = self
1545                    .txns_read
1546                    .data_snapshot(data_shard, as_of.clone())
1547                    .await;
1548                Some(data_snapshot)
1549            }
1550            _ => None,
1551        };
1552
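        // Everything below runs inside the returned future so that it does
        // not borrow `self`: the stats come either from the txn-system
        // `DataSnapshot` or directly from the read handle at the `as_of`.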
1553        Box::pin(async move {
1554            let read_handle = read_handle?;
1555            let result = match data_snapshot {
1556                Some(data_snapshot) => data_snapshot.snapshot_parts_stats(&read_handle).await,
1557                None => read_handle.snapshot_parts_stats(as_of).await,
1558            };
1559            read_handle.expire().await;
1560            result.map_err(|_| StorageError::ReadBeforeSince(id))
1561        })
1562    }
1563
1564    // TODO(petrosagg): This signature is not very useful in the context of partially ordered times
1565    // where the as_of frontier might have multiple elements. In the current form the mutually
1566    // incomparable updates will be accumulated together to a state of the collection that never
1567    // actually existed. We should include the original time in the updates advanced by the as_of
1568    // frontier in the result and let the caller decide what to do with the information.
1569    fn snapshot(
1570        &self,
1571        id: GlobalId,
1572        as_of: Self::Timestamp,
1573    ) -> BoxFuture<'static, Result<Vec<(Row, StorageDiff)>, StorageError<Self::Timestamp>>> {
1574        self.snapshot(id, as_of, &self.txns_read)
1575    }
1576
1577    async fn snapshot_latest(
1578        &self,
1579        id: GlobalId,
1580    ) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
1581        let upper = self.recent_upper(id).await?;
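        // The upper tells us which times may still receive writes, so the
        // latest complete, readable time is the upper stepped back by one.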
1582        let res = match upper.as_option() {
1583            Some(f) if f > &T::minimum() => {
1584                let as_of = f.step_back().unwrap();
1585
1586                let snapshot = self.snapshot(id, as_of, &self.txns_read).await.unwrap();
1587                snapshot
1588                    .into_iter()
1589                    .map(|(row, diff)| {
1590                        assert_eq!(diff, 1, "snapshot doesn't accumulate to set");
1591                        row
1592                    })
1593                    .collect()
1594            }
1595            Some(_min) => {
1596                // The collection must be empty!
1597                Vec::new()
1598            }
1599            // The collection is closed; we cannot determine a latest read
1600            // timestamp based on the upper.
1601            _ => {
1602                return Err(StorageError::InvalidUsage(
1603                    "collection closed, cannot determine a read timestamp based on the upper"
1604                        .to_string(),
1605                ));
1606            }
1607        };
1608
1609        Ok(res)
1610    }
1611
1612    fn snapshot_cursor(
1613        &self,
1614        id: GlobalId,
1615        as_of: Self::Timestamp,
1616    ) -> BoxFuture<'static, Result<SnapshotCursor<Self::Timestamp>, StorageError<Self::Timestamp>>>
1617    where
1618        Self::Timestamp: TimelyTimestamp + Lattice + Codec64,
1619    {
1620        let metadata = match self.collection_metadata(id) {
1621            Ok(metadata) => metadata.clone(),
1622            Err(e) => return async { Err(e) }.boxed(),
1623        };
1624        let txns_read = metadata.txns_shard.as_ref().map(|txns_id| {
1625            // Ensure the txns shard the controller has is the same one that
1626            // this collection is registered to.
1627            assert_eq!(txns_id, self.txns_read.txns_id());
1628            self.txns_read.clone()
1629        });
1630        let persist = Arc::clone(&self.persist);
1631
1632        // See the comments in Self::snapshot for what's going on here.
1633        async move {
1634            let mut handle = Self::read_handle_for_snapshot(persist, &metadata, id).await?;
1635            let cursor = match txns_read {
1636                None => {
1637                    let cursor = handle
1638                        .snapshot_cursor(Antichain::from_elem(as_of), |_| true)
1639                        .await
1640                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1641                    SnapshotCursor {
1642                        _read_handle: handle,
1643                        cursor,
1644                    }
1645                }
1646                Some(txns_read) => {
1647                    txns_read.update_gt(as_of.clone()).await;
1648                    let data_snapshot = txns_read
1649                        .data_snapshot(metadata.data_shard, as_of.clone())
1650                        .await;
1651                    let cursor = data_snapshot
1652                        .snapshot_cursor(&mut handle, |_| true)
1653                        .await
1654                        .map_err(|_| StorageError::ReadBeforeSince(id))?;
1655                    SnapshotCursor {
1656                        _read_handle: handle,
1657                        cursor,
1658                    }
1659                }
1660            };
1661
1662            Ok(cursor)
1663        }
1664        .boxed()
1665    }
1666
1667    fn snapshot_and_stream(
1668        &self,
1669        id: GlobalId,
1670        as_of: Self::Timestamp,
1671    ) -> BoxFuture<
1672        'static,
1673        Result<
1674            BoxStream<'static, (SourceData, Self::Timestamp, StorageDiff)>,
1675            StorageError<Self::Timestamp>,
1676        >,
1677    >
1678    where
1679        Self::Timestamp: TimelyTimestamp + Lattice + Codec64 + 'static,
1680    {
1681        self.snapshot_and_stream(id, as_of, &self.txns_read)
1682    }
1683
1684    fn create_update_builder(
1685        &self,
1686        id: GlobalId,
1687    ) -> BoxFuture<
1688        'static,
1689        Result<
1690            TimestamplessUpdateBuilder<SourceData, (), Self::Timestamp, StorageDiff>,
1691            StorageError<Self::Timestamp>,
1692        >,
1693    > {
1694        let metadata = match self.collection_metadata(id) {
1695            Ok(m) => m,
1696            Err(e) => return Box::pin(async move { Err(e) }),
1697        };
1698        let persist = Arc::clone(&self.persist);
1699
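        // Open a write handle on the collection's data shard; the returned
        // builder uses it to stage updates that have not yet been assigned a
        // timestamp.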
1700        async move {
1701            let persist_client = persist
1702                .open(metadata.persist_location.clone())
1703                .await
1704                .expect("invalid persist usage");
1705            let write_handle = persist_client
1706                .open_writer::<SourceData, (), Self::Timestamp, StorageDiff>(
1707                    metadata.data_shard,
1708                    Arc::new(metadata.relation_desc.clone()),
1709                    Arc::new(UnitSchema),
1710                    Diagnostics {
1711                        shard_name: id.to_string(),
1712                        handle_purpose: format!("create write batch {}", id),
1713                    },
1714                )
1715                .await
1716                .expect("invalid persist usage");
1717            let builder = TimestamplessUpdateBuilder::new(&write_handle);
1718
1719            Ok(builder)
1720        }
1721        .boxed()
1722    }
1723
1724    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>> {
1725        let collections = self.collections.lock().expect("lock poisoned");
1726
1727        if collections.contains_key(&id) {
1728            Ok(())
1729        } else {
1730            Err(StorageError::IdentifierMissing(id))
1731        }
1732    }
1733
1734    async fn prepare_state(
1735        &self,
1736        txn: &mut (dyn StorageTxn<Self::Timestamp> + Send),
1737        ids_to_add: BTreeSet<GlobalId>,
1738        ids_to_drop: BTreeSet<GlobalId>,
1739        ids_to_register: BTreeMap<GlobalId, ShardId>,
1740    ) -> Result<(), StorageError<T>> {
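        // Allocate a brand-new shard for each collection we are adding, and
        // record the explicitly provided id-to-shard mappings as-is.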
1741        txn.insert_collection_metadata(
1742            ids_to_add
1743                .into_iter()
1744                .map(|id| (id, ShardId::new()))
1745                .collect(),
1746        )?;
1747        txn.insert_collection_metadata(ids_to_register)?;
1748
1749        // Delete the metadata for any dropped collections.
1750        let dropped_mappings = txn.delete_collection_metadata(ids_to_drop);
1751
1752        let dropped_shards = dropped_mappings
1753            .into_iter()
1754            .map(|(_id, shard)| shard)
1755            .collect();
1756
1757        txn.insert_unfinalized_shards(dropped_shards)?;
1758
1759        // Reconcile any shards we've successfully finalized with the shard
1760        // finalization collection.
1761        let finalized_shards = self.finalized_shards.lock().iter().copied().collect();
1762        txn.mark_shards_as_finalized(finalized_shards);
1763
1764        Ok(())
1765    }
1766
1767    // TODO(aljoscha): It would be swell if we could refactor this Leviathan of
1768    // a method/move individual parts to their own methods.
1769    #[instrument(level = "debug")]
1770    async fn create_collections_for_bootstrap(
1771        &self,
1772        storage_metadata: &StorageMetadata,
1773        register_ts: Option<Self::Timestamp>,
1774        mut collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
1775        migrated_storage_collections: &BTreeSet<GlobalId>,
1776    ) -> Result<(), StorageError<Self::Timestamp>> {
1777        let is_in_txns = |id, metadata: &CollectionMetadata| {
1778            metadata.txns_shard.is_some()
1779                && !(self.read_only && migrated_storage_collections.contains(&id))
1780        };
1781
1782        // Validate first, to avoid corrupting state. We must not:
1783        // 1. create a dropped identifier, or
1784        // 2. create an existing identifier with a new description.
1785        // Make sure to check for errors within `ingestions` as well.
1786        collections.sort_by_key(|(id, _)| *id);
1787        collections.dedup();
1788        for pos in 1..collections.len() {
1789            if collections[pos - 1].0 == collections[pos].0 {
1790                return Err(StorageError::CollectionIdReused(collections[pos].0));
1791            }
1792        }
1793
1794        {
1795            // Early sanity check: if we already know about a collection, its
1796            // description must match!
1797            //
1798            // NOTE: There could be concurrent modifications to
1799            // `self.collections`, but this sanity check is better than nothing.
1800            let self_collections = self.collections.lock().expect("lock poisoned");
1801            for (id, description) in collections.iter() {
1802                if let Some(existing_collection) = self_collections.get(id) {
1803                    if &existing_collection.description != description {
1804                        return Err(StorageError::CollectionIdReused(*id));
1805                    }
1806                }
1807            }
1808        }
1809
1810        // We first enrich each collection description with some additional
1811        // metadata...
1812        let enriched_with_metadata = collections
1813            .into_iter()
1814            .map(|(id, description)| {
1815                let data_shard = storage_metadata.get_collection_shard::<T>(id)?;
1816
1817                let get_shard = |id| -> Result<ShardId, StorageError<T>> {
1818                    let shard = storage_metadata.get_collection_shard::<T>(id)?;
1819                    Ok(shard)
1820                };
1821
1822                let remap_shard = match &description.data_source {
1823                    // Only ingestions can have remap shards.
1824                    DataSource::Ingestion(IngestionDescription {
1825                        remap_collection_id,
1826                        ..
1827                    }) => {
1828                        // Iff ingestion has a remap collection, its metadata
1829                        // must exist (and be correct) by this point.
1830                        Some(get_shard(*remap_collection_id)?)
1831                    }
1832                    _ => None,
1833                };
1834
1835                // If the shard is being managed by txn-wal (initially,
1836                // tables), then we need to pass along the shard id for the txns
1837                // shard to dataflow rendering.
1838                let txns_shard = description
1839                    .data_source
1840                    .in_txns()
1841                    .then(|| *self.txns_read.txns_id());
1842
1843                let metadata = CollectionMetadata {
1844                    persist_location: self.persist_location.clone(),
1845                    remap_shard,
1846                    data_shard,
1847                    relation_desc: description.desc.clone(),
1848                    txns_shard,
1849                };
1850
1851                Ok((id, description, metadata))
1852            })
1853            .collect_vec();
1854
1855        // So that we can open `SinceHandle`s for each collection concurrently.
1856        let persist_client = self
1857            .persist
1858            .open(self.persist_location.clone())
1859            .await
1860            .unwrap();
1861        let persist_client = &persist_client;
1862        // Reborrow `self` as a shared reference, as all the concurrent work to
1863        // be processed in this stream cannot all have exclusive access.
1864        use futures::stream::{StreamExt, TryStreamExt};
1865        let this = &*self;
1866        let mut to_register: Vec<_> = futures::stream::iter(enriched_with_metadata)
1867            .map(|data: Result<_, StorageError<Self::Timestamp>>| {
1868                let register_ts = register_ts.clone();
1869                async move {
1870                    let (id, description, metadata) = data?;
1871
1872                    // should be replaced with real introspection
1873                    // (https://github.com/MaterializeInc/database-issues/issues/4078)
1874                    // but for now, it's helpful to have this mapping written down
1875                    // somewhere
1876                    debug!(
1877                        "mapping GlobalId={} to remap shard ({:?}), data shard ({})",
1878                        id, metadata.remap_shard, metadata.data_shard
1879                    );
1880
1881                    let (write, mut since_handle) = this
1882                        .open_data_handles(
1883                            &id,
1884                            metadata.data_shard,
1885                            description.since.as_ref(),
1886                            metadata.relation_desc.clone(),
1887                            persist_client,
1888                        )
1889                        .await;
1890
1891                    // Present tables as springing into existence at the register_ts
1892                    // by advancing the since. Otherwise, we could end up in a
1893                    // situation where a table with a long compaction window appears
1894                    // to exist before the environment (and thus the table) existed.
1895                    //
1896                    // We could potentially also do the same thing for other
1897                    // sources, in particular storage's internal sources and perhaps
1898                    // others, but leave them for now.
1899                    match description.data_source {
1900                        DataSource::Introspection(_)
1901                        | DataSource::IngestionExport { .. }
1902                        | DataSource::Webhook
1903                        | DataSource::Ingestion(_)
1904                        | DataSource::Progress
1905                        | DataSource::Other => {}
1906                        DataSource::Sink { .. } => {}
1907                        DataSource::Table { .. } => {
1908                            let register_ts = register_ts.expect(
1909                                "caller should have provided a register_ts when creating a table",
1910                            );
1911                            if since_handle.since().elements() == &[T::minimum()]
1912                                && !migrated_storage_collections.contains(&id)
1913                            {
1914                                debug!("advancing {} to initial since of {:?}", id, register_ts);
1915                                let token = since_handle.opaque();
1916                                let _ = since_handle
1917                                    .compare_and_downgrade_since(
1918                                        &token,
1919                                        (&token, &Antichain::from_elem(register_ts.clone())),
1920                                    )
1921                                    .await;
1922                            }
1923                        }
1924                    }
1925
1926                    Ok::<_, StorageError<Self::Timestamp>>((
1927                        id,
1928                        description,
1929                        write,
1930                        since_handle,
1931                        metadata,
1932                    ))
1933                }
1934            })
1935            // Poll each future for each collection concurrently, maximum of 50 at a time.
1936            .buffer_unordered(50)
1937            // HERE BE DRAGONS:
1938            //
1939            // There are at least 2 subtleties in using `FuturesUnordered`
1940            // (which `buffer_unordered` uses underneath):
1941            // - One is captured here
1942            //   <https://github.com/rust-lang/futures-rs/issues/2387>
1943            // - And the other is deadlocking if processing an OUTPUT of a
1944            //   `FuturesUnordered` stream attempts to obtain an async mutex that
1945            //   is also obtained in the futures being polled.
1946            //
1947            // Both of these could potentially be issues in all usages of
1948            // `buffer_unordered` in this method, so we stick to the standard
1949            // advice: only use `try_collect` or `collect`!
1950            .try_collect()
1951            .await?;
1952
1953        // Reorder in dependency order.
1954        #[derive(Ord, PartialOrd, Eq, PartialEq)]
1955        enum DependencyOrder {
1956            /// Tables should always be registered first, and large ids before small ones.
1957            Table(Reverse<GlobalId>),
1958            /// For most collections the id order is the correct one.
1959            Collection(GlobalId),
1960            /// Sinks should always be registered last.
1961            Sink(GlobalId),
1962        }
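        // The derived `Ord` compares variants in declaration order, so all
        // tables sort before ordinary collections, which in turn sort before
        // sinks; the `Reverse` wrapper makes larger (newer) table ids come
        // first within the table group.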
1963        to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1964            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
1965            DataSource::Sink { .. } => DependencyOrder::Sink(*id),
1966            _ => DependencyOrder::Collection(*id),
1967        });
1968
1969        // We hold this lock for a very short amount of time, just doing some
1970        // hashmap inserts and unbounded channel sends.
1971        let mut self_collections = self.collections.lock().expect("lock poisoned");
1972
1973        for (id, mut description, write_handle, since_handle, metadata) in to_register {
1974            // Ensure that the ingestion has an export for its primary source if applicable.
1975            // This is done in an awkward spot to appease the borrow checker.
1976            // TODO(database-issues#8620): This will be removed once sources no longer export
1977            // to primary collections and only export to explicit SourceExports (tables).
1978            if let DataSource::Ingestion(ingestion) = &mut description.data_source {
1979                let export = ingestion.desc.primary_source_export();
1980                ingestion.source_exports.insert(id, export);
1981            }
1982
1983            let write_frontier = write_handle.upper();
1984            let data_shard_since = since_handle.since().clone();
1985
1986            // Determine if this collection has any dependencies.
1987            let storage_dependencies = self
1988                .determine_collection_dependencies(&*self_collections, &description.data_source)?;
1989
1990            // Determine the initial since of the collection.
1991            let initial_since = match storage_dependencies
1992                .iter()
1993                .at_most_one()
1994                .expect("should have at most one dependency")
1995            {
1996                Some(dep) => {
1997                    let dependency_collection = self_collections
1998                        .get(dep)
1999                        .ok_or(StorageError::IdentifierMissing(*dep))?;
2000                    let dependency_since = dependency_collection.implied_capability.clone();
2001
2002                    // If an item has a dependency, its initial since must be
2003                    // advanced as far as its dependency, i.e. a dependency's
2004                    // since may never be in advance of its dependents.
2005                    //
2006                    // We have to do this every time we initialize the
2007                    // collection, though: the invariant might have been upheld
2008                    // correctly in the previous epoch, but the
2009                    // `data_shard_since` might not have compacted and, on
2010                    // establishing a new persist connection, still have data we
2011                    // said _could_ be compacted.
2012                    if PartialOrder::less_than(&data_shard_since, &dependency_since) {
2013                        // The dependency since cannot be beyond the dependent
2014                        // (our) upper unless the collection is new. In
2015                        // practice, the dependency is the remap shard of a
2016                        // source (export), and if the since is allowed to
2017                        // "catch up" to the upper, that is `upper <= since`, a
2018                        // restarting ingestion cannot differentiate between
2019                        // updates that have already been written out to the
2020                        // backing persist shard and updates that have yet to be
2021                        // written. We would write duplicate updates.
2022                        //
2023                        // If this check fails, it means that the read hold
2024                        // installed on the dependency was probably not upheld;
2025                        // if it were, the dependency's since could not have
2026                        // advanced as far as the dependent's upper.
2027                        //
2028                        // We don't care about the dependency since when the
2029                        // write frontier is empty. In that case, no-one can
2030                        // write down any more updates.
2031                        mz_ore::soft_assert_or_log!(
2032                            write_frontier.elements() == &[T::minimum()]
2033                                || write_frontier.is_empty()
2034                                || PartialOrder::less_than(&dependency_since, write_frontier),
2035                            "dependency ({dep}) since has advanced past dependent ({id}) upper \n
2036                            dependent ({id}): since {:?}, upper {:?} \n
2037                            dependency ({dep}): since {:?}",
2038                            data_shard_since,
2039                            write_frontier,
2040                            dependency_since
2041                        );
2042
2043                        dependency_since
2044                    } else {
2045                        data_shard_since
2046                    }
2047                }
2048                None => data_shard_since,
2049            };
2050
2051            let mut collection_state = CollectionState::new(
2052                description,
2053                initial_since,
2054                write_frontier.clone(),
2055                storage_dependencies,
2056                metadata.clone(),
2057            );
2058
2059            // Install the collection state in the appropriate spot.
2060            match &collection_state.description.data_source {
2061                DataSource::Introspection(_) => {
2062                    self_collections.insert(id, collection_state);
2063                }
2064                DataSource::Webhook => {
2065                    self_collections.insert(id, collection_state);
2066                }
2067                DataSource::IngestionExport {
2068                    ingestion_id,
2069                    details,
2070                    data_config,
2071                } => {
2072                    // Adjust the source to contain this export.
2073                    let source_collection = self_collections
2074                        .get_mut(ingestion_id)
2075                        .expect("known to exist");
2076                    match &mut source_collection.description {
2077                        CollectionDescription {
2078                            data_source: DataSource::Ingestion(ingestion_desc),
2079                            ..
2080                        } => ingestion_desc.source_exports.insert(
2081                            id,
2082                            SourceExport {
2083                                storage_metadata: (),
2084                                details: details.clone(),
2085                                data_config: data_config.clone(),
2086                            },
2087                        ),
2088                        _ => unreachable!(
2089                            "SourceExport must only refer to primary sources that already exist"
2090                        ),
2091                    };
2092
2093                    self_collections.insert(id, collection_state);
2094                }
2095                DataSource::Table { .. } => {
2096                    // See comment on self.initial_txn_upper on why we're doing
2097                    // this.
2098                    if is_in_txns(id, &metadata)
2099                        && PartialOrder::less_than(
2100                            &collection_state.write_frontier,
2101                            &self.initial_txn_upper,
2102                        )
2103                    {
2104                        // We could try and be cute and use the join of the txn
2105                        // upper and the table upper. But that has more
2106                        // complicated reasoning for why it is or isn't correct,
2107                        // and we're only dealing with totally ordered times
2108                        // here.
2109                        collection_state
2110                            .write_frontier
2111                            .clone_from(&self.initial_txn_upper);
2112                    }
2113                    self_collections.insert(id, collection_state);
2114                }
2115                DataSource::Progress | DataSource::Other => {
2116                    self_collections.insert(id, collection_state);
2117                }
2118                DataSource::Ingestion(_) => {
2119                    self_collections.insert(id, collection_state);
2120                }
2121                DataSource::Sink { .. } => {
2122                    self_collections.insert(id, collection_state);
2123                }
2124            }
2125
2126            self.register_handles(id, is_in_txns(id, &metadata), since_handle, write_handle);
2127
2128            // If this collection has a dependency, install a read hold on it.
2129            self.install_collection_dependency_read_holds_inner(&mut *self_collections, id)?;
2130        }
2131
2132        drop(self_collections);
2133
2134        self.synchronize_finalized_shards(storage_metadata);
2135
2136        Ok(())
2137    }
2138
2139    async fn alter_ingestion_source_desc(
2140        &self,
2141        ingestion_id: GlobalId,
2142        source_desc: SourceDesc,
2143    ) -> Result<(), StorageError<Self::Timestamp>> {
2144        // The StorageController checks the validity of these, and we just
2145        // accept them.
2146
2147        let mut self_collections = self.collections.lock().expect("lock poisoned");
2148        let collection = self_collections
2149            .get_mut(&ingestion_id)
2150            .ok_or(StorageError::IdentifierMissing(ingestion_id))?;
2151
2152        let curr_ingestion = match &mut collection.description.data_source {
2153            DataSource::Ingestion(active_ingestion) => active_ingestion,
2154            _ => unreachable!("verified collection refers to ingestion"),
2155        };
2156
2157        curr_ingestion.desc = source_desc;
2158        debug!("altered {ingestion_id}'s SourceDesc");
2159
2160        Ok(())
2161    }
2162
2163    async fn alter_ingestion_export_data_configs(
2164        &self,
2165        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
2166    ) -> Result<(), StorageError<Self::Timestamp>> {
2167        let mut self_collections = self.collections.lock().expect("lock poisoned");
2168
2169        for (source_export_id, new_data_config) in source_exports {
2170            // We need to adjust the data config on the CollectionState for
2171            // the source export collection directly.
2172            let source_export_collection = self_collections
2173                .get_mut(&source_export_id)
2174                .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2175            let ingestion_id = match &mut source_export_collection.description.data_source {
2176                DataSource::IngestionExport {
2177                    ingestion_id,
2178                    details: _,
2179                    data_config,
2180                } => {
2181                    *data_config = new_data_config.clone();
2182                    *ingestion_id
2183                }
2184                o => {
2185                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2186                    Err(StorageError::IdentifierInvalid(source_export_id))?
2187                }
2188            };
2189            // We also need to adjust the data config on the CollectionState of the
2190            // Ingestion that the export is associated with.
2191            let ingestion_collection = self_collections
2192                .get_mut(&ingestion_id)
2193                .ok_or_else(|| StorageError::IdentifierMissing(ingestion_id))?;
2194
2195            match &mut ingestion_collection.description.data_source {
2196                DataSource::Ingestion(ingestion_desc) => {
2197                    let source_export = ingestion_desc
2198                        .source_exports
2199                        .get_mut(&source_export_id)
2200                        .ok_or_else(|| StorageError::IdentifierMissing(source_export_id))?;
2201
2202                    if source_export.data_config != new_data_config {
2203                        tracing::info!(?source_export_id, from = ?source_export.data_config, to = ?new_data_config, "alter_ingestion_export_data_configs, updating");
2204                        source_export.data_config = new_data_config;
2205                    } else {
2206                        tracing::warn!(
2207                            "alter_ingestion_export_data_configs called on \
2208                                    export {source_export_id} of {ingestion_id} but \
2209                                    the data config was the same"
2210                        );
2211                    }
2212                }
2213                o => {
2214                    tracing::warn!("alter_ingestion_export_data_configs called on {:?}", o);
2215                    Err(StorageError::IdentifierInvalid(ingestion_id))?;
2216                }
2217            }
2218        }
2219
2220        Ok(())
2221    }
2222
2223    async fn alter_ingestion_connections(
2224        &self,
2225        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
2226    ) -> Result<(), StorageError<Self::Timestamp>> {
2227        let mut self_collections = self.collections.lock().expect("lock poisoned");
2228
2229        for (id, conn) in source_connections {
2230            let collection = self_collections
2231                .get_mut(&id)
2232                .ok_or_else(|| StorageError::IdentifierMissing(id))?;
2233
2234            match &mut collection.description.data_source {
2235                DataSource::Ingestion(ingestion) => {
2236                    // If the connection hasn't changed, there's no sense in
2237                    // re-rendering the dataflow.
2238                    if ingestion.desc.connection != conn {
2239                        info!(from = ?ingestion.desc.connection, to = ?conn, "alter_ingestion_connections, updating");
2240                        ingestion.desc.connection = conn;
2241                    } else {
2242                        warn!(
2243                            "update_source_connection called on {id} but the \
2244                            connection was the same"
2245                        );
2246                    }
2247                }
2248                o => {
2249                    warn!("update_source_connection called on {:?}", o);
2250                    Err(StorageError::IdentifierInvalid(id))?;
2251                }
2252            }
2253        }
2254
2255        Ok(())
2256    }
2257
2258    async fn alter_table_desc(
2259        &self,
2260        existing_collection: GlobalId,
2261        new_collection: GlobalId,
2262        new_desc: RelationDesc,
2263        expected_version: RelationVersion,
2264    ) -> Result<(), StorageError<Self::Timestamp>> {
2265        let data_shard = {
2266            let self_collections = self.collections.lock().expect("lock poisoned");
2267            let existing = self_collections
2268                .get(&existing_collection)
2269                .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
2270
2271            // TODO(alter_table): Support changes to sources.
2272            if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
2273                return Err(StorageError::IdentifierInvalid(existing_collection));
2274            }
2275
2276            existing.collection_metadata.data_shard
2277        };
2278
2279        let persist_client = self
2280            .persist
2281            .open(self.persist_location.clone())
2282            .await
2283            .unwrap();
2284
2285        // Evolve the schema of this shard.
2286        let diagnostics = Diagnostics {
2287            shard_name: existing_collection.to_string(),
2288            handle_purpose: "alter_table_desc".to_string(),
2289        };
2290        // We map the Adapter's RelationVersion 1:1 with SchemaId.
2291        let expected_schema = expected_version.into();
2292        let schema_result = persist_client
2293            .compare_and_evolve_schema::<SourceData, (), T, StorageDiff>(
2294                data_shard,
2295                expected_schema,
2296                &new_desc,
2297                &UnitSchema,
2298                diagnostics,
2299            )
2300            .await
2301            .map_err(|e| StorageError::InvalidUsage(e.to_string()))?;
2302        tracing::info!(
2303            ?existing_collection,
2304            ?new_collection,
2305            ?new_desc,
2306            "evolved schema"
2307        );
2308
2309        match schema_result {
2310            CaESchema::Ok(id) => id,
2311            // TODO(alter_table): If we get an expected mismatch we should retry.
2312            CaESchema::ExpectedMismatch {
2313                schema_id,
2314                key,
2315                val,
2316            } => {
2317                mz_ore::soft_panic_or_log!(
2318                    "schema expectation mismatch {schema_id:?}, {key:?}, {val:?}"
2319                );
2320                return Err(StorageError::Generic(anyhow::anyhow!(
2321                    "schema expected mismatch, {existing_collection:?}",
2322                )));
2323            }
2324            CaESchema::Incompatible => {
2325                mz_ore::soft_panic_or_log!(
2326                    "incompatible schema! {existing_collection} {new_desc:?}"
2327                );
2328                return Err(StorageError::Generic(anyhow::anyhow!(
2329                    "schema incompatible, {existing_collection:?}"
2330                )));
2331            }
2332        };
2333
2334        // Once the new schema is registered we can open new data handles.
2335        let (write_handle, since_handle) = self
2336            .open_data_handles(
2337                &new_collection,
2338                data_shard,
2339                None,
2340                new_desc.clone(),
2341                &persist_client,
2342            )
2343            .await;
2344
2345        // TODO(alter_table): Do we need to advance the since of the table to match the time this
2346        // new version was registered with txn-wal?
2347
2348        // Great! Our new schema is registered with Persist, now we need to update our internal
2349        // data structures.
2350        {
2351            let mut self_collections = self.collections.lock().expect("lock poisoned");
2352
2353            // Update the existing collection so we know it's a "projection" of this new one.
2354            let existing = self_collections
2355                .get_mut(&existing_collection)
2356                .expect("existing collection missing");
2357
2358            // A higher level should already be asserting this, but let's make sure.
2359            assert!(matches!(
2360                existing.description.data_source,
2361                DataSource::Table { primary: None }
2362            ));
2363
2364            // The existing version of the table will depend on the new version.
2365            existing.description.data_source = DataSource::Table {
2366                primary: Some(new_collection),
2367            };
2368            existing.storage_dependencies.push(new_collection);
2369
2370            // Copy over the frontiers from the previous version.
2371            // The new table starts with two holds - the implied capability, and the hold from
2372            // the previous version - both at the previous version's read frontier.
2373            let implied_capability = existing.read_capabilities.frontier().to_owned();
2374            let write_frontier = existing.write_frontier.clone();
2375
2376            // Determine the relevant read capabilities on the new collection.
2377            //
2378            // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2379            // here, but that only installed a ReadHold on the new collection for the implied
2380            // capability of the existing collection. This would cause runtime panics because it
2381            // would eventually result in negative read capabilities.
2382            let mut changes = ChangeBatch::new();
2383            changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2384
2385            // Note: The new collection is now the "primary collection" so we specify `None` here.
2386            let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2387            let collection_meta = CollectionMetadata {
2388                persist_location: self.persist_location.clone(),
2389                relation_desc: collection_desc.desc.clone(),
2390                data_shard,
2391                // TODO(alter_table): Support changes to sources.
2392                remap_shard: None,
2393                txns_shard: Some(self.txns_read.txns_id().clone()),
2394            };
2395            let collection_state = CollectionState::new(
2396                collection_desc,
2397                implied_capability,
2398                write_frontier,
2399                Vec::new(),
2400                collection_meta,
2401            );
2402
2403            // Add a record of the new collection.
2404            self_collections.insert(new_collection, collection_state);
2405
2406            let mut updates = BTreeMap::from([(new_collection, changes)]);
2407            StorageCollectionsImpl::update_read_capabilities_inner(
2408                &self.cmd_tx,
2409                &mut *self_collections,
2410                &mut updates,
2411            );
2412        };
2413
2414        // TODO(alter_table): Support changes to sources.
2415        self.register_handles(new_collection, true, since_handle, write_handle);
2416
2417        info!(%existing_collection, %new_collection, ?new_desc, "altered table");
2418
2419        Ok(())
2420    }
2421
2422    fn drop_collections_unvalidated(
2423        &self,
2424        storage_metadata: &StorageMetadata,
2425        identifiers: Vec<GlobalId>,
2426    ) {
2427        debug!(?identifiers, "drop_collections_unvalidated");
2428
2429        let mut self_collections = self.collections.lock().expect("lock poisoned");
2430
2431        for id in identifiers.iter() {
2432            let metadata = storage_metadata.get_collection_shard::<T>(*id);
2433            mz_ore::soft_assert_or_log!(
2434                matches!(metadata, Err(StorageError::IdentifierMissing(_))),
2435                "dropping {id}, but drop was not synchronized with storage \
2436                controller via `synchronize_collections`"
2437            );
2438
2439            let dropped_data_source = match self_collections.get(id) {
2440                Some(col) => col.description.data_source.clone(),
2441                None => continue,
2442            };
2443
2444            // If we are dropping a source export, we need to modify the
2445            // ingestion that it runs on.
2446            if let DataSource::IngestionExport { ingestion_id, .. } = dropped_data_source {
2447                // Adjust the source to remove this export.
2448                let ingestion = match self_collections.get_mut(&ingestion_id) {
2449                    Some(ingestion) => ingestion,
2450                    // Primary ingestion already dropped.
2451                    None => {
2452                        tracing::error!(
2453                            "primary source {ingestion_id} seemingly dropped before subsource {id}",
2454                        );
2455                        continue;
2456                    }
2457                };
2458
2459                match &mut ingestion.description {
2460                    CollectionDescription {
2461                        data_source: DataSource::Ingestion(ingestion_desc),
2462                        ..
2463                    } => {
2464                        let removed = ingestion_desc.source_exports.remove(id);
2465                        mz_ore::soft_assert_or_log!(
2466                            removed.is_some(),
2467                            "dropped subsource {id} already removed from source exports"
2468                        );
2469                    }
2470                    _ => unreachable!(
2471                        "SourceExport must only refer to primary sources that already exist"
2472                    ),
2473                };
2474            }
2475        }
2476
2477        // Install policies that advance the since to the empty antichain. We do still
2478        // honor outstanding read holds, and collections will only be dropped
2479        // once those are removed as well.
2480        //
2481        // We don't explicitly remove read capabilities! Downgrading the
2482        // frontier of the source to `[]` (the empty Antichain) will propagate
2483        // to the storage dependencies.
2484        let mut finalized_policies = Vec::new();
2485
2486        for id in identifiers {
2487            // Make sure it's still there, might already have been deleted.
2488            if self_collections.contains_key(&id) {
2489                finalized_policies.push((id, ReadPolicy::ValidFrom(Antichain::new())));
2490            }
2491        }
2492        self.set_read_policies_inner(&mut self_collections, finalized_policies);
2493
2494        drop(self_collections);
2495
2496        self.synchronize_finalized_shards(storage_metadata);
2497    }
2498
2499    fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
2500        let mut collections = self.collections.lock().expect("lock poisoned");
2501
2502        if tracing::enabled!(tracing::Level::TRACE) {
2503            let user_capabilities = collections
2504                .iter_mut()
2505                .filter(|(id, _c)| id.is_user())
2506                .map(|(id, c)| {
2507                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2508                    (*id, c.implied_capability.clone(), updates)
2509                })
2510                .collect_vec();
2511
2512            trace!(?policies, ?user_capabilities, "set_read_policies");
2513        }
2514
2515        self.set_read_policies_inner(&mut collections, policies);
2516
2517        if tracing::enabled!(tracing::Level::TRACE) {
2518            let user_capabilities = collections
2519                .iter_mut()
2520                .filter(|(id, _c)| id.is_user())
2521                .map(|(id, c)| {
2522                    let updates = c.read_capabilities.updates().cloned().collect_vec();
2523                    (*id, c.implied_capability.clone(), updates)
2524                })
2525                .collect_vec();
2526
2527            trace!(?user_capabilities, "after! set_read_policies");
2528        }
2529    }
2530
2531    fn acquire_read_holds(
2532        &self,
2533        desired_holds: Vec<GlobalId>,
2534    ) -> Result<Vec<ReadHold<Self::Timestamp>>, ReadHoldError> {
2535        let mut collections = self.collections.lock().expect("lock poisoned");
2536
2537        let mut advanced_holds = Vec::new();
2538        // We advance the holds to our current since frontier. We can't acquire
2539        // holds for times that have been compacted away!
2540        //
2541        // NOTE: We acquire read holds at the earliest possible time rather than
2542        // at the implied capability. This is so that, for example, adapter can
2543        // acquire a read hold to hold back the frontier, giving the COMPUTE
2544        // controller a chance to also acquire a read hold at that early
2545        // frontier. If/when we change the interplay between adapter and COMPUTE
2546        // to pass around ReadHold tokens, we might tighten this up and instead
2547        // acquire read holds at the implied capability.
2548        for id in desired_holds.iter() {
2549            let collection = collections
2550                .get(id)
2551                .ok_or(ReadHoldError::CollectionMissing(*id))?;
2552            let since = collection.read_capabilities.frontier().to_owned();
2553            advanced_holds.push((*id, since));
2554        }
2555
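        // Record a +1 capability at every element of each advanced frontier so
        // that the new holds are reflected in `read_capabilities` (and flow on
        // to dependencies and persist) before we hand the holds out.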
2556        let mut updates = advanced_holds
2557            .iter()
2558            .map(|(id, hold)| {
2559                let mut changes = ChangeBatch::new();
2560                changes.extend(hold.iter().map(|time| (time.clone(), 1)));
2561                (*id, changes)
2562            })
2563            .collect::<BTreeMap<_, _>>();
2564
2565        StorageCollectionsImpl::update_read_capabilities_inner(
2566            &self.cmd_tx,
2567            &mut collections,
2568            &mut updates,
2569        );
2570
2571        let acquired_holds = advanced_holds
2572            .into_iter()
2573            .map(|(id, since)| ReadHold::with_channel(id, since, self.holds_tx.clone()))
2574            .collect_vec();
2575
2576        trace!(?desired_holds, ?acquired_holds, "acquire_read_holds");
2577
2578        Ok(acquired_holds)
2579    }
2580
2581    /// Determine time dependence information for the object.
2582    fn determine_time_dependence(
2583        &self,
2584        id: GlobalId,
2585    ) -> Result<Option<TimeDependence>, TimeDependenceError> {
2586        use TimeDependenceError::CollectionMissing;
2587        let collections = self.collections.lock().expect("lock poisoned");
2588        let mut collection = Some(collections.get(&id).ok_or(CollectionMissing(id))?);
2589
2590        let mut result = None;
2591
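        // Walk the dependency chain: an ingestion export inherits its time
        // dependence from its parent ingestion, so we keep following the
        // `ingestion_id` link until we reach a collection we can classify.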
2592        while let Some(c) = collection.take() {
2593            use DataSource::*;
2594            if let Some(timeline) = &c.description.timeline {
2595                // Only the epoch timeline follows wall-clock.
2596                if *timeline != Timeline::EpochMilliseconds {
2597                    break;
2598                }
2599            }
2600            match &c.description.data_source {
2601                Ingestion(ingestion) => {
2602                    use GenericSourceConnection::*;
2603                    match ingestion.desc.connection {
2604                        // Kafka, Postgres, MySql, and SQL Server sources all
2605                        // follow wall clock.
2606                        Kafka(_) | Postgres(_) | MySql(_) | SqlServer(_) => {
2607                            result = Some(TimeDependence::default())
2608                        }
2609                        // Load generators have no specified time dependence.
2610                        LoadGenerator(_) => {}
2611                    }
2612                }
2613                IngestionExport { ingestion_id, .. } => {
2614                    let c = collections
2615                        .get(ingestion_id)
2616                        .ok_or(CollectionMissing(*ingestion_id))?;
2617                    collection = Some(c);
2618                }
2619                // Introspection, progress, table, and webhook sources follow wall clock.
2620                Introspection(_) | Progress | Table { .. } | Webhook { .. } => {
2621                    result = Some(TimeDependence::default())
2622                }
2623                // Materialized views, continual tasks, etc., aren't managed by storage.
2624                Other => {}
2625                Sink { .. } => {}
2626            };
2627        }
2628        Ok(result)
2629    }
2630}
2631
2632/// Wraps either a "critical" [SinceHandle] or a leased [ReadHandle].
2633///
2634/// When a [StorageCollections] is in read-only mode, we will only ever acquire
2635/// a [ReadHandle], because acquiring the [SinceHandle] and driving forward its
2636/// since is considered a write. Conversely, when in read-write mode, we acquire
2637/// a [SinceHandle].
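///
/// A minimal sketch of how a caller might pick the variant based on the
/// read-only flag (illustrative only; `read_only`, `read_handle`, and
/// `since_handle` are placeholders, the real construction happens elsewhere in
/// this module):
///
/// ```ignore
/// let handle = if read_only {
///     SinceHandleWrapper::Leased(read_handle)
/// } else {
///     SinceHandleWrapper::Critical(since_handle)
/// };
/// ```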
2638#[derive(Debug)]
2639enum SinceHandleWrapper<T>
2640where
2641    T: TimelyTimestamp + Lattice + Codec64,
2642{
2643    Critical(SinceHandle<SourceData, (), T, StorageDiff, PersistEpoch>),
2644    Leased(ReadHandle<SourceData, (), T, StorageDiff>),
2645}
2646
2647impl<T> SinceHandleWrapper<T>
2648where
2649    T: TimelyTimestamp + Lattice + Codec64 + TotalOrder + Sync,
2650{
2651    pub fn since(&self) -> &Antichain<T> {
2652        match self {
2653            Self::Critical(handle) => handle.since(),
2654            Self::Leased(handle) => handle.since(),
2655        }
2656    }
2657
2658    pub fn opaque(&self) -> PersistEpoch {
2659        match self {
2660            Self::Critical(handle) => handle.opaque().clone(),
2661            Self::Leased(_handle) => {
2662                // The opaque is expected to be used with
2663                // `compare_and_downgrade_since`, and the leased handle doesn't
2664                // have a notion of an opaque. We pretend to have one here and in
2665                // `compare_and_downgrade_since`.
2666                PersistEpoch(None)
2667            }
2668        }
2669    }
2670
2671    pub async fn compare_and_downgrade_since(
2672        &mut self,
2673        expected: &PersistEpoch,
2674        new: (&PersistEpoch, &Antichain<T>),
2675    ) -> Result<Antichain<T>, PersistEpoch> {
2676        match self {
2677            Self::Critical(handle) => handle.compare_and_downgrade_since(expected, new).await,
2678            Self::Leased(handle) => {
2679                let (opaque, since) = new;
2680                assert_none!(opaque.0);
2681
2682                handle.downgrade_since(since).await;
2683
2684                Ok(since.clone())
2685            }
2686        }
2687    }
2688
2689    pub async fn maybe_compare_and_downgrade_since(
2690        &mut self,
2691        expected: &PersistEpoch,
2692        new: (&PersistEpoch, &Antichain<T>),
2693    ) -> Option<Result<Antichain<T>, PersistEpoch>> {
2694        match self {
2695            Self::Critical(handle) => {
2696                handle
2697                    .maybe_compare_and_downgrade_since(expected, new)
2698                    .await
2699            }
2700            Self::Leased(handle) => {
2701                let (opaque, since) = new;
2702                assert_none!(opaque.0);
2703
2704                handle.maybe_downgrade_since(since).await;
2705
2706                Some(Ok(since.clone()))
2707            }
2708        }
2709    }
2710
2711    pub fn snapshot_stats(
2712        &self,
2713        id: GlobalId,
2714        as_of: Option<Antichain<T>>,
2715    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2716        match self {
2717            Self::Critical(handle) => {
2718                let res = handle
2719                    .snapshot_stats(as_of)
2720                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2721                Box::pin(res)
2722            }
2723            Self::Leased(handle) => {
2724                let res = handle
2725                    .snapshot_stats(as_of)
2726                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id)));
2727                Box::pin(res)
2728            }
2729        }
2730    }
2731
2732    pub fn snapshot_stats_from_txn(
2733        &self,
2734        id: GlobalId,
2735        data_snapshot: DataSnapshot<T>,
2736    ) -> BoxFuture<'static, Result<SnapshotStats, StorageError<T>>> {
2737        match self {
2738            Self::Critical(handle) => Box::pin(
2739                data_snapshot
2740                    .snapshot_stats_from_critical(handle)
2741                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2742            ),
2743            Self::Leased(handle) => Box::pin(
2744                data_snapshot
2745                    .snapshot_stats_from_leased(handle)
2746                    .map(move |x| x.map_err(|_| StorageError::ReadBeforeSince(id))),
2747            ),
2748        }
2749    }
2750}
2751
2752/// State maintained about individual collections.
2753#[derive(Debug, Clone)]
2754struct CollectionState<T> {
2755    /// Description with which the collection was created
2756    pub description: CollectionDescription<T>,
2757
2758    /// Accumulation of read capabilities for the collection.
2759    ///
2760    /// This accumulation will always contain `self.implied_capability`, but may
2761    /// also contain capabilities held by others who have read dependencies on
2762    /// this collection.
2763    pub read_capabilities: MutableAntichain<T>,
2764
2765    /// The implicit capability associated with collection creation.  This
2766    /// should never be less than the since of the associated persist
2767    /// collection.
2768    pub implied_capability: Antichain<T>,
2769
2770    /// The policy to use to downgrade `self.implied_capability`.
2771    pub read_policy: ReadPolicy<T>,
2772
2773    /// Storage identifiers on which this collection depends.
2774    pub storage_dependencies: Vec<GlobalId>,
2775
2776    /// Reported write frontier.
2777    pub write_frontier: Antichain<T>,
2778
2779    pub collection_metadata: CollectionMetadata,
2780}
2781
2782impl<T: TimelyTimestamp> CollectionState<T> {
2783    /// Creates a new collection state, with an initial read policy valid from
2784    /// `since`.
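    ///
    /// A rough usage sketch (the argument names are placeholders for whatever
    /// the caller has at hand; not compiled as a doc test):
    ///
    /// ```ignore
    /// let state = CollectionState::new(
    ///     description,
    ///     since.clone(),
    ///     write_frontier,
    ///     Vec::new(),
    ///     metadata,
    /// );
    /// // The initial implied capability equals the provided `since`.
    /// assert_eq!(state.implied_capability, since);
    /// ```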
2785    pub fn new(
2786        description: CollectionDescription<T>,
2787        since: Antichain<T>,
2788        write_frontier: Antichain<T>,
2789        storage_dependencies: Vec<GlobalId>,
2790        metadata: CollectionMetadata,
2791    ) -> Self {
2792        let mut read_capabilities = MutableAntichain::new();
2793        read_capabilities.update_iter(since.iter().map(|time| (time.clone(), 1)));
2794        Self {
2795            description,
2796            read_capabilities,
2797            implied_capability: since.clone(),
2798            read_policy: ReadPolicy::NoPolicy {
2799                initial_since: since,
2800            },
2801            storage_dependencies,
2802            write_frontier,
2803            collection_metadata: metadata,
2804        }
2805    }
2806
2807    /// Returns whether the collection was dropped.
2808    pub fn is_dropped(&self) -> bool {
2809        self.read_capabilities.is_empty()
2810    }
2811}
2812
2813/// A task that keeps persist handles, downgrades sinces when asked,
2814/// periodically gets recent uppers from them, and updates the shared collection
2815/// state when needed.
2816///
2817/// This shares state with [StorageCollectionsImpl] via `Arcs` and channels.
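///
/// Commands arrive on `cmds_rx`, changes from [ReadHold] tokens arrive on
/// `holds_rx`, and the resulting collection state is published through the
/// shared `collections` map.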
2818#[derive(Debug)]
2819struct BackgroundTask<T: TimelyTimestamp + Lattice + Codec64> {
2820    config: Arc<Mutex<StorageConfiguration>>,
2821    cmds_tx: mpsc::UnboundedSender<BackgroundCmd<T>>,
2822    cmds_rx: mpsc::UnboundedReceiver<BackgroundCmd<T>>,
2823    holds_rx: mpsc::UnboundedReceiver<(GlobalId, ChangeBatch<T>)>,
2824    finalizable_shards: Arc<ShardIdSet>,
2825    collections: Arc<std::sync::Mutex<BTreeMap<GlobalId, CollectionState<T>>>>,
2826    // So we know what shard ID corresponds to what global ID, which we need
2827    // when re-enqueuing futures for determining the next upper update.
2828    shard_by_id: BTreeMap<GlobalId, ShardId>,
2829    since_handles: BTreeMap<GlobalId, SinceHandleWrapper<T>>,
2830    txns_handle: Option<WriteHandle<SourceData, (), T, StorageDiff>>,
2831    txns_shards: BTreeSet<GlobalId>,
2832}
2833
2834#[derive(Debug)]
2835enum BackgroundCmd<T: TimelyTimestamp + Lattice + Codec64> {
2836    Register {
2837        id: GlobalId,
2838        is_in_txns: bool,
2839        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
2840        since_handle: SinceHandleWrapper<T>,
2841    },
2842    DowngradeSince(Vec<(GlobalId, Antichain<T>)>),
2843    SnapshotStats(
2844        GlobalId,
2845        SnapshotStatsAsOf<T>,
2846        oneshot::Sender<SnapshotStatsRes<T>>,
2847    ),
2848}
2849
2850/// A newtype wrapper to hang a Debug impl off of.
2851pub(crate) struct SnapshotStatsRes<T>(BoxFuture<'static, Result<SnapshotStats, StorageError<T>>>);
2852
2853impl<T> Debug for SnapshotStatsRes<T> {
2854    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2855        f.debug_struct("SnapshotStatsRes").finish_non_exhaustive()
2856    }
2857}
2858
2859impl<T> BackgroundTask<T>
2860where
2861    T: TimelyTimestamp
2862        + Lattice
2863        + Codec64
2864        + From<EpochMillis>
2865        + TimestampManipulation
2866        + Into<mz_repr::Timestamp>
2867        + Sync,
2868{
2869    async fn run(&mut self) {
2870        // Futures that fetch the recent upper from all other shards.
2871        let mut upper_futures: FuturesUnordered<
2872            std::pin::Pin<
2873                Box<
2874                    dyn Future<
2875                            Output = (
2876                                GlobalId,
2877                                WriteHandle<SourceData, (), T, StorageDiff>,
2878                                Antichain<T>,
2879                            ),
2880                        > + Send,
2881                >,
2882            >,
2883        > = FuturesUnordered::new();
2884
2885        let gen_upper_future =
2886            |id, mut handle: WriteHandle<_, _, _, _>, prev_upper: Antichain<T>| {
2887                let fut = async move {
2888                    soft_assert_or_log!(
2889                        !prev_upper.is_empty(),
2890                        "cannot await progress when upper is already empty"
2891                    );
2892                    handle.wait_for_upper_past(&prev_upper).await;
2893                    let new_upper = handle.shared_upper();
2894                    (id, handle, new_upper)
2895                };
2896
2897                fut
2898            };
2899
2900        let mut txns_upper_future = match self.txns_handle.take() {
2901            Some(txns_handle) => {
2902                let upper = txns_handle.upper().clone();
2903                let txns_upper_future =
2904                    gen_upper_future(GlobalId::Transient(1), txns_handle, upper);
2905                txns_upper_future.boxed()
2906            }
2907            None => async { std::future::pending().await }.boxed(),
2908        };
2909
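        // The select loop below multiplexes four event sources: upper updates
        // from the txns shard, upper updates from individual collection
        // shards, commands arriving on `cmds_rx`, and read-hold changes
        // arriving on `holds_rx`.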
2910        loop {
2911            tokio::select! {
2912                (id, handle, upper) = &mut txns_upper_future => {
2913                    trace!("new upper from txns shard: {:?}", upper);
2914                    let mut uppers = Vec::new();
2915                    for id in self.txns_shards.iter() {
2916                        uppers.push((*id, &upper));
2917                    }
2918                    self.update_write_frontiers(&uppers).await;
2919
2920                    let fut = gen_upper_future(id, handle, upper);
2921                    txns_upper_future = fut.boxed();
2922                }
2923                Some((id, handle, upper)) = upper_futures.next() => {
2924                    if id.is_user() {
2925                        trace!("new upper for collection {id}: {:?}", upper);
2926                    }
2927                    let current_shard = self.shard_by_id.get(&id);
2928                    if let Some(shard_id) = current_shard {
2929                        if shard_id == &handle.shard_id() {
2930                            // Still current, so process the update and enqueue
2931                            // again!
2932                            let uppers = &[(id, &upper)];
2933                            self.update_write_frontiers(uppers).await;
2934                            if !upper.is_empty() {
2935                                let fut = gen_upper_future(id, handle, upper);
2936                                upper_futures.push(fut.boxed());
2937                            }
2938                        } else {
2939                            // Be polite and expire the write handle. This can
2940                            // happen when we get an upper update for a write
2941                            // handle that has since been replaced via Update.
2942                            handle.expire().await;
2943                        }
2944                    }
2945                }
2946                cmd = self.cmds_rx.recv() => {
2947                    let cmd = if let Some(cmd) = cmd {
2948                        cmd
2949                    } else {
2950                        // We're done!
2951                        break;
2952                    };
2953
2954                    match cmd {
2955                        BackgroundCmd::Register{ id, is_in_txns, write_handle, since_handle } => {
2956                            debug!("registering handles for {}", id);
2957                            let previous = self.shard_by_id.insert(id, write_handle.shard_id());
2958                            if previous.is_some() {
2959                                panic!("already registered a WriteHandle for collection {id}");
2960                            }
2961
2962                            let previous = self.since_handles.insert(id, since_handle);
2963                            if previous.is_some() {
2964                                panic!("already registered a SinceHandle for collection {id}");
2965                            }
2966
2967                            if is_in_txns {
2968                                self.txns_shards.insert(id);
2969                            } else {
2970                                let upper = write_handle.upper().clone();
2971                                if !upper.is_empty() {
2972                                    let fut = gen_upper_future(id, write_handle, upper);
2973                                    upper_futures.push(fut.boxed());
2974                                }
2975                            }
2976
2977                        }
2978                        BackgroundCmd::DowngradeSince(cmds) => {
2979                            self.downgrade_sinces(cmds).await;
2980                        }
2981                        BackgroundCmd::SnapshotStats(id, as_of, tx) => {
2982                            // NB: The requested as_of could be arbitrarily far
2983                            // in the future. So, in order to avoid blocking
2984                            // this loop until it's available and the
2985                            // `snapshot_stats` call resolves, instead return
2986                            // the future to the caller and await it there.
2987                            let res = match self.since_handles.get(&id) {
2988                                Some(x) => {
2989                                    let fut: BoxFuture<
2990                                        'static,
2991                                        Result<SnapshotStats, StorageError<T>>,
2992                                    > = match as_of {
2993                                        SnapshotStatsAsOf::Direct(as_of) => {
2994                                            x.snapshot_stats(id, Some(as_of))
2995                                        }
2996                                        SnapshotStatsAsOf::Txns(data_snapshot) => {
2997                                            x.snapshot_stats_from_txn(id, data_snapshot)
2998                                        }
2999                                    };
3000                                    SnapshotStatsRes(fut)
3001                                }
3002                                None => SnapshotStatsRes(Box::pin(futures::future::ready(Err(
3003                                    StorageError::IdentifierMissing(id),
3004                                )))),
3005                            };
3006                            // It's fine if the listener hung up.
3007                            let _ = tx.send(res);
3008                        }
3009                    }
3010                }
3011                Some(holds_changes) = self.holds_rx.recv() => {
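                    // Batch up any further hold changes that are already
                    // queued, so that we apply them all under a single lock
                    // acquisition below.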
3012                    let mut batched_changes = BTreeMap::new();
3013                    batched_changes.insert(holds_changes.0, holds_changes.1);
3014
3015                    while let Ok(mut holds_changes) = self.holds_rx.try_recv() {
3016                        let entry = batched_changes.entry(holds_changes.0);
3017                        entry
3018                            .and_modify(|existing| existing.extend(holds_changes.1.drain()))
3019                            .or_insert_with(|| holds_changes.1);
3020                    }
3021
3022                    let mut collections = self.collections.lock().expect("lock poisoned");
3023
3024                    let user_changes = batched_changes
3025                        .iter()
3026                        .filter(|(id, _c)| id.is_user())
3027                        .map(|(id, c)| {
3028                            (id.clone(), c.clone())
3029                        })
3030                        .collect_vec();
3031
3032                    if !user_changes.is_empty() {
3033                        trace!(?user_changes, "applying holds changes from channel");
3034                    }
3035
3036                    StorageCollectionsImpl::update_read_capabilities_inner(
3037                        &self.cmds_tx,
3038                        &mut collections,
3039                        &mut batched_changes,
3040                    );
3041                }
3042            }
3043        }
3044
3045        warn!("BackgroundTask shutting down");
3046    }
3047
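    /// Applies reported write frontier updates and, where a collection's read
    /// policy allows it, advances the implied capability accordingly,
    /// forwarding any resulting read capability changes.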
3048    #[instrument(level = "debug")]
3049    async fn update_write_frontiers(&mut self, updates: &[(GlobalId, &Antichain<T>)]) {
3050        let mut read_capability_changes = BTreeMap::default();
3051
3052        let mut self_collections = self.collections.lock().expect("lock poisoned");
3053
3054        for (id, new_upper) in updates.iter() {
3055            let collection = if let Some(c) = self_collections.get_mut(id) {
3056                c
3057            } else {
3058                trace!(
3059                    "Reference to absent collection {id}, due to concurrent removal of that collection"
3060                );
3061                continue;
3062            };
3063
3064            if PartialOrder::less_than(&collection.write_frontier, *new_upper) {
3065                collection.write_frontier.clone_from(new_upper);
3066            }
3067
3068            let mut new_read_capability = collection
3069                .read_policy
3070                .frontier(collection.write_frontier.borrow());
3071
3072            if id.is_user() {
3073                trace!(
3074                    %id,
3075                    implied_capability = ?collection.implied_capability,
3076                    policy = ?collection.read_policy,
3077                    write_frontier = ?collection.write_frontier,
3078                    ?new_read_capability,
3079                    "update_write_frontiers");
3080            }
3081
3082            if PartialOrder::less_equal(&collection.implied_capability, &new_read_capability) {
3083                let mut update = ChangeBatch::new();
3084                update.extend(new_read_capability.iter().map(|time| (time.clone(), 1)));
3085                std::mem::swap(&mut collection.implied_capability, &mut new_read_capability);
3086                update.extend(new_read_capability.iter().map(|time| (time.clone(), -1)));
3087
3088                if !update.is_empty() {
3089                    read_capability_changes.insert(*id, update);
3090                }
3091            }
3092        }
3093
3094        if !read_capability_changes.is_empty() {
3095            StorageCollectionsImpl::update_read_capabilities_inner(
3096                &self.cmds_tx,
3097                &mut self_collections,
3098                &mut read_capability_changes,
3099            );
3100        }
3101    }
3102
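    /// Downgrades the persist sinces of the given collections. When a since is
    /// downgraded to the empty frontier we drop the handles and, if shard
    /// finalization is enabled, enqueue the backing shard for finalization; a
    /// fencing epoch mismatch halts the process.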
3103    async fn downgrade_sinces(&mut self, cmds: Vec<(GlobalId, Antichain<T>)>) {
3104        for (id, new_since) in cmds {
3105            let since_handle = if let Some(c) = self.since_handles.get_mut(&id) {
3106                c
3107            } else {
3108                // This can happen when someone concurrently drops a collection.
3109                trace!("downgrade_sinces: reference to absent collection {id}");
3110                continue;
3111            };
3112
3113            if id.is_user() {
3114                trace!("downgrading since of {} to {:?}", id, new_since);
3115            }
3116
3117            let epoch = since_handle.opaque();
3118            let result = if new_since.is_empty() {
3119                // A shard's since reaching the empty frontier is a prereq for
3120                // being able to finalize a shard, so the final downgrade should
3121                // never be rate-limited.
3122                let res = Some(
3123                    since_handle
3124                        .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3125                        .await,
3126                );
3127
3128                info!(%id, "removing persist handles because the since advanced to []!");
3129
3130                let _since_handle = self.since_handles.remove(&id).expect("known to exist");
3131                let dropped_shard_id = if let Some(shard_id) = self.shard_by_id.remove(&id) {
3132                    shard_id
3133                } else {
3134                    panic!("missing GlobalId -> ShardId mapping for id {id}");
3135                };
3136
3137                // We're not responsible for writes to tables, so we also don't
3138                // de-register them from the txn system. Whoever is responsible
3139                // will remove them. We only make sure to remove the table from
3140                // our tracking.
3141                self.txns_shards.remove(&id);
3142
3143                if !self
3144                    .config
3145                    .lock()
3146                    .expect("lock poisoned")
3147                    .parameters
3148                    .finalize_shards
3149                {
3150                    info!(
3151                        "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3152                    );
3153                    return;
3154                }
3155
3156                info!(%id, %dropped_shard_id, "enqueueing shard finalization due to dropped collection and dropped persist handle");
3157
3158                self.finalizable_shards.lock().insert(dropped_shard_id);
3159
3160                res
3161            } else {
3162                since_handle
3163                    .maybe_compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3164                    .await
3165            };
3166
3167            if let Some(Err(other_epoch)) = result {
3168                mz_ore::halt!("fenced by envd @ {other_epoch:?}. ours = {epoch:?}");
3169            }
3170        }
3171    }
3172}
3173
3174struct FinalizeShardsTaskConfig {
3175    envd_epoch: NonZeroI64,
3176    config: Arc<Mutex<StorageConfiguration>>,
3177    metrics: StorageCollectionsMetrics,
3178    finalizable_shards: Arc<ShardIdSet>,
3179    finalized_shards: Arc<ShardIdSet>,
3180    persist_location: PersistLocation,
3181    persist: Arc<PersistClientCache>,
3182    read_only: bool,
3183}
3184
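/// Periodically finalizes shards that have been enqueued in
/// `finalizable_shards`: each shard's upper is advanced to the empty frontier
/// via an empty append, its critical since is (optionally, per the
/// `STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION` flag) downgraded to the empty
/// frontier, and the shard is then finalized with persist. Successfully
/// finalized shards are moved from `finalizable_shards` to `finalized_shards`.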
3185async fn finalize_shards_task<T>(
3186    FinalizeShardsTaskConfig {
3187        envd_epoch,
3188        config,
3189        metrics,
3190        finalizable_shards,
3191        finalized_shards,
3192        persist_location,
3193        persist,
3194        read_only,
3195    }: FinalizeShardsTaskConfig,
3196) where
3197    T: TimelyTimestamp + Lattice + Codec64 + Sync,
3198{
3199    if read_only {
3200        info!("disabling shard finalization in read only mode");
3201        return;
3202    }
3203
3204    let mut interval = tokio::time::interval(Duration::from_secs(5));
3205    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
3206    loop {
3207        interval.tick().await;
3208
3209        if !config
3210            .lock()
3211            .expect("lock poisoned")
3212            .parameters
3213            .finalize_shards
3214        {
3215            debug!(
3216                "not triggering shard finalization due to dropped storage object because enable_storage_shard_finalization parameter is false"
3217            );
3218            continue;
3219        }
3220
3221        let current_finalizable_shards = {
3222            // We hold the lock for as short a time as possible and pull out a
3223            // cloned set of shards.
3224            finalizable_shards.lock().iter().cloned().collect_vec()
3225        };
3226
3227        if current_finalizable_shards.is_empty() {
3228            debug!("no shards to finalize");
3229            continue;
3230        }
3231
3232        debug!(?current_finalizable_shards, "attempting to finalize shards");
3233
3234        // Open a persist client to delete unused shards.
3235        let persist_client = persist.open(persist_location.clone()).await.unwrap();
3236
3237        let metrics = &metrics;
3238        let finalizable_shards = &finalizable_shards;
3239        let finalized_shards = &finalized_shards;
3240        let persist_client = &persist_client;
3241        let diagnostics = &Diagnostics::from_purpose("finalizing shards");
3242
3243        let force_downgrade_since = STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION
3244            .get(config.lock().expect("lock poisoned").config_set());
3245
3246        let epoch = &PersistEpoch::from(envd_epoch);
3247
3248        futures::stream::iter(current_finalizable_shards.clone())
3249            .map(|shard_id| async move {
3250                let persist_client = persist_client.clone();
3251                let diagnostics = diagnostics.clone();
3252                let epoch = epoch.clone();
3253
3254                metrics.finalization_started.inc();
3255
3256                let is_finalized = persist_client
3257                    .is_finalized::<SourceData, (), T, StorageDiff>(shard_id, diagnostics)
3258                    .await
3259                    .expect("invalid persist usage");
3260
3261                if is_finalized {
3262                    debug!(%shard_id, "shard is already finalized!");
3263                    Some(shard_id)
3264                } else {
3265                    debug!(%shard_id, "finalizing shard");
3266                    let finalize = || async move {
3267                        // TODO: thread the global ID into the shard finalization WAL
3268                        let diagnostics = Diagnostics::from_purpose("finalizing shards");
3269
3270                        let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3271                        let (key_schema, val_schema) = match schemas {
3272                            Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3273                            None => (RelationDesc::empty(), UnitSchema),
3274                        };
3275
3276                        let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
3277                        let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
3278                            persist_client
3279                                .open_writer(
3280                                    shard_id,
3281                                    Arc::new(key_schema),
3282                                    Arc::new(val_schema),
3283                                    diagnostics,
3284                                )
3285                                .await
3286                                .expect("invalid persist usage");
3287
3288                        let upper = write_handle.upper();
3289
3290                        if !upper.is_empty() {
3291                            let append = write_handle
3292                                .append(empty_batch, upper.clone(), Antichain::new())
3293                                .await?;
3294
3295                            if let Err(e) = append {
3296                                warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3297                                return Ok(());
3298                            }
3299                        }
3300                        write_handle.expire().await;
3301
3302                        if force_downgrade_since {
3303                            let mut since_handle: SinceHandle<
3304                                SourceData,
3305                                (),
3306                                T,
3307                                StorageDiff,
3308                                PersistEpoch,
3309                            > = persist_client
3310                                .open_critical_since(
3311                                    shard_id,
3312                                    PersistClient::CONTROLLER_CRITICAL_SINCE,
3313                                    Diagnostics::from_purpose("finalizing shards"),
3314                                )
3315                                .await
3316                                .expect("invalid persist usage");
3317                            let handle_epoch = since_handle.opaque().clone();
3318                            let our_epoch = epoch.clone();
3319                            let epoch = if our_epoch.0 > handle_epoch.0 {
3320                                // We're newer, but it's fine to use the
3321                                // handle's old epoch to try and downgrade.
3322                                handle_epoch
3323                            } else {
3324                                // Good luck, buddy! The downgrade below will
3325                                // not succeed. There's a process with a newer
3326                                // epoch out there and someone at some juncture
3327                                // will fence out this process.
3328                                our_epoch
3329                            };
3330                            let new_since = Antichain::new();
3331                            let downgrade = since_handle
3332                                .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
3333                                .await;
3334                            if let Err(e) = downgrade {
3335                                warn!(
3336                                    "tried to finalize a shard with an advancing epoch: {e:?}"
3337                                );
3338                                return Ok(());
3339                            }
3340                            // `expire()` is not available right now, so finalization is broken:
3341                            // since_handle.expire().await;
3342                        }
3343
3344                        persist_client
3345                            .finalize_shard::<SourceData, (), T, StorageDiff>(
3346                                shard_id,
3347                                Diagnostics::from_purpose("finalizing shards"),
3348                            )
3349                            .await
3350                    };
3351
3352                    match finalize().await {
3353                        Err(e) => {
3354                            // Rather than error, just leave this shard as
3355                            // one to finalize later.
3356                            warn!("error during finalization of shard {shard_id}: {e:?}");
3357                            None
3358                        }
3359                        Ok(()) => {
3360                            debug!(%shard_id, "finalize success!");
3361                            Some(shard_id)
3362                        }
3363                    }
3364                }
3365            })
3366            // Poll each future for each collection concurrently, maximum of 10
3367            // at a time.
3368            // TODO(benesch): the concurrency here should be configurable
3369            // via LaunchDarkly.
3370            .buffer_unordered(10)
3371            // HERE BE DRAGONS: see warning on other uses of buffer_unordered.
3372            // The closure passed to `for_each` must remain fast or we risk
3373            // starving the finalization futures of calls to `poll`.
3374            .for_each(|shard_id| async move {
3375                match shard_id {
3376                    None => metrics.finalization_failed.inc(),
3377                    Some(shard_id) => {
3378                        // We make successfully finalized shards available for
3379                        // removal from the finalization WAL one by one, so that
3380                        // a handful of stuck shards don't prevent us from
3381                        // removing the shards that have made progress. The
3382                        // overhead of repeatedly acquiring and releasing the
3383                        // locks is negligible.
3384                        {
3385                            let mut finalizable_shards = finalizable_shards.lock();
3386                            let mut finalized_shards = finalized_shards.lock();
3387                            finalizable_shards.remove(&shard_id);
3388                            finalized_shards.insert(shard_id);
3389                        }
3390
3391                        metrics.finalization_succeeded.inc();
3392                    }
3393                }
3394            })
3395            .await;
3396
3397        debug!("done finalizing shards");
3398    }
3399}
3400
3401#[derive(Debug)]
3402pub(crate) enum SnapshotStatsAsOf<T: TimelyTimestamp + Lattice + Codec64> {
3403    /// Stats for a shard with an "eager" upper (one that continually advances
3404    /// as time passes, even if no writes are coming in).
3405    Direct(Antichain<T>),
3406    /// Stats for a shard with a "lazy" upper (one that only physically advances
3407    /// in response to writes).
3408    Txns(DataSnapshot<T>),
3409}
3410
3411#[cfg(test)]
3412mod tests {
3413    use std::str::FromStr;
3414    use std::sync::Arc;
3415
3416    use mz_build_info::DUMMY_BUILD_INFO;
3417    use mz_dyncfg::ConfigSet;
3418    use mz_ore::assert_err;
3419    use mz_ore::metrics::{MetricsRegistry, UIntGauge};
3420    use mz_ore::now::SYSTEM_TIME;
3421    use mz_ore::url::SensitiveUrl;
3422    use mz_persist_client::cache::PersistClientCache;
3423    use mz_persist_client::cfg::PersistConfig;
3424    use mz_persist_client::rpc::PubSubClientConnection;
3425    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
3426    use mz_persist_types::codec_impls::UnitSchema;
3427    use mz_repr::{RelationDesc, Row};
3428    use mz_secrets::InMemorySecretsController;
3429
3430    use super::*;
3431
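    // Exercises the `BackgroundCmd::SnapshotStats` path against an in-memory
    // persist instance: unknown collections error immediately, and stats
    // futures only resolve once the shard's upper has advanced past the
    // requested as_of.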
3432    #[mz_ore::test(tokio::test)]
3433    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
3434    async fn test_snapshot_stats() {
3435        let persist_location = PersistLocation {
3436            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3437            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
3438        };
3439        let persist_client = PersistClientCache::new(
3440            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
3441            &MetricsRegistry::new(),
3442            |_, _| PubSubClientConnection::noop(),
3443        );
3444        let persist_client = Arc::new(persist_client);
3445
3446        let (cmds_tx, mut background_task) =
3447            BackgroundTask::new_for_test(persist_location.clone(), Arc::clone(&persist_client));
3448        let background_task =
3449            mz_ore::task::spawn(|| "storage_collections::background_task", async move {
3450                background_task.run().await
3451            });
3452
3453        let persist = persist_client.open(persist_location).await.unwrap();
3454
3455        let shard_id = ShardId::new();
3456        let since_handle = persist
3457            .open_critical_since(
3458                shard_id,
3459                PersistClient::CONTROLLER_CRITICAL_SINCE,
3460                Diagnostics::for_tests(),
3461            )
3462            .await
3463            .unwrap();
3464        let write_handle = persist
3465            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3466                shard_id,
3467                Arc::new(RelationDesc::empty()),
3468                Arc::new(UnitSchema),
3469                Diagnostics::for_tests(),
3470            )
3471            .await
3472            .unwrap();
3473
3474        cmds_tx
3475            .send(BackgroundCmd::Register {
3476                id: GlobalId::User(1),
3477                is_in_txns: false,
3478                since_handle: SinceHandleWrapper::Critical(since_handle),
3479                write_handle,
3480            })
3481            .unwrap();
3482
3483        let mut write_handle = persist
3484            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
3485                shard_id,
3486                Arc::new(RelationDesc::empty()),
3487                Arc::new(UnitSchema),
3488                Diagnostics::for_tests(),
3489            )
3490            .await
3491            .unwrap();
3492
3493        // No stats for unknown GlobalId.
3494        let stats =
3495            snapshot_stats(&cmds_tx, GlobalId::User(2), Antichain::from_elem(0.into())).await;
3496        assert_err!(stats);
3497
3498        // Stats don't resolve for as_of past the upper.
3499        let stats_fut = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3500        assert_none!(stats_fut.now_or_never());
3501
3502        // Call it again because `now_or_never` consumed our future and it's not cloneable.
3503        let stats_ts1_fut =
3504            snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(1.into()));
3505
3506        // Write some data.
3507        let data = (
3508            (SourceData(Ok(Row::default())), ()),
3509            mz_repr::Timestamp::from(0),
3510            1i64,
3511        );
3512        let () = write_handle
3513            .compare_and_append(
3514                &[data],
3515                Antichain::from_elem(0.into()),
3516                Antichain::from_elem(1.into()),
3517            )
3518            .await
3519            .unwrap()
3520            .unwrap();
3521
3522        // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
3523        let stats = snapshot_stats(&cmds_tx, GlobalId::User(1), Antichain::from_elem(0.into()))
3524            .await
3525            .unwrap();
3526        assert_eq!(stats.num_updates, 1);
3527
3528        // Write more data and unblock the ts 1 call
3529        let data = (
3530            (SourceData(Ok(Row::default())), ()),
3531            mz_repr::Timestamp::from(1),
3532            1i64,
3533        );
3534        let () = write_handle
3535            .compare_and_append(
3536                &[data],
3537                Antichain::from_elem(1.into()),
3538                Antichain::from_elem(2.into()),
3539            )
3540            .await
3541            .unwrap()
3542            .unwrap();
3543
3544        let stats = stats_ts1_fut.await.unwrap();
3545        assert_eq!(stats.num_updates, 2);
3546
3547        // Make sure the background task keeps running at least until this point.
3548        drop(background_task);
3549    }
3550
3551    async fn snapshot_stats<T: TimelyTimestamp + Lattice + Codec64>(
3552        cmds_tx: &mpsc::UnboundedSender<BackgroundCmd<T>>,
3553        id: GlobalId,
3554        as_of: Antichain<T>,
3555    ) -> Result<SnapshotStats, StorageError<T>> {
3556        let (tx, rx) = oneshot::channel();
3557        cmds_tx
3558            .send(BackgroundCmd::SnapshotStats(
3559                id,
3560                SnapshotStatsAsOf::Direct(as_of),
3561                tx,
3562            ))
3563            .unwrap();
3564        let res = rx.await.expect("BackgroundTask should be live").0;
3565
3566        res.await
3567    }
3568
3569    impl<T: TimelyTimestamp + Lattice + Codec64> BackgroundTask<T> {
3570        fn new_for_test(
3571            _persist_location: PersistLocation,
3572            _persist_client: Arc<PersistClientCache>,
3573        ) -> (mpsc::UnboundedSender<BackgroundCmd<T>>, Self) {
3574            let (cmds_tx, cmds_rx) = mpsc::unbounded_channel();
3575            let (_holds_tx, holds_rx) = mpsc::unbounded_channel();
3576            let connection_context =
3577                ConnectionContext::for_tests(Arc::new(InMemorySecretsController::new()));
3578
3579            let task = Self {
3580                config: Arc::new(Mutex::new(StorageConfiguration::new(
3581                    connection_context,
3582                    ConfigSet::default(),
3583                ))),
3584                cmds_tx: cmds_tx.clone(),
3585                cmds_rx,
3586                holds_rx,
3587                finalizable_shards: Arc::new(ShardIdSet::new(
3588                    UIntGauge::new("finalizable_shards", "dummy gauge for tests").unwrap(),
3589                )),
3590                collections: Arc::new(Mutex::new(BTreeMap::new())),
3591                shard_by_id: BTreeMap::new(),
3592                since_handles: BTreeMap::new(),
3593                txns_handle: None,
3594                txns_shards: BTreeSet::new(),
3595            };
3596
3597            (cmds_tx, task)
3598        }
3599    }
3600}